from netra import workflow, task, SpanWrapper
@workflow()
def build_rag_pipeline(documents: list[Document]):
    """Build a vector index from *documents* inside a "build-index" span.

    The span is started with a ``documents.count`` attribute set to
    ``len(documents)`` and is always ended, even when index construction
    raises, so a failed build does not leave a started span un-ended.

    Args:
        documents: The documents handed to ``VectorStoreIndex.from_documents``.

    Returns:
        Whatever ``VectorStoreIndex.from_documents(documents)`` returns.

    Raises:
        Any exception raised by ``VectorStoreIndex.from_documents`` is
        propagated unchanged after the span has been ended.
    """
    index_span = SpanWrapper(
        "build-index",
        {"documents.count": len(documents)},
    ).start()
    try:
        return VectorStoreIndex.from_documents(documents)
    finally:
        # Runs on both the success and the failure path, so the span started
        # above is never left open.
        index_span.end()
@task()
def query_with_retrieval(query_engine, question: str):
    """Run *question* through *query_engine* inside a "query-execution" span.

    The span is started with a ``query.text`` attribute holding the question.
    After a successful query, ``response.sources`` is set to the number of
    entries in ``response.source_nodes`` (0 when that value is ``None`` or
    empty). The span is always ended, even if the query or the attribute
    update raises.

    Args:
        query_engine: Object exposing ``query(question)``; the result is
            expected to have a ``source_nodes`` attribute.
        question: The query text passed to the engine and recorded on the span.

    Returns:
        The response object returned by ``query_engine.query(question)``.

    Raises:
        Any exception raised by ``query_engine.query`` (or while recording
        the source count) is propagated unchanged after the span has ended.
    """
    query_span = SpanWrapper(
        "query-execution",
        {"query.text": question},
    ).start()
    try:
        response = query_engine.query(question)
        # `or []` guards against source_nodes being None.
        source_count = len(response.source_nodes or [])
        query_span.set_attribute("response.sources", source_count)
        return response
    finally:
        # Executes before the return value reaches the caller and also on
        # any exception, so the span is ended on every path.
        query_span.end()