States

States are the data that flows through your graph. They are defined as TypedDict subclasses for type safety and clarity.

Basic State Definition

from typing_extensions import TypedDict

class MyState(TypedDict):
    input: str
    output: str
    count: int

State Reducers

Reducers control how state updates are merged. By default, new values replace old ones, but you can specify custom merge behavior:
from typing import Annotated, List
import operator

class RichState(TypedDict):
    # Reducer: operator.add - new items append to list
    messages: Annotated[List[str], operator.add]
    
    # Reducer: sum - numeric values add together
    total_cost: Annotated[float, lambda a, b: a + b]
    
    # Reducer: max - keep the highest value
    max_score: Annotated[int, max]
    
    # No reducer - new value replaces old (default)
    current_step: str
Common Reducer Patterns:

Reducer                    Behavior             Use Case
operator.add               Append/concatenate   Lists, strings
lambda a, b: a + b         Sum values           Counters, totals
max                        Keep maximum         Scores, priorities
min                        Keep minimum         Costs, distances
lambda a, b: {**a, **b}    Merge dicts          Nested objects
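The merge rule behind these patterns can be pictured in plain Python. This is an illustrative sketch of how a reducer combines the existing value with a node's update, not the library's internal code:

```python
import operator

# Sketch of the merge rule: apply the reducer if one is declared,
# otherwise let the new value replace the old one.
def apply_reducer(reducer, current, update):
    return reducer(current, update) if reducer else update

messages = apply_reducer(operator.add, ["Hello"], ["How are you?"])  # append
total_cost = apply_reducer(lambda a, b: a + b, 1.5, 0.5)             # sum
max_score = apply_reducer(max, 7, 9)                                 # keep max
current_step = apply_reducer(None, "old", "new")                     # replace
```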

Example: Message History

from typing import Annotated, List
import operator

class ChatState(TypedDict):
    messages: Annotated[List[str], operator.add]
    turn_count: Annotated[int, lambda a, b: a + b]
    topic: str

# Node updates
def node1(state: ChatState) -> dict:
    return {
        "messages": ["Hello"],  # Appends to list
        "turn_count": 1,        # Adds to counter
        "topic": "greeting"     # Replaces old value
    }

def node2(state: ChatState) -> dict:
    return {
        "messages": ["How are you?"],  # Appends
        "turn_count": 1                # Adds 1
    }

# After both nodes:
# messages = ["Hello", "How are you?"]
# turn_count = 2
# topic = "greeting"

Input and Output Schemas

Control what goes in and out of your graph:
from upsonic.graphv2 import StateGraph

class InputState(TypedDict):
    question: str

class OutputState(TypedDict):
    answer: str

class InternalState(InputState, OutputState):
    # Internal fields not exposed
    processing_steps: int
    intermediate_data: dict

# Only accept 'question' as input, only return 'answer'
builder = StateGraph(
    InternalState,
    input_schema=InputState,
    output_schema=OutputState
)
Schema Validation: Input schema validates required fields at runtime. Output schema filters the final state before returning.
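The output-filtering step can be pictured in plain Python. This is a sketch of the idea, not the library's code: the output schema's declared fields decide which keys of the final state survive.

```python
from typing import TypedDict

class OutputState(TypedDict):
    answer: str

# Example final internal state after the graph runs:
final_state = {"answer": "42", "processing_steps": 3, "intermediate_data": {}}

# Keep only the keys declared in the output schema:
filtered = {k: v for k, v in final_state.items()
            if k in OutputState.__annotations__}
```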

Nodes

Nodes are functions that process state. They receive the current state and return updates.

Basic Node

def my_node(state: MyState) -> dict:
    """Process state and return updates."""
    result = do_something(state["input"])
    return {
        "output": result,
        "count": state["count"] + 1
    }

Node Signatures

Nodes can have different signatures:
# 1. State only
def node(state: MyState) -> dict:
    return {"output": "done"}

# 2. State + config (access runtime context)
def node(state: MyState, config: dict) -> dict:
    context = config.get("context", {})
    model_name = context.get("model", "default")
    return {"output": f"Using {model_name}"}

# 3. Returning Command for explicit routing
from upsonic.graphv2 import Command

def node(state: MyState) -> Command:
    if state["count"] > 10:
        return Command(
            update={"status": "complete"},
            goto=END
        )
    return Command(
        update={"count": state["count"] + 1},
        goto="process"
    )

Node Return Types

Nodes can return:
  1. Dictionary - State updates (merged with reducers)
  2. Command - State updates + routing
  3. Send - Dynamic parallel invocation
  4. List[Send] - Multiple parallel invocations
from typing import List
from upsonic.graphv2 import Command, Send

# Return dict
def simple_node(state) -> dict:
    return {"field": "value"}

# Return Command
def routing_node(state) -> Command:
    return Command(update={"field": "value"}, goto="next_node")

# Return Send
def orchestrator(state) -> List[Send]:
    return [
        Send("worker", {"item": item})
        for item in state["items"]
    ]

Adding Nodes

builder = StateGraph(MyState)

# Basic node
builder.add_node("process", process_node)

# Node with retry policy
from upsonic.graphv2 import RetryPolicy

builder.add_node(
    "api_call",
    call_api,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0)
)

# Node with cache policy
from upsonic.graphv2 import CachePolicy

builder.add_node(
    "expensive",
    expensive_computation,
    cache_policy=CachePolicy(ttl=300)  # Cache for 5 minutes
)
Node names must be unique and can’t be START or END (these are reserved).
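The `CachePolicy(ttl=300)` above implies time-based caching. A hypothetical sketch of TTL caching in plain Python, not the library's implementation:

```python
import time

# Hypothetical TTL cache: reuse a stored result while the entry is
# younger than ttl seconds, otherwise recompute and store it again.
_cache = {}

def cached(key, fn, ttl):
    now = time.monotonic()
    if key in _cache and now - _cache[key][1] < ttl:
        return _cache[key][0]
    value = fn()
    _cache[key] = (value, now)
    return value

calls = []
def expensive():
    calls.append(1)
    return "result"

cached("expensive", expensive, ttl=300)
cached("expensive", expensive, ttl=300)  # served from cache; fn not re-run
```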

Edges

Edges define the flow of execution between nodes.

Simple Edges

Direct connections from one node to another:
builder.add_edge(START, "first_node")
builder.add_edge("first_node", "second_node")
builder.add_edge("second_node", END)

Conditional Edges

Branch based on state:
def route_by_intent(state: MyState) -> str:
    """Return the name of the next node."""
    if state["intent"] == "question":
        return "answer"
    elif state["intent"] == "command":
        return "execute"
    else:
        return "fallback"

builder.add_conditional_edges(
    "classify",           # From node
    route_by_intent,      # Routing function
    ["answer", "execute", "fallback"]  # Possible targets
)

# Still need to connect the target nodes to something
builder.add_edge("answer", END)
builder.add_edge("execute", END)
builder.add_edge("fallback", END)
All nodes mentioned in targets must be connected to other nodes or END. The graph validator will catch missing connections.

Dynamic Edges with Send

For orchestrator-worker patterns:
from upsonic.graphv2 import Send

def fan_out(state: MyState) -> List[Send]:
    """Create parallel workers."""
    return [
        Send("worker", {"item": item, "index": i})
        for i, item in enumerate(state["items"])
    ]

builder.add_conditional_edges(
    "orchestrator",
    fan_out,
    ["worker"]  # Worker node receives Send objects
)

Conditional Routing

Conditional routing lets you make dynamic decisions about execution flow.

Method 1: Conditional Edges

Use add_conditional_edges with a routing function:
class State(TypedDict):
    score: int
    result: str

def route_by_score(state: State) -> str:
    if state["score"] >= 90:
        return "excellent"
    elif state["score"] >= 70:
        return "good"
    else:
        return "needs_improvement"

builder = StateGraph(State)
builder.add_node("evaluate", evaluate_node)
builder.add_node("excellent", excellent_handler)
builder.add_node("good", good_handler)
builder.add_node("needs_improvement", needs_improvement_handler)

builder.add_edge(START, "evaluate")
builder.add_conditional_edges(
    "evaluate",
    route_by_score,
    ["excellent", "good", "needs_improvement"]
)
builder.add_edge("excellent", END)
builder.add_edge("good", END)
builder.add_edge("needs_improvement", END)

graph = builder.compile()

Method 2: Command Objects

Nodes return Command to control routing:
from upsonic.graphv2 import Command
from typing import Literal

def evaluate_node(state: State) -> Command[Literal["excellent", "good", "needs_improvement"]]:
    score = calculate_score(state)
    
    if score >= 90:
        return Command(
            update={"score": score, "result": "excellent"},
            goto="excellent"
        )
    elif score >= 70:
        return Command(
            update={"score": score, "result": "good"},
            goto="good"
        )
    else:
        return Command(
            update={"score": score, "result": "needs_improvement"},
            goto="needs_improvement"
        )

# With Command, you don't need add_conditional_edges
# Just add regular edges from the target nodes
builder.add_edge(START, "evaluate")
builder.add_edge("excellent", END)
builder.add_edge("good", END)
builder.add_edge("needs_improvement", END)
Command vs Conditional Edges:
  • Use Command when the node itself decides routing (more explicit)
  • Use conditional_edges when routing logic should be separate from node logic

Loops and Cycles

Create loops by routing back to earlier nodes:
from upsonic.graphv2 import StateGraph, Command, START, END

class LoopState(TypedDict):
    counter: int
    max_iterations: int
    result: str

def loop_node(state: LoopState) -> Command:
    new_counter = state["counter"] + 1
    
    if new_counter >= state["max_iterations"]:
        return Command(
            update={"counter": new_counter, "result": "done"},
            goto=END
        )
    else:
        return Command(
            update={"counter": new_counter},
            goto="loop_node"  # Loop back to self
        )

builder = StateGraph(LoopState)
builder.add_node("loop_node", loop_node)
builder.add_edge(START, "loop_node")

graph = builder.compile()

result = graph.invoke(
    {"counter": 0, "max_iterations": 5, "result": ""},
    config={"recursion_limit": 10}  # Prevent infinite loops
)
Recursion Limits: Always set a recursion_limit when using loops to prevent infinite execution. Default is 100 steps.
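What the recursion limit guards against can be shown with a plain-Python analogue of the loop above, stopping when the step budget is exhausted (a sketch, not the library's code):

```python
# Plain-Python analogue of loop_node with a step budget: increment the
# counter each step and stop once max_iterations is reached, but never
# take more than recursion_limit steps.
def run_loop(counter, max_iterations, recursion_limit=10):
    for _ in range(recursion_limit):
        counter += 1
        if counter >= max_iterations:
            return counter, "done"
    raise RuntimeError("recursion limit exceeded")
```

With `max_iterations=5` the loop finishes within the 10-step budget; with `max_iterations=50` the budget runs out first and the error is raised.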

Parallel Execution

Execute multiple nodes in parallel when they have no dependencies:
class State(TypedDict):
    results_a: List[str]
    results_b: List[str]

builder = StateGraph(State)
builder.add_node("setup", setup_node)
builder.add_node("parallel_a", process_a)
builder.add_node("parallel_b", process_b)
builder.add_node("merge", merge_results)

# Both parallel_a and parallel_b will execute concurrently
builder.add_edge(START, "setup")
builder.add_edge("setup", "parallel_a")
builder.add_edge("setup", "parallel_b")
builder.add_edge("parallel_a", "merge")
builder.add_edge("parallel_b", "merge")
builder.add_edge("merge", END)

graph = builder.compile()
Parallel execution happens automatically when multiple nodes share the same parent and have no dependencies on each other.
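The fan-out/fan-in above can be sketched in plain Python: both branches run concurrently, each returns updates for its own key, and the updates are merged afterwards (illustrative only, not the library's scheduler):

```python
from concurrent.futures import ThreadPoolExecutor

# Each branch writes only to its own key, so the merge is conflict-free.
def process_a(state):
    return {"results_a": ["a1", "a2"]}

def process_b(state):
    return {"results_b": ["b1"]}

state = {"results_a": [], "results_b": []}
with ThreadPoolExecutor() as pool:
    updates = list(pool.map(lambda fn: fn(state), [process_a, process_b]))
for update in updates:
    state.update(update)
```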

Runtime Configuration

Pass runtime context to nodes via the context parameter:
def node_with_context(state: MyState, config: dict) -> dict:
    context = config.get("context", {})
    model_name = context.get("model", "default")
    temperature = context.get("temperature", 0.7)
    
    # Use runtime config
    model = infer_model(model_name)
    return {"output": model.invoke(state["input"])}

# Pass context at runtime
result = graph.invoke(
    initial_state,
    context={
        "model": "openai/gpt-4o",
        "temperature": 0.9
    }
)

Best Practices

1. Keep Nodes Focused

Each node should have a single responsibility:
# ❌ Bad - doing too much
def big_node(state):
    # Fetch data
    # Process data
    # Validate
    # Store results
    # Send notifications
    pass

# ✅ Good - focused nodes
def fetch_data(state): ...
def process_data(state): ...
def validate_results(state): ...
def store_results(state): ...
def send_notifications(state): ...

2. Use Type Hints

Always type your state and return values:
# ✅ Good
def process(state: MyState) -> dict:
    return {"output": "result"}

# ❌ Bad
def process(state):
    return {"output": "result"}

3. Design State Carefully

Think about what needs to be in state vs what can be computed:
# ❌ Bad - storing derived data
class State(TypedDict):
    items: List[str]
    item_count: int  # Can be computed from items
    
# ✅ Good - only essential data
class State(TypedDict):
    items: List[str]
    # Compute count when needed: len(state["items"])

4. Use Command for Complex Routing

When routing logic is complex, use Command to make it explicit:
def smart_router(state: MyState) -> Command:
    # Complex decision logic
    if complex_condition(state):
        return Command(
            update={"status": "path_a"},
            goto="path_a"
        )
    return Command(
        update={"status": "path_b"},
        goto="path_b"
    )

Next Steps