
Overview

UEL provides patterns for building Retrieval-Augmented Generation (RAG) systems by combining a retrieval step with generation in composable chains. The examples below use simulated retrievers; replace them with real retrievers in production.

Basic RAG Pattern

from upsonic.uel import ChatPromptTemplate, RunnablePassthrough, StrOutputParser, itemgetter
from upsonic.models import infer_model

model = infer_model("anthropic/claude-sonnet-4-5")
parser = StrOutputParser()

# Simulate a retriever (replace with actual retriever)
def document_retriever(question):
    return f"Relevant context: {question} is about artificial intelligence and machine learning."

# Basic RAG chain
rag_chain = (
    {
        "context": itemgetter("question") | (lambda x: document_retriever(x)),
        "question": itemgetter("question")
    }
    | ChatPromptTemplate.from_template(
        "Answer the question based on this context:\n\nContext: {context}\n\nQuestion: {question}"
    )
    | model
    | parser
)

result = rag_chain.invoke({"question": "What is machine learning?"})
print(result)
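
The document_retriever above only echoes the question back. As a minimal sketch of what a real retriever might look like, here is a keyword-overlap retriever over a small in-memory corpus; the corpus and scoring are illustrative assumptions, not a UEL API. Swap it in for document_retriever in the dict at the head of the chain, or replace it with your own vector store or search client.

# Illustrative stand-in retriever: scores an in-memory corpus by keyword overlap.
# CORPUS and the scoring are assumptions for demonstration only.
CORPUS = [
    "Machine learning is a subfield of artificial intelligence.",
    "Decorators in Python wrap functions to extend their behavior.",
    "RAG systems combine retrieval with generation.",
]

def keyword_retriever(question):
    words = set(question.lower().split())
    # Pick the corpus entry sharing the most words with the question
    best = max(CORPUS, key=lambda doc: len(words & set(doc.lower().split())))
    return f"Relevant context: {best}"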

RAG with Conversation History

from upsonic.uel import ChatPromptTemplate, RunnablePassthrough, StrOutputParser, itemgetter
from upsonic.models import infer_model

model = infer_model("anthropic/claude-sonnet-4-5").add_memory(history=True, mode="auto")
parser = StrOutputParser()

contextualize_template = ChatPromptTemplate.from_messages([
    ("system", """You are a question rephraser.
Your ONLY job is to rephrase the user's question to be standalone, given the conversation history.
DO NOT answer the question. DO NOT provide any information.
ONLY output the rephrased question, nothing else.

Example:
History: "What is Python?" → "Python is a programming language."
User: "What about its speed?"
Output: "How fast is Python?" or "What is Python's performance?"
"""),
    ("placeholder", {"variable_name": "chat_history"}),
    ("human", "Rephrase this question to be standalone: {question}")
])

contextualize_chain = (
    contextualize_template
    | model
    | parser
)

rag_chain = (
    RunnablePassthrough.assign(
        contextualized_question=lambda x: (
            contextualize_chain.invoke(x).strip() if x.get("chat_history") 
            else x["question"]
        )
    ).assign(
        # Simulated retrieval; replace with an actual retriever call
        context=lambda x: f"Context for: {x.get('contextualized_question', x['question'])}"
    )
    | ChatPromptTemplate.from_messages([
        ("system", "Answer using this context: {context}"),
        ("placeholder", {"variable_name": "chat_history"}),
        ("human", "{contextualized_question}")
    ])
    | model
    | parser
)

result = rag_chain.invoke({
    "question": "What about Python?",
    "chat_history": [
        ("human", "Tell me about programming languages"),
        ("ai", "There are many programming languages...")
    ]
})
print(result)
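
To see what the contextualization step produces on its own, you can invoke contextualize_chain directly with the same inputs; the exact rephrasing will vary with the model.

# Inspect the standalone rephrasing produced by the contextualization step
standalone = contextualize_chain.invoke({
    "question": "What about Python?",
    "chat_history": [
        ("human", "Tell me about programming languages"),
        ("ai", "There are many programming languages...")
    ]
})
print(standalone)  # e.g. "What is Python, and what is it used for?"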

Multi-Source RAG

from upsonic.uel import ChatPromptTemplate, StrOutputParser, itemgetter
from upsonic.models import infer_model

model = infer_model("anthropic/claude-sonnet-4-5")
parser = StrOutputParser()

# Simulate multiple retrievers
def document_retriever(question):
    return f"Document context: {question} relates to documentation."

def web_retriever(question):
    return f"Web context: {question} has web resources available."

def example_retriever(question):
    return f"Example context: Here are examples related to {question}."

# Retrieve from multiple sources in parallel
multi_source_rag = (
    {
        "docs": itemgetter("question") | (lambda x: document_retriever(x)),
        "web": itemgetter("question") | (lambda x: web_retriever(x)),
        "examples": itemgetter("question") | (lambda x: example_retriever(x)),
        "question": itemgetter("question")
    }
    | ChatPromptTemplate.from_template(
        "Answer the question using information from multiple sources:\n\n"
        "Documents: {docs}\n"
        "Web: {web}\n"
        "Examples: {examples}\n\n"
        "Question: {question}"
    )
    | model
    | parser
)

result = multi_source_rag.invoke({"question": "How do I use decorators in Python?"})
print(result)
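
Because the fan-out is just a dict, adding another source is one more key. The forum_retriever below is a hypothetical extra source, following the same simulated-retriever pattern, used only to show how the chain extends.

# Hypothetical fourth source, following the same simulated-retriever pattern
def forum_retriever(question):
    return f"Forum context: community discussions about {question}."

extended_rag = (
    {
        "docs": itemgetter("question") | (lambda x: document_retriever(x)),
        "web": itemgetter("question") | (lambda x: web_retriever(x)),
        "examples": itemgetter("question") | (lambda x: example_retriever(x)),
        "forum": itemgetter("question") | (lambda x: forum_retriever(x)),
        "question": itemgetter("question")
    }
    | ChatPromptTemplate.from_template(
        "Answer the question using information from multiple sources:\n\n"
        "Documents: {docs}\nWeb: {web}\nExamples: {examples}\nForum: {forum}\n\n"
        "Question: {question}"
    )
    | model
    | parser
)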

RAG with Re-ranking

from upsonic.uel import chain, ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model

@chain
def rag_with_reranking(input_dict):
    """RAG with re-ranking step"""
    model = infer_model("anthropic/claude-sonnet-4-5")
    parser = StrOutputParser()
    question = input_dict["question"]
    
    # Step 1: Simulate retrieving multiple documents (replace with an actual retriever)
    documents = [
        f"Document 1 about {question}",
        f"Document 2 about {question}",
        f"Document 3 about {question}"
    ]
    
    # Step 2: Re-rank documents
    rerank_prompt = ChatPromptTemplate.from_template(
        "Rank these documents by relevance to the question:\n\n"
        "Question: {question}\n\n"
        "Documents:\n{docs}\n\n"
        "Return the most relevant document."
    )
    best_doc = (rerank_prompt | model | parser).invoke({
        "question": question,
        "docs": "\n".join(f"{i+1}. {doc}" for i, doc in enumerate(documents))
    })
    
    # Step 3: Generate answer with best document
    answer_prompt = ChatPromptTemplate.from_template(
        "Answer the question using this document:\n\n"
        "Document: {document}\n\n"
        "Question: {question}"
    )
    answer = (answer_prompt | model | parser).invoke({
        "document": best_doc,
        "question": question
    })
    
    return {
        "answer": answer,
        "source": best_doc
    }

result = rag_with_reranking.invoke({"question": "What is Python?"})
print(f"Answer: {result['answer']}")
print(f"Source: {result['source']}")

Streaming RAG

# Note: Streaming is not directly supported in UEL chains
# For streaming, use the Agent class instead
from upsonic import Agent, Task
import asyncio

def streaming_rag(question):
    """RAG with streaming response using Agent"""
    # Simulate retrieval (replace with an actual retriever)
    context = f"Context for: {question}"
    
    # Create agent
    agent = Agent("anthropic/claude-sonnet-4-5")
    
    # Build prompt with context
    prompt = f"Answer based on this context:\n\nContext: {context}\n\nQuestion: {question}"
    
    # Stream the response
    task = Task(prompt)
    result = agent.stream(task)
    
    async def stream_output():
        async with result:
            async for chunk in result.stream_output():
                print(chunk, end="", flush=True)
        print()  # New line after streaming
    
    asyncio.run(stream_output())

streaming_rag("What is machine learning?")
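
# If you're already inside a running event loop (web handler, notebook),
# asyncio.run() above will fail. A sketch of the same flow as a coroutine you
# can await directly, using only the Agent/Task/stream calls shown above:
async def streaming_rag_async(question):
    context = f"Context for: {question}"  # simulated retrieval
    agent = Agent("anthropic/claude-sonnet-4-5")
    task = Task(f"Answer based on this context:\n\nContext: {context}\n\nQuestion: {question}")
    result = agent.stream(task)
    async with result:
        async for chunk in result.stream_output():
            print(chunk, end="", flush=True)
    print()
# run with: asyncio.run(streaming_rag_async("What is machine learning?"))
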
# For a non-streaming UEL chain, use regular invoke
from upsonic.uel import ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model

def non_streaming_rag(question):
    """RAG without streaming using UEL chain"""
    model = infer_model("anthropic/claude-sonnet-4-5")
    parser = StrOutputParser()
    context = f"Context for: {question}"  # simulated retrieval
    
    chain = (
        ChatPromptTemplate.from_template(
            "Answer based on context: {context}\n\nQuestion: {question}"
        )
        | model
        | parser
    )
    
    result = chain.invoke({"context": context, "question": question})
    print(result)

non_streaming_rag("What is machine learning?")