Overview
StateGraph provides advanced features for complex workflows:
- Send API - Dynamic parallel execution (orchestrator-worker pattern)
- Parallel Nodes - Automatic concurrent execution
- Task Decorators - Durable functions with retry and cache
Send API
The Send API enables dynamic parallelization where a node can spawn multiple worker instances that execute concurrently.
Basic Send Pattern
from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, Send
class State(TypedDict):
    """Shared graph state for the fan-out example."""

    # Input items the orchestrator prepares for fan-out.
    items: List[str]
    # Worker outputs; Annotated with operator.add so results written by
    # parallel workers are concatenated instead of overwritten.
    results: Annotated[List[str], operator.add]
def orchestrator(state: State) -> dict:
    """Expose the incoming item list so fan_out can dispatch workers."""
    pending = state["items"]
    return {"items": pending}
def fan_out(state: State) -> List[Send]:
    """Build one Send packet per item, each targeting the worker node."""
    packets: List[Send] = []
    for item in state["items"]:
        packets.append(Send("worker", {"item": item}))
    return packets
def worker(state: dict) -> dict:
    """Process one item and report it via the results reducer."""
    return {"results": [f"processed_{state['item']}"]}
# Build graph
builder = StateGraph(State)
builder.add_node("orchestrator", orchestrator)
builder.add_node("worker", worker)
builder.add_edge(START, "orchestrator")
# fan_out returns Send objects; ["worker"] declares the node they may target.
builder.add_conditional_edges("orchestrator", fan_out, ["worker"])
builder.add_edge("worker", END)
graph = builder.compile()
# Execute
result = graph.invoke({
    "items": ["a", "b", "c"],
    "results": []
})
print(result["results"])  # ['processed_a', 'processed_b', 'processed_c']
Workers execute in parallel and their results are automatically merged using reducers.
Map-Reduce Pattern
class MapReduceState(TypedDict):
    """State for the map-reduce example."""

    # Input numbers to square.
    data: List[int]
    # Per-mapper outputs; operator.add concatenates results from parallel mappers.
    mapped: Annotated[List[int], operator.add]
    # Final aggregate produced by reduce_phase.
    reduced: int
def map_phase(state: MapReduceState) -> List[Send]:
    """Map step: dispatch one mapper instance per input value."""
    dispatches: List[Send] = []
    for val in state["data"]:
        dispatches.append(Send("mapper", {"value": val}))
    return dispatches
def mapper(state: dict) -> dict:
    """Map function: square one value and emit it for the reducer."""
    value = state["value"]
    return {"mapped": [value * value]}
def reduce_phase(state: MapReduceState) -> dict:
    """Reduce step: collapse every mapped value into a single sum."""
    return {"reduced": sum(state["mapped"])}
builder = StateGraph(MapReduceState)
# Pass-through entry node that seeds the data list.
builder.add_node("start", lambda s: {"data": s["data"]})
builder.add_node("mapper", mapper)
builder.add_node("reduce", reduce_phase)
builder.add_edge(START, "start")
# map_phase emits one Send per element; ["mapper"] is the allowed target.
builder.add_conditional_edges("start", map_phase, ["mapper"])
# Every mapper instance feeds into the single reduce node.
builder.add_edge("mapper", "reduce")
builder.add_edge("reduce", END)
graph = builder.compile()
result = graph.invoke({
    "data": [1, 2, 3, 4, 5],
    "mapped": [],
    "reduced": 0
})
print(f"Sum of squares: {result['reduced']}")  # 1+4+9+16+25 = 55
Parallel Node Execution
StateGraph automatically executes nodes in parallel when they have no dependencies:
from typing import Annotated, List
import operator
class ParallelState(TypedDict):
    """State shared by the two concurrent processing branches."""

    # Raw input consumed by both branches.
    input: str
    # Branch A output; operator.add merges concurrent writes.
    results_a: Annotated[List[str], operator.add]
    # Branch B output; operator.add merges concurrent writes.
    results_b: Annotated[List[str], operator.add]
    # Combined string assembled by the merge node.
    final: str
def setup(state: ParallelState) -> dict:
    """Seed the shared input consumed by both parallel branches."""
    text = state["input"]
    return {"input": text}
def process_a(state: ParallelState) -> dict:
    """Branch A: tag the input and record it under results_a."""
    return {"results_a": [f"A processed: {state['input']}"]}
def process_b(state: ParallelState) -> dict:
    """Branch B: tag the input and record it under results_b."""
    return {"results_b": [f"B processed: {state['input']}"]}
def merge(state: ParallelState) -> dict:
    """Join both branches' outputs into one comma-separated string."""
    pieces = [*state['results_a'], *state['results_b']]
    return {"final": ", ".join(pieces)}
builder = StateGraph(ParallelState)
builder.add_node("setup", setup)
builder.add_node("process_a", process_a)
builder.add_node("process_b", process_b)
builder.add_node("merge", merge)
builder.add_edge(START, "setup")
# process_a and process_b share a parent and have no edge between them,
# so the runtime executes them concurrently.
builder.add_edge("setup", "process_a")
builder.add_edge("setup", "process_b")
# merge has edges from both branches, so it waits for both to finish.
builder.add_edge("process_a", "merge")
builder.add_edge("process_b", "merge")
builder.add_edge("merge", END)
graph = builder.compile()
result = graph.invoke({
    "input": "test data",
    "results_a": [],
    "results_b": [],
    "final": ""
})
print(result["final"])
Automatic Parallelization: When multiple nodes have the same parent and don't depend on each other, they execute concurrently.
Task Decorator
The @task decorator creates durable functions with built-in retry and caching:
Basic Task
from upsonic.graphv2 import task
@task
def expensive_computation(x: int) -> int:
    """Double *x* after a short simulated delay."""
    import time
    time.sleep(0.1)  # Simulate work
    doubled = 2 * x
    return doubled
# Use in a node
def my_node(state: MyState) -> dict:
    """Invoke the task from inside a node and block for its value."""
    computed = expensive_computation(state["value"]).result()
    return {"output": computed}
Task with Retry and Cache
from upsonic.graphv2 import task, RetryPolicy, CachePolicy, InMemoryCache
@task(
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0),
    cache_policy=CachePolicy(ttl=300)
)
def robust_expensive_call(param: str) -> str:
    """Simulated API call: retries on failure, caches on success."""
    if param != "fail":
        return f"Result for {param}"
    # Simulated API failure path triggers the retry policy.
    raise ConnectionError("Simulated failure")
# Provide cache to graph
# NOTE(review): presumably InMemoryCache is where CachePolicy-decorated task
# results are stored between runs (TTL set per-task) — confirm against the
# library docs.
cache = InMemoryCache()
graph = builder.compile(cache=cache)
Next Steps