States
States are the data that flows through your graph. They’re defined as TypedDict for type safety and clarity.
Basic State Definition
from typing_extensions import TypedDict
class MyState(TypedDict):
    """State schema used by the basic examples on this page."""

    # Value the example nodes read via state["input"].
    input: str
    # Result written back by a node.
    output: str
    # Counter the example nodes increment.
    count: int
State Reducers
Reducers control how state updates are merged. By default, new values replace old ones, but you can specify custom merge behavior:
from typing import Annotated, List
import operator
class RichState(TypedDict):
    """State schema showing one reducer per field via ``Annotated``."""

    # Reducer: operator.add - new items append to list
    messages: Annotated[List[str], operator.add]
    # Reducer: sum - numeric values add together
    total_cost: Annotated[float, lambda a, b: a + b]
    # Reducer: max - keep the highest value
    max_score: Annotated[int, max]
    # No reducer - new value replaces old (default)
    current_step: str
Common Reducer Patterns:
| Reducer | Behavior | Use Case |
|---|---|---|
| `operator.add` | Append/concatenate | Lists, strings |
| `lambda a, b: a + b` | Sum values | Counters, totals |
| `max` | Keep maximum | Scores, priorities |
| `min` | Keep minimum | Costs, distances |
| `lambda a, b: {**a, **b}` | Merge dicts | Nested objects |
Example: Message History
from typing import Annotated, List
import operator
class ChatState(TypedDict):
    """Chat state: messages and turn_count accumulate, topic is replaced."""

    messages: Annotated[List[str], operator.add]
    turn_count: Annotated[int, lambda a, b: a + b]
    topic: str


# Node updates
def node1(state: ChatState) -> dict:
    """Return the first update: a greeting, one turn, and the topic."""
    return {
        "messages": ["Hello"],  # Appends to list
        "turn_count": 1,  # Adds to counter
        "topic": "greeting",  # Replaces old value
    }


def node2(state: ChatState) -> dict:
    """Return the follow-up update; topic is omitted from this update."""
    return {
        "messages": ["How are you?"],  # Appends
        "turn_count": 1,  # Adds 1
    }
# After both nodes:
# messages = ["Hello", "How are you?"]
# turn_count = 2
# topic = "greeting"
Use separate input and output schemas to control what goes into and comes out of your graph:
class InputState(TypedDict):
    """Fields accepted as graph input."""

    question: str


class OutputState(TypedDict):
    """Fields returned as graph output."""

    answer: str


class InternalState(InputState, OutputState):
    """Full working state: the input and output fields plus internal ones."""

    # Internal fields not exposed
    processing_steps: int
    intermediate_data: dict
# InternalState is the working schema; only 'question' is accepted as
# input and only 'answer' is returned.
builder = StateGraph(
    InternalState,
    input_schema=InputState,
    output_schema=OutputState,
)
Schema Validation: The input schema validates required fields at runtime; the output schema filters the final state before it is returned.
Nodes
Nodes are functions that process state. They receive the current state and return updates.
Basic Node
def my_node(state: MyState) -> dict:
    """Process state and return updates.

    Passes state["input"] to do_something and returns the result as
    "output", together with the count incremented by one.
    """
    result = do_something(state["input"])
    return {
        "output": result,
        "count": state["count"] + 1,
    }
Node Signatures
Nodes can have different signatures:
# 1. State only
def node(state: MyState) -> dict:
    """Return a fixed update; the incoming state is not inspected."""
    return {"output": "done"}
# 2. State + config (access runtime context)
def node(state: MyState, config: dict) -> dict:
    """Report which model the runtime context selects.

    Falls back to "default" when config has no "context" entry or the
    context has no "model" key.
    """
    context = config.get("context", {})
    model_name = context.get("model", "default")
    return {"output": f"Using {model_name}"}
# 3. Returning Command for explicit routing
from upsonic.graphv2 import Command, END


def node(state: MyState) -> Command:
    """Finish once count exceeds 10; otherwise increment it and continue.

    Returns a Command that either routes to END with a completion status
    or routes to the "process" node with the incremented count.
    """
    if state["count"] > 10:
        return Command(
            update={"status": "complete"},
            goto=END,
        )
    return Command(
        update={"count": state["count"] + 1},
        goto="process",
    )
Node Return Types
Nodes can return:
- Dictionary - State updates (merged with reducers)
- Command - State updates + routing
- Send - Dynamic parallel invocation
- List[Send] - Multiple parallel invocations
# Return dict
def simple_node(state) -> dict:
    """Return a plain dict of state updates."""
    return {"field": "value"}
# Return Command
def routing_node(state) -> Command:
    """Return a state update together with the next node to run."""
    return Command(update={"field": "value"}, goto="next_node")
# Return Send
def orchestrator(state) -> List[Send]:
    """Return one Send to the "worker" node per entry in state["items"]."""
    return [
        Send("worker", {"item": item})
        for item in state["items"]
    ]
