Skip to main content

Overview

The @chain decorator allows you to create custom Runnable functions with full control over the execution flow. This is useful for complex multi-step processing that doesn’t fit cleanly into simple pipe chains.

Basic Usage

from upsonic.uel import chain, ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model

# Prompt templates used by the two steps of custom_chain below:
# prompt1 asks for a joke, prompt2 asks for a rating of that joke.
prompt1 = ChatPromptTemplate.from_template(
    "Tell a joke about {topic}"
)
prompt2 = ChatPromptTemplate.from_template(
    "Rate this joke on a scale of 1-10: {joke}"
)

@chain
def custom_chain(input_dict):
    """Generate a joke about ``input_dict["topic"]`` and then rate it.

    The first step drives the prompt, model and parser by hand; the second
    step composes them with the pipe operator.

    Returns:
        A dict with the parsed joke under "joke" and the parsed rating
        under "rating".
    """
    llm = infer_model("openai/gpt-4o")
    to_text = StrOutputParser()

    # Step 1: render the prompt, call the model, parse the reply manually.
    rendered_prompt = prompt1.invoke({"topic": input_dict["topic"]})
    joke = to_text.parse(llm.invoke(rendered_prompt))

    # Step 2: same kind of work, expressed as a piped chain.
    rating = (prompt2 | llm | to_text).invoke({"joke": joke})

    return {"joke": joke, "rating": rating}

# The decorated function is invoked the same way as any other Runnable
result = custom_chain.invoke({"topic": "programming"})
for label, key in (("Joke", "joke"), ("Rating", "rating")):
    print(f"{label}: {result[key]}")

Dynamic Chain Construction

from upsonic.uel import chain, ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model

@chain
def adaptive_chain(input_dict):
    """Build and return a chain whose prompt depends on the input.

    A truthy ``input_dict["complex"]`` selects the detailed prompt; a missing
    or falsy value selects the brief one. The function returns the chain
    itself rather than a result.
    """
    llm = infer_model("openai/gpt-4o")
    to_text = StrOutputParser()

    # Only the template text differs between the two variants.
    template = (
        "Provide a detailed, comprehensive answer: {query}"
        if input_dict.get("complex")
        else "Provide a brief answer: {query}"
    )
    return ChatPromptTemplate.from_template(template) | llm | to_text

# adaptive_chain returns a chain; that chain is automatically invoked
request = {"query": "What is AI?", "complex": True}
result = adaptive_chain.invoke(request)
print(result)

Multi-Stage Processing

from upsonic.uel import chain, ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model

@chain
def multi_stage_pipeline(input_dict):
    """Run ``input_dict["text"]`` through three dependent model calls.

    Stages: extract entities from the text, ask for context about those
    entities, then summarize using the text, entities and context together.

    Returns:
        A dict with the parsed output of each stage under "entities",
        "context" and "summary".
    """
    llm = infer_model("openai/gpt-4o")
    to_text = StrOutputParser()
    source_text = input_dict["text"]

    def run_stage(template, variables):
        # Build a one-off prompt | model | parser chain and invoke it.
        stage = ChatPromptTemplate.from_template(template) | llm | to_text
        return stage.invoke(variables)

    # Stage 1: entity extraction
    entities = run_stage(
        "Extract key entities from this text: {text}\n\nList them as a comma-separated list.",
        {"text": source_text},
    )

    # Stage 2: context for the extracted entities
    context = run_stage(
        "Provide context for these entities: {entities}",
        {"entities": entities},
    )

    # Stage 3: summary that combines everything gathered so far
    summary = run_stage(
        "Summarize:\nOriginal: {text}\nEntities: {entities}\nContext: {context}",
        {"text": source_text, "entities": entities, "context": context},
    )

    return {"entities": entities, "context": context, "summary": summary}

# Feed a short sample text through the pipeline and show two of its outputs
sample_text = "Python is a programming language. It's used for web development, data science, and AI."
result = multi_stage_pipeline.invoke({"text": sample_text})
for label, key in (("Entities", "entities"), ("Summary", "summary")):
    print(f"{label}: {result[key]}")

Async Custom Chains

import asyncio
from upsonic.uel import chain, ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model

@chain
async def async_custom_chain(input_dict):
    """Run two model calls in sequence using the async chain API.

    The first chain receives ``input_dict`` unchanged; the second receives
    the first chain's output as its "input" variable.

    Returns:
        A dict with the first output under "step1" and the second under
        "step2".
    """
    llm = infer_model("openai/gpt-4o")
    to_text = StrOutputParser()

    # Step 1 must finish before step 2 starts: step 2 consumes its output.
    first_chain = ChatPromptTemplate.from_template("First step: {input}") | llm | to_text
    first_output = await first_chain.ainvoke(input_dict)

    second_chain = ChatPromptTemplate.from_template("Second step: {input}") | llm | to_text
    second_output = await second_chain.ainvoke({"input": first_output})

    return {"step1": first_output, "step2": second_output}

async def main():
    """Invoke the async chain once and print the output of each step."""
    outputs = await async_custom_chain.ainvoke({"input": "Hello"})
    for step in ("step1", "step2"):
        print(outputs[step])

# Script entry point: start an event loop and run main() to completion.
asyncio.run(main())

Error Handling in Custom Chains

from upsonic.uel import chain, ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model

@chain
def safe_chain(input_dict):
    """Run a prompt | model | parser chain and report failures as data.

    Exceptions raised while building or invoking the chain are caught and
    turned into an error payload instead of propagating to the caller.
    Model and parser construction happen before the ``try``, so errors
    there still propagate.

    Args:
        input_dict: Variables for the prompt; the template reads "input".

    Returns:
        On success: ``{"result": <parsed output>, "error": None}``.
        On failure: ``{"result": None, "error": <non-empty description>,
        "fallback": "Default response due to error"}``.
    """
    model = infer_model("openai/gpt-4o")
    parser = StrOutputParser()

    try:
        prompt = ChatPromptTemplate.from_template("Process: {input}")
        result = (prompt | model | parser).invoke(input_dict)
    except Exception as e:
        # Deliberately broad: this chain is the boundary where any failure
        # is converted into a fallback response.
        return {
            "result": None,
            # Exceptions raised without a message (e.g. TimeoutError())
            # stringify to "", which callers testing `if result["error"]`
            # would mistake for success; fall back to repr() so the error
            # description is never empty.
            "error": str(e) or repr(e),
            "fallback": "Default response due to error"
        }

    return {"result": result, "error": None}

result = safe_chain.invoke({"input": "test"})
# Success is signalled by error being None. Compare against None explicitly:
# an exception with an empty message produces error == "", which is falsy
# but still a failure, and would otherwise print "Result: None".
if result["error"] is not None:
    print(f"Error: {result['error']}")
    print(f"Fallback: {result['fallback']}")
else:
    print(f"Result: {result['result']}")