Skip to main content

Overview

StateGraph provides advanced features for complex workflows:
  • πŸ”€ Send API - Dynamic parallel execution (orchestrator-worker pattern)
  • ⚑ Parallel Nodes - Automatic concurrent execution
  • 🎯 Task Decorators - Durable functions with retry and cache

Send API

The Send API enables dynamic parallelization where a node can spawn multiple worker instances that execute concurrently.

Basic Send Pattern

from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, Send

class State(TypedDict):
    items: List[str]
    results: Annotated[List[str], operator.add]

def orchestrator(state: State) -> dict:
    """Prepare items for processing."""
    return {"items": state["items"]}

def fan_out(state: State) -> List[Send]:
    """Create a worker for each item."""
    return [
        Send("worker", {"item": item})
        for item in state["items"]
    ]

def worker(state: dict) -> dict:
    """Process a single item."""
    item = state["item"]
    result = f"processed_{item}"
    return {"results": [result]}

# Build graph
builder = StateGraph(State)
builder.add_node("orchestrator", orchestrator)
builder.add_node("worker", worker)

builder.add_edge(START, "orchestrator")
builder.add_conditional_edges("orchestrator", fan_out, ["worker"])
builder.add_edge("worker", END)

graph = builder.compile()

# Execute
result = graph.invoke({
    "items": ["a", "b", "c"],
    "results": []
})

print(result["results"])  # ['processed_a', 'processed_b', 'processed_c']
Workers execute in parallel and their results are automatically merged using reducers.

Map-Reduce Pattern

class MapReduceState(TypedDict):
    data: List[int]
    mapped: Annotated[List[int], operator.add]
    reduced: int

def map_phase(state: MapReduceState) -> List[Send]:
    """Map: send each item to a worker."""
    return [
        Send("mapper", {"value": val})
        for val in state["data"]
    ]

def mapper(state: dict) -> dict:
    """Map function: square the value."""
    squared = state["value"] ** 2
    return {"mapped": [squared]}

def reduce_phase(state: MapReduceState) -> dict:
    """Reduce: sum all mapped values."""
    total = sum(state["mapped"])
    return {"reduced": total}

builder = StateGraph(MapReduceState)
builder.add_node("start", lambda s: {"data": s["data"]})
builder.add_node("mapper", mapper)
builder.add_node("reduce", reduce_phase)

builder.add_edge(START, "start")
builder.add_conditional_edges("start", map_phase, ["mapper"])
builder.add_edge("mapper", "reduce")
builder.add_edge("reduce", END)

graph = builder.compile()

result = graph.invoke({
    "data": [1, 2, 3, 4, 5],
    "mapped": [],
    "reduced": 0
})

print(f"Sum of squares: {result['reduced']}")  # 1+4+9+16+25 = 55

Parallel Node Execution

StateGraph automatically executes nodes in parallel when they have no dependencies:
from typing import Annotated, List
import operator

class ParallelState(TypedDict):
    input: str
    results_a: Annotated[List[str], operator.add]
    results_b: Annotated[List[str], operator.add]
    final: str

def setup(state: ParallelState) -> dict:
    return {"input": state["input"]}

def process_a(state: ParallelState) -> dict:
    result = f"A processed: {state['input']}"
    return {"results_a": [result]}

def process_b(state: ParallelState) -> dict:
    result = f"B processed: {state['input']}"
    return {"results_b": [result]}

def merge(state: ParallelState) -> dict:
    combined = state['results_a'] + state['results_b']
    return {"final": ", ".join(combined)}

builder = StateGraph(ParallelState)
builder.add_node("setup", setup)
builder.add_node("process_a", process_a)
builder.add_node("process_b", process_b)
builder.add_node("merge", merge)

builder.add_edge(START, "setup")
builder.add_edge("setup", "process_a")
builder.add_edge("setup", "process_b")
builder.add_edge("process_a", "merge")
builder.add_edge("process_b", "merge")
builder.add_edge("merge", END)

graph = builder.compile()

result = graph.invoke({
    "input": "test data",
    "results_a": [],
    "results_b": [],
    "final": ""
})

print(result["final"])
Automatic Parallelization: When multiple nodes have the same parent and don’t depend on each other, they execute concurrently.

Task Decorator

The @task decorator creates durable functions with built-in retry and caching:

Basic Task

from upsonic.graphv2 import task

@task
def expensive_computation(x: int) -> int:
    """A function decorated as a task."""
    import time
    time.sleep(0.1)  # Simulate work
    return x * 2

# Use in a node
def my_node(state: MyState) -> dict:
    result = expensive_computation(state["value"]).result()
    return {"output": result}

Task with Retry and Cache

from upsonic.graphv2 import task, RetryPolicy, CachePolicy, InMemoryCache

@task(
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0),
    cache_policy=CachePolicy(ttl=300)
)
def robust_expensive_call(param: str) -> str:
    """Retries on failure, caches on success."""
    # Simulate API call
    if param == "fail":
        raise ConnectionError("Simulated failure")
    return f"Result for {param}"

# Provide cache to graph
cache = InMemoryCache()
graph = builder.compile(cache=cache)

Next Steps