Adding Nodes
builder = StateGraph(MyState)

# Basic node
builder.add_node("process", process_node)

# Node with retry policy
from upsonic.graphv2 import RetryPolicy

builder.add_node(
    "api_call",
    call_api,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0),
)

# Node with cache policy
from upsonic.graphv2 import CachePolicy

builder.add_node(
    "expensive",
    expensive_computation,
    cache_policy=CachePolicy(ttl=300),  # Cache for 5 minutes
)
Node names must be unique and can’t be START or END (these are reserved).
Edges
Edges define the flow of execution between nodes.
Simple Edges
Direct connections from one node to another:
# Entry edge: execution starts at "first_node".
builder.add_edge(START, "first_node")
# Direct connection from one node to the next.
builder.add_edge("first_node", "second_node")
# Final edge: "second_node" leads to END.
builder.add_edge("second_node", END)
Conditional Edges
Branch based on state:
def route_by_intent(state: MyState) -> str:
    """Return the name of the next node.

    "question" routes to "answer", "command" routes to "execute", and any
    other intent routes to "fallback".
    """
    if state["intent"] == "question":
        return "answer"
    elif state["intent"] == "command":
        return "execute"
    else:
        return "fallback"
builder.add_conditional_edges(
    "classify",  # From node
    route_by_intent,  # Routing function
    ["answer", "execute", "fallback"],  # Possible targets
)

# Each target node still needs its own outgoing edge
builder.add_edge("answer", END)
builder.add_edge("execute", END)
builder.add_edge("fallback", END)
Every node listed in the targets must itself be connected to another node or to END. The graph validator will catch missing connections.
Dynamic Edges with Send
For orchestrator-worker patterns:
from upsonic.graphv2 import Send
def fan_out(state: MyState) -> List[Send]:
    """Create parallel workers.

    Returns one Send to the "worker" node per entry in state["items"],
    each carrying the item and its index.
    """
    return [
        Send("worker", {"item": item, "index": i})
        for i, item in enumerate(state["items"])
    ]
builder.add_conditional_edges(
    "orchestrator",
    fan_out,
    ["worker"],  # Target node named in the Send objects
)
Conditional Routing
Conditional routing lets you make dynamic decisions about execution flow.
Method 1: Conditional Edges
Use add_conditional_edges with a routing function:
class State(TypedDict):
    """State for the score-routing example."""

    # Numeric score the routing function compares against 90 and 70.
    score: int
    # Result label for the evaluation.
    result: str
def route_by_score(state: State) -> str:
    """Return the handler node name for the current score.

    Scores of 90 and above route to "excellent", scores from 70 to 89
    route to "good", and anything lower routes to "needs_improvement".
    """
    if state["score"] >= 90:
        return "excellent"
    elif state["score"] >= 70:
        return "good"
    else:
        return "needs_improvement"
builder = StateGraph(State)

# Register the evaluator and one handler per routing outcome.
builder.add_node("evaluate", evaluate_node)
builder.add_node("excellent", excellent_handler)
builder.add_node("good", good_handler)
builder.add_node("needs_improvement", needs_improvement_handler)

# After "evaluate", route_by_score picks one of the listed targets.
builder.add_edge(START, "evaluate")
builder.add_conditional_edges(
    "evaluate",
    route_by_score,
    ["excellent", "good", "needs_improvement"],
)

# Every handler leads to END.
builder.add_edge("excellent", END)
builder.add_edge("good", END)
builder.add_edge("needs_improvement", END)

graph = builder.compile()
Method 2: Command Objects
Nodes return Command to control routing:
from upsonic.graphv2 import Command
from typing import Literal
def evaluate_node(state: State) -> Command[Literal["excellent", "good", "needs_improvement"]]:
    """Score the state and route to the matching handler node.

    The returned Command stores the score and a result label in state and
    names the next node: "excellent" for 90 and above, "good" for 70 and
    above, otherwise "needs_improvement".
    """
    score = calculate_score(state)
    if score >= 90:
        return Command(
            update={"score": score, "result": "excellent"},
            goto="excellent",
        )
    if score >= 70:
        return Command(
            update={"score": score, "result": "good"},
            goto="good",
        )
    return Command(
        update={"score": score, "result": "needs_improvement"},
        goto="needs_improvement",
    )
# With Command the routing decision is made inside the node, so
# add_conditional_edges is not needed here.
# Only the entry edge and the edges leaving the target nodes are declared.
builder.add_edge(START, "evaluate")
builder.add_edge("excellent", END)
builder.add_edge("good", END)
builder.add_edge("needs_improvement", END)
Command vs Conditional Edges:
- Use Command when the node itself decides routing (more explicit)
- Use add_conditional_edges when routing logic should be separate from node logic
Loops and Cycles
Create loops by routing back to earlier nodes:
from upsonic.graphv2 import Command, END
class LoopState(TypedDict):
    """State for the self-looping node example."""

    # Number of iterations completed so far.
    counter: int
    # The loop stops once counter reaches this value.
    max_iterations: int
    # Set to "done" when the loop finishes.
    result: str
