Skip to main content

Installation

Install both the Netra SDK and Haystack:
pip install netra-sdk haystack-ai

Usage

Initialize the Netra SDK to automatically trace all Haystack operations:
from netra import Netra
from haystack import Pipeline
from haystack.components.generators import OpenAIGenerator
import os

# Initialize Netra
Netra.init(
    headers=f"x-api-key={os.environ.get('NETRA_API_KEY')}",
    trace_content=True
)

# Create pipeline - automatically traced
pipeline = Pipeline()
pipeline.add_component("generator", OpenAIGenerator())

result = pipeline.run({
    "generator": {
        "prompt": "What is Haystack?"
    }
})

Pipelines

Trace Haystack pipelines with custom decorators:
from netra import workflow, task, SpanWrapper
from haystack import Pipeline
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder

@workflow()
def build_qa_pipeline():
    pipeline_span = SpanWrapper("build-pipeline").start()
    
    pipeline = Pipeline()
    
    # Add components
    pipeline.add_component("prompt_builder", PromptBuilder(
        template="Answer this question: {{question}}"
    ))
    pipeline.add_component("generator", OpenAIGenerator())
    
    # Connect components
    pipeline.connect("prompt_builder", "generator")
    
    pipeline_span.end()
    return pipeline

@task()
def run_pipeline(pipeline: Pipeline, question: str):
    run_span = SpanWrapper("run-pipeline", {
        "question": question
    }).start()
    
    result = pipeline.run({
        "prompt_builder": {"question": question}
    })
    
    run_span.set_attribute("answer", result["generator"]["replies"][0])
    run_span.end()
    
    return result

Retrievers

Trace document retrieval:
from netra import task, SpanWrapper
from haystack import Pipeline
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.dataclasses import Document

@task()
def setup_retrieval_pipeline():
    setup_span = SpanWrapper("setup-retrieval").start()
    
    # Create document store
    document_store = InMemoryDocumentStore()
    document_store.write_documents([
        Document(content="Haystack is an NLP framework."),
        Document(content="It supports RAG pipelines.")
    ])
    
    # Create pipeline
    pipeline = Pipeline()
    pipeline.add_component("retriever", InMemoryBM25Retriever(
        document_store=document_store
    ))
    
    setup_span.end()
    return pipeline

@task()
def retrieve_documents(pipeline: Pipeline, query: str):
    retrieval_span = SpanWrapper("retrieve-docs", {
        "query": query
    }).start()
    
    result = pipeline.run({
        "retriever": {"query": query}
    })
    
    docs = result["retriever"]["documents"]
    retrieval_span.set_attribute("documents.count", len(docs))
    retrieval_span.end()
    
    return docs

RAG Pipeline

Trace complete RAG implementations:
from netra import workflow, SpanWrapper
from haystack import Pipeline
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder

@workflow()
def rag_pipeline(document_store, query: str):
    rag_span = SpanWrapper("rag-pipeline", {
        "query": query
    }).start()
    
    # Build pipeline
    pipeline = Pipeline()
    
    pipeline.add_component("retriever", InMemoryBM25Retriever(
        document_store=document_store
    ))
    pipeline.add_component("prompt_builder", PromptBuilder(
        template="""Context: {{documents}}
        Question: {{query}}
        Answer:"""
    ))
    pipeline.add_component("generator", OpenAIGenerator())
    
    # Connect components
    pipeline.connect("retriever", "prompt_builder.documents")
    pipeline.connect("prompt_builder", "generator")
    
    # Run pipeline
    result = pipeline.run({
        "retriever": {"query": query},
        "prompt_builder": {"query": query}
    })
    
    rag_span.set_attribute("answer", result["generator"]["replies"][0])
    rag_span.end()
    
    return result

Custom Components

Trace custom pipeline components:
from netra import task, SpanWrapper
from haystack import component
from typing import List

@component
class CustomProcessor:
    @component.output_types(processed=str)
    @task()
    def run(self, text: str) -> dict:
        process_span = SpanWrapper("custom-process", {
            "text.length": len(text)
        }).start()
        
        # Custom processing
        processed = text.upper()
        
        process_span.set_attribute("processed.length", len(processed))
        process_span.end()
        
        return {"processed": processed}

Next Steps

Last modified on January 30, 2026