Overview
RunnableParallel executes multiple runnables concurrently on the same input and collects their outputs in a dictionary keyed by branch name. This is useful for running independent operations, such as several model calls or retrievers, at the same time to reduce overall latency.
Basic Usage
Using RunnableParallel Class
from upsonic.uel import ChatPromptTemplate, RunnableParallel, StrOutputParser
from upsonic.models import infer_model

model = infer_model("openai/gpt-4o")
parser = StrOutputParser()

# Create parallel chains, each consuming the same {topic} input
parallel = RunnableParallel(
    joke=(ChatPromptTemplate.from_template("Tell a joke about {topic}") | model | parser),
    poem=(ChatPromptTemplate.from_template("Write a short poem about {topic}") | model | parser),
    fact=(ChatPromptTemplate.from_template("Share an interesting fact about {topic}") | model | parser)
)

# Execute in parallel
result = parallel.invoke({"topic": "ocean"})

# Access results by branch name
print(f"Joke: {result['joke']}")
print(f"Poem: {result['poem']}")
print(f"Fact: {result['fact']}")
Using Dict Syntax
# Dict syntax automatically creates a RunnableParallel when used in a chain with |.
# For standalone use, create a RunnableParallel explicitly.
from upsonic.uel import ChatPromptTemplate, RunnableParallel, StrOutputParser
from upsonic.models import infer_model

model = infer_model("openai/gpt-4o")
parser = StrOutputParser()

# Standalone use: explicit RunnableParallel
parallel = RunnableParallel(
    joke=(ChatPromptTemplate.from_template("Tell a joke about {topic}") | model | parser),
    poem=(ChatPromptTemplate.from_template("Write a poem about {topic}") | model | parser)
)

result = parallel.invoke({"topic": "space"})
print(result["joke"])
print(result["poem"])

# In a chain: a plain dict is coerced to RunnableParallel automatically
chain = (
    {
        "joke": ChatPromptTemplate.from_template("Tell a joke about {topic}") | model | parser,
        "poem": ChatPromptTemplate.from_template("Write a poem about {topic}") | model | parser
    }
    | ChatPromptTemplate.from_template("Combine joke and poem:\nJoke: {joke}\nPoem: {poem}")
    | model
    | parser
)

result = chain.invoke({"topic": "space"})
print(result)
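If the combining prompt also needs the original input, it can be passed through as an extra key in the dict. The sketch below assumes the itemgetter helper from upsonic.uel (used in the retriever examples further down this page) forwards a key of the chain input unchanged:

from upsonic.uel import itemgetter

chain_with_topic = (
    {
        "joke": ChatPromptTemplate.from_template("Tell a joke about {topic}") | model | parser,
        "poem": ChatPromptTemplate.from_template("Write a poem about {topic}") | model | parser,
        "topic": itemgetter("topic")  # forwarded unchanged alongside the generated branches
    }
    | ChatPromptTemplate.from_template(
        "For the topic '{topic}', combine:\nJoke: {joke}\nPoem: {poem}"
    )
    | model
    | parser
)

print(chain_with_topic.invoke({"topic": "space"}))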
Use Cases
RAG with Multiple Retrievers
from upsonic.uel import ChatPromptTemplate, StrOutputParser, itemgetter
from upsonic.models import infer_model

model = infer_model("openai/gpt-4o")
parser = StrOutputParser()

# Simulate retrievers (replace with actual retriever implementations)
def document_retriever(question):
    return f"Document context for: {question}"

def web_search(question):
    return f"Web results for: {question}"

def example_retriever(question):
    return f"Example context for: {question}"

# Retrieve from multiple sources simultaneously
chain = (
    {
        "docs": itemgetter("question") | (lambda x: document_retriever(x)),
        "web": itemgetter("question") | (lambda x: web_search(x)),
        "examples": itemgetter("question") | (lambda x: example_retriever(x)),
        "question": itemgetter("question")
    }
    | ChatPromptTemplate.from_template(
        "Context: {docs}\nWeb: {web}\nExamples: {examples}\n\nQuestion: {question}"
    )
    | model
    | parser
)

result = chain.invoke({"question": "What is machine learning?"})
print(result)
Parallel + Sequential Combination
from upsonic.uel import ChatPromptTemplate, StrOutputParser, itemgetter
from upsonic.models import infer_model

model = infer_model("openai/gpt-4o")
parser = StrOutputParser()

# Run retrievers in parallel, then process sequentially
chain = (
    # Parallel retrieval
    {
        "local_docs": itemgetter("query") | (lambda x: f"Local docs: {x}"),
        "web_results": itemgetter("query") | (lambda x: f"Web: {x}"),
        "vector_db": itemgetter("query") | (lambda x: f"Vector: {x}"),
        "query": itemgetter("query")
    }
    # Sequential processing
    | ChatPromptTemplate.from_template(
        "Answer based on:\nLocal: {local_docs}\nWeb: {web_results}\nVector: {vector_db}\n\nQuery: {query}"
    )
    | model
    | parser
)

result = chain.invoke({"query": "Python best practices"})
print(result)
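The reverse composition also works: a sequential result can be fanned back out into a second parallel stage. As a sketch, assuming the same dict-to-RunnableParallel coercion shown above, the answer produced by chain is keyed as answer and then sent through two hypothetical follow-up passes:

from upsonic.uel import RunnableParallel

analysis = (
    {"answer": chain}  # the full chain above runs first; its string output becomes {answer}
    | RunnableParallel(
        summary=(ChatPromptTemplate.from_template("Summarize in one sentence: {answer}") | model | parser),
        critique=(ChatPromptTemplate.from_template("Name one weakness of this answer: {answer}") | model | parser)
    )
)

out = analysis.invoke({"query": "Python best practices"})
print(out["summary"])
print(out["critique"])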
Async Parallel Execution
import asyncio
from upsonic.uel import ChatPromptTemplate, RunnableParallel, StrOutputParser
from upsonic.models import infer_model

async def main():
    model = infer_model("openai/gpt-4o")
    parser = StrOutputParser()

    parallel = RunnableParallel(
        joke=(ChatPromptTemplate.from_template("Tell a joke about {topic}") | model | parser),
        poem=(ChatPromptTemplate.from_template("Write a poem about {topic}") | model | parser)
    )

    # Async execution
    result = await parallel.ainvoke({"topic": "AI"})
    print(result["joke"])
    print(result["poem"])

asyncio.run(main())
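Because ainvoke returns a coroutine, independent invocations can themselves be combined with asyncio.gather, for example to run the same parallel block over several topics at once. A minimal sketch (many_topics is a hypothetical helper):

async def many_topics():
    model = infer_model("openai/gpt-4o")
    parser = StrOutputParser()
    parallel = RunnableParallel(
        joke=(ChatPromptTemplate.from_template("Tell a joke about {topic}") | model | parser),
        poem=(ChatPromptTemplate.from_template("Write a poem about {topic}") | model | parser)
    )
    # Each ainvoke is parallel internally; gather runs the three inputs concurrently
    results = await asyncio.gather(
        parallel.ainvoke({"topic": "AI"}),
        parallel.ainvoke({"topic": "space"}),
        parallel.ainvoke({"topic": "ocean"})
    )
    for r in results:
        print(r["joke"])

asyncio.run(many_topics())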