def loop_node(state: LoopState) -> Command:
    """Increment the counter and loop until max_iterations is reached.

    Returns a Command that routes to END (marking the result "done") once
    the incremented counter reaches max_iterations, and otherwise routes
    back to this node.
    """
    new_counter = state["counter"] + 1
    if new_counter >= state["max_iterations"]:
        return Command(
            update={"counter": new_counter, "result": "done"},
            goto=END,
        )
    else:
        return Command(
            update={"counter": new_counter},
            goto="loop_node",  # Loop back to self
        )
builder = StateGraph(LoopState)
builder.add_node("loop_node", loop_node)
# Only the entry edge is declared; loop_node picks its next step via Command.
builder.add_edge(START, "loop_node")
graph = builder.compile()

result = graph.invoke(
    {"counter": 0, "max_iterations": 5, "result": ""},
    config={"recursion_limit": 10},  # Prevent infinite loops
)
Recursion Limits: Always set a recursion_limit when using loops to prevent infinite execution. The default is 100 steps.
Parallel Execution
Execute multiple nodes in parallel when they have no dependencies:
class State(TypedDict):
    """State for the parallel-execution example."""

    # NOTE(review): presumably written by the parallel_a branch - confirm.
    results_a: List[str]
    # NOTE(review): presumably written by the parallel_b branch - confirm.
    results_b: List[str]
builder = StateGraph(State)
builder.add_node("setup", setup_node)
builder.add_node("parallel_a", process_a)
builder.add_node("parallel_b", process_b)
builder.add_node("merge", merge_results)
# Entry edge into the shared parent node.
builder.add_edge(START, "setup")
# Both parallel_a and parallel_b hang off "setup", so they will execute concurrently.
builder.add_edge("setup", "parallel_a")
builder.add_edge("setup", "parallel_b")
# Both branches feed into "merge".
builder.add_edge("parallel_a", "merge")
builder.add_edge("parallel_b", "merge")
builder.add_edge("merge", END)
graph = builder.compile()
Parallel execution is automatic when multiple nodes have the same parent and no dependencies on each other.
Runtime Configuration
Pass runtime context with the context argument of graph.invoke; nodes read it from config["context"]:
def node_with_context(state: MyState, config: dict) -> dict:
    """Run the model named in the runtime context on state["input"].

    Reads "model" (default "default") and "temperature" (default 0.7)
    from config["context"], and returns the model's response as "output".
    """
    context = config.get("context", {})
    model_name = context.get("model", "default")
    # Shown as a second context value; this example does not forward it.
    temperature = context.get("temperature", 0.7)
    # Use runtime config
    model = infer_model(model_name)
    return {"output": model.invoke(state["input"])}
# Supply the context values when invoking the graph
result = graph.invoke(
    initial_state,
    context={
        "model": "openai/gpt-4o",
        "temperature": 0.9,
    },
)
Best Practices
1. Keep Nodes Focused
Each node should have a single responsibility:
# ❌ Bad - doing too much
def big_node(state):
    """Counter-example: a single node with five responsibilities."""
    # Fetch data
    # Process data
    # Validate
    # Store results
    # Send notifications
    pass
# ✅ Good - focused nodes
def fetch_data(state):
    """Placeholder node: fetching only."""


def process_data(state):
    """Placeholder node: processing only."""


def validate_results(state):
    """Placeholder node: validation only."""


def store_results(state):
    """Placeholder node: storage only."""


def send_notifications(state):
    """Placeholder node: notifications only."""
2. Use Type Hints
Always type your state and return values:
# ✅ Good
def process(state: MyState) -> dict:
    """Typed example: both the state and the return value are annotated."""
    return {"output": "result"}
# ❌ Bad
def process(state):
    # Deliberately left without type hints: this is the counter-example.
    return {"output": "result"}
3. Design State Carefully
Think about what needs to be in state vs what can be computed:
# ❌ Bad - storing derived data
class State(TypedDict):
    """Counter-example: stores a value that can be derived from another field."""

    items: List[str]
    item_count: int  # Can be computed from items
# ✅ Good - only essential data
class State(TypedDict):
    """Stores only the items; the count is derived on demand."""

    items: List[str]
    # Compute count when needed: len(state["items"])
4. Use Command for Complex Routing
When routing logic is complex, use Command to make it explicit:
def smart_router(state: MyState) -> Command:
    """Route to "path_a" or "path_b" and record the choice as "status".

    The decision is delegated to complex_condition(state); the returned
    Command carries both the status update and the next node.
    """
    # Complex decision logic
    if complex_condition(state):
        return Command(
            update={"status": "path_a"},
            goto="path_a",
        )
    return Command(
        update={"status": "path_b"},
        goto="path_b",
    )
Next Steps