Build complex, stateful AI workflows with graph-based orchestration.
The StateGraph class is a powerful graph-based workflow engine that enables you to build complex, stateful AI applications with explicit control flow. Instead of writing monolithic functions or brittle chains, you define workflows as graphs of nodes that can branch, loop, persist state, and recover from failures.
Overview
A StateGraph can be created with minimal configuration or customized extensively to suit your needs. The compiled graph provides a robust foundation for AI-powered applications, with built-in support for the features listed below.
Key Features
- Explicit Control Flow - Define workflows as visual graphs with nodes and edges that can branch and loop (see the branching sketch after this list)
- Persistent State - Automatic checkpointing and state management across executions
- Time Travel - Access any historical state and fork execution timelines
- Human-in-the-Loop - Pause execution for human review and approval
- Built-in Reliability - Automatic retries, caching, and durability modes
- Dynamic Parallelization - Send API for orchestrator-worker patterns
- Recovery - Resume from failures without starting over
- Multi-Model Support - Works with OpenAI, Anthropic, Google, and others!
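To illustrate the branching side of explicit control flow, here is a minimal routing sketch. It assumes the builder exposes an add_conditional_edges(source, router, mapping) method that dispatches on the router's return value; that method name and signature are an assumption in this sketch, not a documented call, so check the graph API reference before relying on it.

from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END

class RouteState(TypedDict):
    question: str
    answer: str

def classify(state: RouteState) -> dict:
    # This node only anchors the conditional edge; it makes no state changes of its own.
    return {"answer": ""}

def route(state: RouteState) -> str:
    # Return the name of the next node based on the current state.
    return "math" if any(c.isdigit() for c in state["question"]) else "chat"

def math_node(state: RouteState) -> dict:
    return {"answer": "math path"}

def chat_node(state: RouteState) -> dict:
    return {"answer": "chat path"}

builder = StateGraph(RouteState)
builder.add_node("classify", classify)
builder.add_node("math", math_node)
builder.add_node("chat", chat_node)
builder.add_edge(START, "classify")
# Assumed API: add_conditional_edges(source, router, mapping) -- verify against the graphv2 reference.
builder.add_conditional_edges("classify", route, {"math": "math", "chat": "chat"})
builder.add_edge("math", END)
builder.add_edge("chat", END)
graph = builder.compile()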
Examples
Basic Example
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END

class ConversationState(TypedDict):
    messages: list
    response: str

def process_node(state: ConversationState) -> dict:
    """Process the user's message."""
    user_message = state["messages"][-1]
    response = f"Echo: {user_message}"
    return {"response": response}

# Create and build the graph
builder = StateGraph(ConversationState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)
graph = builder.compile()

# Execute the graph
result = graph.invoke({
    "messages": ["Hello!"],
    "response": ""
})
print(result["response"])  # Output: Echo: Hello!
When creating a graph without specifying a checkpointer, state is not persisted between executions. Use MemorySaver or SqliteCheckpointer for persistence.
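As a minimal persistence sketch, the snippet below reuses the basic example above and compiles it with a MemorySaver. The import path, the checkpointer argument to compile(), and the thread_id-based config are assumptions here; confirm them against the checkpointing documentation.

from upsonic.graphv2 import StateGraph, START, END, MemorySaver  # import path assumed

# Reuses the ConversationState and process_node definitions from the basic example.
builder = StateGraph(ConversationState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)

# Assumed API: compile(checkpointer=...) enables persistence; verify the exact signature.
graph = builder.compile(checkpointer=MemorySaver())

# Assumed API: a thread_id in the config keys the saved state for this conversation.
config = {"configurable": {"thread_id": "conversation-1"}}
result = graph.invoke({"messages": ["Hello!"], "response": ""}, config)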
Agent Example with Tools

from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END
from upsonic.models import infer_model
from upsonic.messages import ModelRequest, UserPromptPart, SystemPromptPart
from upsonic.tools import tool

class AgentState(TypedDict):
    messages: Annotated[List, operator.add]
    result: str

@tool
def calculator(a: float, b: float, operation: str) -> float:
    """Perform basic math operations."""
    if operation == "add":
        return a + b
    elif operation == "multiply":
        return a * b
    return 0

def agent_node(state: AgentState) -> dict:
    """Agent node with tool access."""
    model = infer_model("openai/gpt-4o-mini")
    model_with_tools = model.bind_tools([calculator])
    user_message = state["messages"][-1] if state["messages"] else "Hello"
    request = ModelRequest(parts=[
        SystemPromptPart(content="You are a helpful assistant."),
        UserPromptPart(content=str(user_message))
    ])
    response = model_with_tools.invoke([request])
    return {"messages": [response], "result": str(response)}

# Build graph
builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.add_edge(START, "agent")
builder.add_edge("agent", END)
graph = builder.compile()

# Execute
result = graph.invoke({
    "messages": ["What is 23 multiplied by 17?"],
    "result": ""
})
print(result["result"])
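A note on the messages field: Annotated[List, operator.add] attaches a reducer to that key, so the list a node returns is merged into the existing list rather than replacing it (assuming graphv2 applies the annotated reducer when merging node output, as the pattern suggests). In plain Python the merge looks like this:

import operator

existing = ["What is 23 multiplied by 17?"]
update = ["<model response>"]

# operator.add on two lists concatenates them, so updates accumulate instead of overwriting.
merged = operator.add(existing, update)
print(merged)  # ['What is 23 multiplied by 17?', '<model response>']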