Skip to main content

Overview

RunnableParallel runs multiple chains concurrently on the same input and collects their results in a dictionary keyed by the name given to each chain. Use it when the operations are independent of one another, so they can run side by side instead of one after another.

Basic Usage

Using RunnableParallel Class

from upsonic.uel import ChatPromptTemplate, RunnableParallel, StrOutputParser
from upsonic.models import infer_model

model = infer_model("openai/gpt-4o")
parser = StrOutputParser()

# Build each branch as its own prompt -> model -> parser chain.
# All three templates read the same {topic} input variable.
joke_chain = ChatPromptTemplate.from_template("Tell a joke about {topic}") | model | parser
poem_chain = ChatPromptTemplate.from_template("Write a short poem about {topic}") | model | parser
fact_chain = ChatPromptTemplate.from_template("Share an interesting fact about {topic}") | model | parser

# Create parallel chains: each keyword is the key its result is read back under
parallel = RunnableParallel(joke=joke_chain, poem=poem_chain, fact=fact_chain)

# Execute in parallel with one shared input dict
result = parallel.invoke({"topic": "ocean"})

# Access results by branch name
print(f"Joke: {result['joke']}")
print(f"Poem: {result['poem']}")
print(f"Fact: {result['fact']}")

Using Dict Syntax

# A plain dict is automatically converted to a RunnableParallel when it is piped with | inside a chain.
# For standalone use (no | chain), create the RunnableParallel explicitly, as shown first below.
from upsonic.uel import ChatPromptTemplate, RunnableParallel, StrOutputParser
from upsonic.models import infer_model

model = infer_model("openai/gpt-4o")
parser = StrOutputParser()

# One prompt -> model -> parser chain per branch
joke_chain = ChatPromptTemplate.from_template("Tell a joke about {topic}") | model | parser
poem_chain = ChatPromptTemplate.from_template("Write a poem about {topic}") | model | parser

# Standalone use: wrap the named branches in RunnableParallel explicitly
parallel = RunnableParallel(joke=joke_chain, poem=poem_chain)

result = parallel.invoke({"topic": "space"})
# Results are read back under the branch names, in the same order as before
for key in ("joke", "poem"):
    print(result[key])

# Dict syntax works automatically when used in a chain
from upsonic.uel import RunnableLambda

# Named branches written as a plain dict; piping the dict with | below is what
# turns it into a parallel step.
branches = {
    "joke": ChatPromptTemplate.from_template("Tell a joke about {topic}") | model | parser,
    "poem": ChatPromptTemplate.from_template("Write a poem about {topic}") | model | parser,
}

# Follow-up prompt that consumes both branch outputs via {joke} and {poem}
combine_prompt = ChatPromptTemplate.from_template(
    "Combine joke and poem:\nJoke: {joke}\nPoem: {poem}"
)

chain = branches | combine_prompt | model | parser

result = chain.invoke({"topic": "space"})
print(result)

Use Cases

RAG with Multiple Retrievers

from upsonic.uel import ChatPromptTemplate, StrOutputParser, itemgetter
from upsonic.models import infer_model

# Model and output parser used by the retrieval chain built below
model = infer_model("openai/gpt-4o")
parser = StrOutputParser()

# Simulate retrievers (replace with actual retriever implementations)
def document_retriever(question):
    """Return placeholder document context for ``question``.

    Stand-in for a real document retriever: it only echoes the question
    back behind a fixed label.
    """
    context = f"Document context for: {question}"
    return context

def web_search(question):
    """Return placeholder web-search results for ``question``.

    Stand-in for a real web search: it only echoes the question back
    behind a fixed label.
    """
    results = f"Web results for: {question}"
    return results

def example_retriever(question):
    """Return placeholder example context for ``question``.

    Stand-in for a real example retriever: it only echoes the question
    back behind a fixed label.
    """
    context = f"Example context for: {question}"
    return context

# Retrieve from multiple sources simultaneously
# Retrieve from multiple sources simultaneously. Every branch pulls "question"
# out of the input dict; the last branch forwards it unchanged so the prompt
# can reference {question} alongside the retrieved context.
retrieval = {
    "docs": itemgetter("question") | (lambda q: document_retriever(q)),
    "web": itemgetter("question") | (lambda q: web_search(q)),
    "examples": itemgetter("question") | (lambda q: example_retriever(q)),
    "question": itemgetter("question"),
}

# Prompt that merges the three context sources with the original question
answer_prompt = ChatPromptTemplate.from_template(
    "Context: {docs}\nWeb: {web}\nExamples: {examples}\n\nQuestion: {question}"
)

chain = retrieval | answer_prompt | model | parser

result = chain.invoke({"question": "What is machine learning?"})
print(result)

Parallel + Sequential Combination

from upsonic.uel import ChatPromptTemplate, StrOutputParser, itemgetter
from upsonic.models import infer_model

model = infer_model("openai/gpt-4o")
parser = StrOutputParser()

# Run retrievers in parallel, then process sequentially

# Parallel retrieval: each branch extracts "query" from the input and formats
# a stand-in result; the "query" branch forwards the raw query to the prompt.
retrieval = {
    "local_docs": itemgetter("query") | (lambda q: f"Local docs: {q}"),
    "web_results": itemgetter("query") | (lambda q: f"Web: {q}"),
    "vector_db": itemgetter("query") | (lambda q: f"Vector: {q}"),
    "query": itemgetter("query"),
}

# Sequential processing: prompt -> model -> parser over the gathered results
answer_prompt = ChatPromptTemplate.from_template(
    "Answer based on:\nLocal: {local_docs}\nWeb: {web_results}\nVector: {vector_db}\n\nQuery: {query}"
)

chain = retrieval | answer_prompt | model | parser

result = chain.invoke({"query": "Python best practices"})
print(result)

Async Parallel Execution

import asyncio
from upsonic.uel import ChatPromptTemplate, RunnableParallel, StrOutputParser
from upsonic.models import infer_model

async def main():
    """Build a two-branch RunnableParallel and run it with ``ainvoke``.

    Prints the joke result followed by the poem result.
    """
    model = infer_model("openai/gpt-4o")
    parser = StrOutputParser()

    # One prompt -> model -> parser chain per branch
    joke_chain = ChatPromptTemplate.from_template("Tell a joke about {topic}") | model | parser
    poem_chain = ChatPromptTemplate.from_template("Write a poem about {topic}") | model | parser
    parallel = RunnableParallel(joke=joke_chain, poem=poem_chain)

    # Async execution: await the parallel step instead of calling invoke()
    result = await parallel.ainvoke({"topic": "AI"})
    for key in ("joke", "poem"):
        print(result[key])

asyncio.run(main())