Overview
All UEL components support async/await, so you can execute chains asynchronously for better performance, especially when running multiple chains concurrently or waiting on I/O-bound model calls.

Basic Async Usage
import asyncio

from upsonic.uel import ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model


async def main():
    chain = (
        ChatPromptTemplate.from_template("Tell me about {topic}")
        | infer_model("openai/gpt-4o")
        | StrOutputParser()
    )

    # Async invocation
    result = await chain.ainvoke({"topic": "AI"})
    print(result)


asyncio.run(main())
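Note that asyncio.run() starts a fresh event loop and raises a RuntimeError if one is already running. In environments that already run a loop, such as a Jupyter notebook or an async web framework handler, await chain.ainvoke(...) directly instead of wrapping it in asyncio.run().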
Parallel Async Operations
import asyncio

from upsonic.uel import ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model


async def main():
    chain = (
        ChatPromptTemplate.from_template("Explain {topic}")
        | infer_model("openai/gpt-4o")
        | StrOutputParser()
    )

    topics = ["Python", "JavaScript", "Rust"]

    # Execute multiple chains in parallel
    results = await asyncio.gather(
        chain.ainvoke({"topic": "Python"}),
        chain.ainvoke({"topic": "JavaScript"}),
        chain.ainvoke({"topic": "Rust"}),
    )

    for topic, result in zip(topics, results):
        if result:
            print(f"{topic}: {result[:100]}...")


asyncio.run(main())
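The win from asyncio.gather is wall-clock time: the three model calls overlap instead of running back to back. A minimal sketch for verifying this yourself, using only the chain and ainvoke calls from the example above plus the standard library:

import asyncio
import time

from upsonic.uel import ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model


async def main():
    chain = (
        ChatPromptTemplate.from_template("Explain {topic}")
        | infer_model("openai/gpt-4o")
        | StrOutputParser()
    )

    # Sequential: each call waits for the previous one to finish.
    start = time.perf_counter()
    for topic in ["Python", "Rust"]:
        await chain.ainvoke({"topic": topic})
    print(f"Sequential: {time.perf_counter() - start:.1f}s")

    # Parallel: both calls are in flight at the same time.
    start = time.perf_counter()
    await asyncio.gather(
        chain.ainvoke({"topic": "Python"}),
        chain.ainvoke({"topic": "Rust"}),
    )
    print(f"Parallel: {time.perf_counter() - start:.1f}s")


asyncio.run(main())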
Async with RunnableParallel
import asyncio

from upsonic.uel import ChatPromptTemplate, RunnableParallel, StrOutputParser
from upsonic.models import infer_model


async def main():
    model = infer_model("openai/gpt-4o")
    parser = StrOutputParser()

    parallel = RunnableParallel(
        joke=(ChatPromptTemplate.from_template("Tell a joke about {topic}") | model | parser),
        poem=(ChatPromptTemplate.from_template("Write a poem about {topic}") | model | parser),
        fact=(ChatPromptTemplate.from_template("Share a fact about {topic}") | model | parser),
    )

    # Async parallel execution
    result = await parallel.ainvoke({"topic": "ocean"})
    print(f"Joke: {result['joke']}")
    print(f"Poem: {result['poem']}")
    print(f"Fact: {result['fact']}")


asyncio.run(main())
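RunnableParallel and asyncio.gather cover two different shapes of fan-out: RunnableParallel runs several different sub-chains against the same input and returns a keyed dict, while gather (as in the previous example) runs the same chain against several different inputs. Pick whichever matches the shape of your workload.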
Async Custom Chains
import asyncio

from upsonic.uel import chain, ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model


@chain
async def async_multi_step(input_dict):
    """Async multi-step processing"""
    model = infer_model("openai/gpt-4o")
    parser = StrOutputParser()

    # Step 1
    prompt1 = ChatPromptTemplate.from_template("First: {input}")
    result1 = await (prompt1 | model | parser).ainvoke(input_dict)

    # Step 2 (depends on step 1)
    prompt2 = ChatPromptTemplate.from_template("Second: {input}")
    result2 = await (prompt2 | model | parser).ainvoke({"input": result1 or ""})

    return {
        "step1": result1 or "",
        "step2": result2 or "",
    }


async def main():
    result = await async_multi_step.ainvoke({"input": "Hello"})
    print(f"Step 1: {result['step1']}")
    print(f"Step 2: {result['step2']}")


asyncio.run(main())
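The two steps above run sequentially because step 2 consumes step 1's output. When steps inside a custom chain are independent of each other, you can overlap them with asyncio.gather. A sketch under that assumption; the async_fan_out name and its prompts are illustrative, not UEL APIs:

import asyncio

from upsonic.uel import chain, ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model


@chain
async def async_fan_out(input_dict):
    """Run two independent steps concurrently inside one custom chain."""
    model = infer_model("openai/gpt-4o")
    parser = StrOutputParser()

    summary_chain = ChatPromptTemplate.from_template("Summarize: {input}") | model | parser
    title_chain = ChatPromptTemplate.from_template("Write a title for: {input}") | model | parser

    # These steps do not depend on each other, so run them concurrently
    # instead of awaiting one after the other.
    summary, title = await asyncio.gather(
        summary_chain.ainvoke(input_dict),
        title_chain.ainvoke(input_dict),
    )
    return {"summary": summary or "", "title": title or ""}


async def main():
    result = await async_fan_out.ainvoke({"input": "Async programming in Python"})
    print(result["summary"])
    print(result["title"])


asyncio.run(main())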
Async with Error Handling
import asyncio

from upsonic.uel import ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model


async def safe_async_chain(topic):
    """Async chain with error handling"""
    try:
        chain = (
            ChatPromptTemplate.from_template("Tell me about {topic}")
            | infer_model("openai/gpt-4o")
            | StrOutputParser()
        )
        result = await chain.ainvoke({"topic": topic})
        return {"success": True, "result": result or ""}
    except Exception as e:
        return {"success": False, "error": str(e)}


async def main():
    results = await asyncio.gather(
        safe_async_chain("Python"),
        safe_async_chain("JavaScript"),
        safe_async_chain("Rust"),
        return_exceptions=True,
    )
    for result in results:
        if isinstance(result, dict) and result.get("success"):
            print(f"Success: {result['result'][:50]}...")
        else:
            print(f"Error: {result}")


asyncio.run(main())
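For transient failures such as rate limits or timeouts, you often want a retry rather than a plain failure record. A minimal sketch of exponential backoff built on asyncio.sleep; invoke_with_retry is an illustrative helper, not part of UEL:

import asyncio

from upsonic.uel import ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model


async def invoke_with_retry(runnable, inputs, attempts=3, base_delay=1.0):
    """Retry a transiently failing chain with exponential backoff (illustrative helper)."""
    for attempt in range(attempts):
        try:
            return await runnable.ainvoke(inputs)
        except Exception:
            if attempt == attempts - 1:
                raise  # out of retries: surface the last error
            # Back off 1s, 2s, 4s, ... before the next attempt.
            await asyncio.sleep(base_delay * (2 ** attempt))


async def main():
    chain = (
        ChatPromptTemplate.from_template("Tell me about {topic}")
        | infer_model("openai/gpt-4o")
        | StrOutputParser()
    )
    result = await invoke_with_retry(chain, {"topic": "Python"})
    print(result)


asyncio.run(main())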
Async Batch Processing
import asyncio

from upsonic.uel import ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model


async def process_batch(topics):
    """Process multiple items asynchronously"""
    chain = (
        ChatPromptTemplate.from_template("Explain {topic} in one sentence")
        | infer_model("openai/gpt-4o")
        | StrOutputParser()
    )
    # Create tasks for all topics
    tasks = [chain.ainvoke({"topic": topic}) for topic in topics]
    # Execute all in parallel
    results = await asyncio.gather(*tasks)
    return [result or "" for result in results]


async def main():
    topics = ["Python", "JavaScript", "Rust", "Go", "TypeScript"]
    results = await process_batch(topics)
    for topic, result in zip(topics, results):
        print(f"{topic}: {result}")


asyncio.run(main())
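asyncio.gather launches every task at once, which can trip provider rate limits on large batches. A sketch that caps in-flight requests with asyncio.Semaphore; process_batch_bounded and max_concurrency are illustrative names, not UEL APIs:

import asyncio

from upsonic.uel import ChatPromptTemplate, StrOutputParser
from upsonic.models import infer_model


async def process_batch_bounded(topics, max_concurrency=3):
    """Process a batch while keeping at most max_concurrency calls in flight."""
    chain = (
        ChatPromptTemplate.from_template("Explain {topic} in one sentence")
        | infer_model("openai/gpt-4o")
        | StrOutputParser()
    )
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(topic):
        # Blocks here whenever max_concurrency calls are already active.
        async with semaphore:
            return await chain.ainvoke({"topic": topic})

    results = await asyncio.gather(*(run_one(topic) for topic in topics))
    return [result or "" for result in results]


async def main():
    topics = ["Python", "JavaScript", "Rust", "Go", "TypeScript"]
    for topic, result in zip(topics, await process_batch_bounded(topics)):
        print(f"{topic}: {result}")


asyncio.run(main())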

