
Overview

StateGraph is a powerful graph-based workflow engine that enables you to build complex, stateful AI applications with explicit control flow. Instead of writing monolithic functions or brittle chains, you define workflows as graphs of nodes that can branch, loop, persist state, and recover from failures. Without StateGraph, building reliable AI workflows means dealing with:
  • Tangled control flow - Complex if/else chains that become unmaintainable
  • Lost state - No way to pause, resume, or recover from failures
  • No visibility - Black box execution with no insight into what’s happening
  • Brittle retry logic - Manual error handling scattered throughout your code
With StateGraph, you get:
  • Explicit Control Flow - Define workflows as graphs of nodes and edges
  • Persistent State - Automatic checkpointing and state management across executions
  • Time Travel - Access any historical state and fork execution timelines
  • Human-in-the-Loop - Pause execution for human review and approval
  • Built-in Reliability - Automatic retries, caching, and durability modes
  • Dynamic Parallelization - Send API for orchestrator-worker patterns
  • Recovery - Resume from failures without starting over

When to Use StateGraph

StateGraph is perfect for:
  • 🤖 AI Agents - Multi-step reasoning with tool calls and decision points
  • 📋 Approval Workflows - Content review, moderation, human-in-the-loop processes
  • 🔄 Stateful Applications - Chat systems, multi-turn conversations, games
  • 🌊 Data Pipelines - Complex ETL with branching logic and error recovery
  • 🧪 Research & Experimentation - Iterative processes with state inspection
  • 📊 Business Processes - Customer support routing, order processing, onboarding

Core Concepts

States

States are typed dictionaries that flow through your graph. Use TypedDict to define your state schema with full type safety:
from typing_extensions import TypedDict
from typing import Annotated, List
import operator

class ConversationState(TypedDict):
    # Reducer field - new messages append to the list
    messages: Annotated[List[str], operator.add]
    
    # Simple fields - new values replace old
    current_step: str
    user_intent: str
    confidence: float
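
Fields annotated with a reducer are merged into the existing value when a node returns an update, while plain fields are overwritten. Conceptually, the merge behaves like this plain-Python sketch (it illustrates the semantics only, not the engine's internals):
import operator

# Existing state and an update returned by a node
state = {"messages": ["Hello"], "current_step": "started"}
update = {"messages": ["Hi, how can I help?"], "current_step": "responded"}

merged = {
    # Reducer field: operator.add concatenates the lists
    "messages": operator.add(state["messages"], update["messages"]),
    # Plain field: the new value replaces the old one
    "current_step": update["current_step"],
}
# merged == {"messages": ["Hello", "Hi, how can I help?"], "current_step": "responded"}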

Nodes

Nodes are functions that process state and return updates. Each node is a unit of work in your workflow:
from upsonic.models import infer_model

model = infer_model("openai/gpt-4o-mini")

def analyze_intent(state: ConversationState) -> dict:
    """Classify user intent using an LLM."""
    # Classify the latest message; the returned keys are merged into the state.
    intent = model.invoke(state["messages"][-1])
    return {
        "user_intent": intent,
        "current_step": "analyzed"
    }
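
A node returns only the keys it wants to change: plain fields are replaced by the new values, while reducer fields such as messages are merged into the existing value rather than overwritten.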

Edges

Edges connect nodes and define the flow of execution. You can have:
  • Simple edges - Direct connections from one node to another
  • Conditional edges - Branch based on state
  • Dynamic edges - Send API for parallel worker patterns
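
In this guide, conditional routing is expressed from inside a node: the node returns a Command whose goto names the next node to run, as in the Quick Example below. A minimal sketch (route_by_confidence and ask_clarification are illustrative names, not part of the library):
from upsonic.graphv2 import Command

def route_by_confidence(state: ConversationState) -> Command:
    # Branch based on state: choose the next node and update state in one step
    if state["confidence"] < 0.5:
        return Command(update={"current_step": "clarifying"}, goto="ask_clarification")
    return Command(update={"current_step": "responding"}, goto="respond")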

The Graph

Combine everything into a compiled graph that orchestrates execution:
from upsonic.graphv2 import StateGraph, START, END

def generate_response(state: ConversationState) -> dict:
    # Placeholder response node for this example; in practice it would
    # call an LLM using the classified intent.
    return {
        "messages": [f"Handling intent: {state['user_intent']}"],
        "current_step": "responded"
    }

builder = StateGraph(ConversationState)
builder.add_node("analyze", analyze_intent)
builder.add_node("respond", generate_response)

builder.add_edge(START, "analyze")
builder.add_edge("analyze", "respond")
builder.add_edge("respond", END)

graph = builder.compile()

# Execute the graph
result = graph.invoke({
    "messages": ["Hello, I need help"],
    "current_step": "started"
})
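
invoke() runs the graph from START to END and returns the final state dictionary; the Quick Example below reads result["response"] from it in the same way.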

Key Features

Persistence & Time Travel

Every step is automatically checkpointed, enabling you to:
  • Resume execution after failures
  • Inspect state at any point in history
  • Fork execution to explore alternative paths

Human-in-the-Loop

Use interrupt() to pause execution and wait for human input:
from upsonic.graphv2 import interrupt

def review_content(state):
    approved = interrupt({
        "action": "review",
        "content": state["draft"]
    })
    return {"approved": approved}

Reliability Features

  • Retry policies - Automatic retry with exponential backoff
  • Cache policies - Avoid re-executing expensive operations
  • Durability modes - Control when state is persisted (sync/async/exit)
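
As a rough guide, sync persists each checkpoint before the next node runs, async writes checkpoints in the background, and exit persists state only when the run finishes.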

Dynamic Parallelization

Use the Send API for orchestrator-worker patterns:
from upsonic.graphv2 import Send

def fan_out(state):
    return [
        Send("worker", {"item": item})
        for item in state["items"]
    ]
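
Each Send dispatches its payload to the named node ("worker" here), so one worker run is spawned per item; the workers' state updates are then merged back using the reducers defined on the state schema.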

Architecture

StateGraph follows a simple but powerful architecture:
Input State → [Node 1] → [Node 2] → [Conditional] → [Node 3] → Output State
                ↓           ↓            ↓              ↓
           Checkpoint   Checkpoint   Checkpoint    Checkpoint
Each node execution:
  1. Receives the current state
  2. Performs its work (with optional retry/cache)
  3. Returns state updates
  4. Updates are merged using reducers
  5. State is checkpointed (based on durability mode)
  6. Next node(s) are determined and executed

Quick Example

Here’s a complete example of a customer support routing system:
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, Command
from upsonic.models import infer_model

class SupportState(TypedDict):
    customer_message: str
    classification: str
    urgency: str
    response: str

def classify(state: SupportState) -> Command:
    """Classify the support request."""
    model = infer_model("openai/gpt-4o-mini")
    classification = model.invoke(
        f"Classify this support message: {state['customer_message']}"
    )
    
    # Route based on classification
    if "urgent" in classification.lower():
        return Command(
            update={"classification": classification, "urgency": "high"},
            goto="handle_urgent"
        )
    else:
        return Command(
            update={"classification": classification, "urgency": "normal"},
            goto="handle_normal"
        )

def handle_urgent(state: SupportState) -> dict:
    return {"response": "⚠️ Escalated to priority support team"}

def handle_normal(state: SupportState) -> dict:
    return {"response": "📧 Assigned to support queue"}

# Build the graph
builder = StateGraph(SupportState)
builder.add_node("classify", classify)
builder.add_node("handle_urgent", handle_urgent)
builder.add_node("handle_normal", handle_normal)

# classify routes itself to a handler via Command(goto=...),
# so only the remaining static edges need to be declared.
builder.add_edge(START, "classify")
builder.add_edge("handle_urgent", END)
builder.add_edge("handle_normal", END)

graph = builder.compile()

# Execute
result = graph.invoke({
    "customer_message": "URGENT: System is down!",
    "classification": "",
    "urgency": "",
    "response": ""
})

print(result["response"])  # ⚠️ Escalated to priority support team

Next Steps