import dspy
from netra import SpanWrapper, agent, task, workflow
class RAG(dspy.Module):
    """Retrieve-then-generate question answering module.

    Passages are fetched for the question with ``dspy.Retrieve`` and handed,
    together with the question, to a ``dspy.ChainOfThought`` predictor whose
    signature is ``"context, question -> answer"``.
    """

    def __init__(self, num_passages: int = 3) -> None:
        """Build the retriever and the answer generator.

        Args:
            num_passages: Value passed as ``k`` to ``dspy.Retrieve``.
        """
        super().__init__()
        self.retrieve = dspy.Retrieve(k=num_passages)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    @task()
    def forward(self, question: str) -> dspy.Prediction:
        """Answer ``question`` using retrieved passages as context.

        Args:
            question: The question to retrieve passages for and to answer.

        Returns:
            The result of the generator call; callers read its ``answer``
            field.
        """
        context = self.retrieve(question).passages
        answer = self.generate(context=context, question=question)
        return answer
@workflow()
def rag_pipeline(question: str):
    """Run the RAG module on ``question`` inside a "rag-pipeline" span.

    The span is started with the question as an attribute, receives the
    generated answer as its ``"answer"`` attribute on success, and is always
    ended, including when retrieval or generation raises.

    Args:
        question: The question to answer.

    Returns:
        The result object produced by ``RAG`` (exposes ``.answer``).
    """
    span = SpanWrapper("rag-pipeline", {"question": question}).start()
    try:
        rag = RAG()
        # Invoke the module itself rather than ``.forward()``: DSPy routes
        # ``__call__`` to ``forward`` and discourages calling ``forward``
        # directly.
        result = rag(question=question)
        span.set_attribute("answer", result.answer)
        return result
    finally:
        # End the span on every exit path so a failure does not leave it open.
        span.end()