Skip to main content

Overview

StateGraph automatically saves execution state at every step, enabling powerful features:
  • 🔄 Resume from failures - Restart where you left off
  • 🕐 Time travel - Access any historical state
  • 🌳 Fork execution - Create alternative timelines
  • 💾 Multi-session - Continue conversations across sessions

Checkpointers

Checkpointers store graph state. Choose based on your needs:
| Checkpointer | Storage | Use Case | Persistence |
|---|---|---|---|
| MemorySaver | In-memory | Development, testing | Lost on restart |
| SqliteCheckpointer | SQLite file | Production, local apps | Survives restarts |

Using MemorySaver

Best for development and testing:
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class SimpleState(TypedDict):
    """State schema for the MemorySaver example graph."""
    # Counter value; increment_node returns it increased by one.
    count: int
    # Status text; message_node fills it in from the current count.
    message: str

def increment_node(state: SimpleState) -> dict:
    """Return a state update that advances ``count`` by one."""
    next_count = state["count"] + 1
    return {"count": next_count}

def message_node(state: SimpleState) -> dict:
    """Return a state update whose ``message`` reports the current count."""
    text = f"Count is now {state['count']}"
    return {"message": text}

# Build the graph: START -> increment -> message -> END
builder = StateGraph(SimpleState)
builder.add_node("increment", increment_node)
builder.add_node("message", message_node)
builder.add_edge(START, "increment")
builder.add_edge("increment", "message")
builder.add_edge("message", END)

# Compile with MemorySaver (in-memory storage; checkpoints are lost on restart)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Execute with thread_id for persistence; the thread_id selects the thread
# whose checkpoint history this run is recorded under
config = {"configurable": {"thread_id": "my-thread-1"}}
result = graph.invoke({"count": 0, "message": ""}, config=config)
print(result)  # {'count': 1, 'message': 'Count is now 1'}

Using SqliteCheckpointer

Best for production with persistence across restarts:
import sqlite3
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, SqliteCheckpointer

class PersistentState(TypedDict):
    """State schema for the SqliteCheckpointer example graph."""
    # User identifier supplied in the input; process_node does not modify it.
    user_id: str
    # Payload text; process_node prefixes it with "Processed: ".
    data: str
    # Flag that process_node sets to True.
    processed: bool

def process_node(state: PersistentState) -> dict:
    """Return an update that tags ``data`` as processed and sets the flag."""
    tagged = f"Processed: {state['data']}"
    return {"data": tagged, "processed": True}

# Build the graph: START -> process -> END
builder = StateGraph(PersistentState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)

# Create SQLite connection with check_same_thread=False for async compatibility
# This is required because StateGraph uses asyncio internally, so checkpoint
# operations may run on a thread other than the one that opened the connection
conn = sqlite3.connect("graph_checkpoints.db", check_same_thread=False)
checkpointer = SqliteCheckpointer(conn)

# Compile with SqliteCheckpointer
graph = builder.compile(checkpointer=checkpointer)

# Execute - state is persisted to SQLite
config = {"configurable": {"thread_id": "user-session-123"}}
result = graph.invoke(
    {"user_id": "user-123", "data": "raw input", "processed": False},
    config=config
)
print(result)  # {'user_id': 'user-123', 'data': 'Processed: raw input', 'processed': True}

# State survives program restarts!
# On next run, you can resume from the last checkpoint
Important: Always use check_same_thread=False when creating SQLite connections for StateGraph. This is required because the graph uses asyncio internally, which may execute checkpoint operations on different threads.

Threads

Threads organize independent execution histories. Each thread has its own state and checkpoint history.

Multi-Turn Conversations

Build chatbots that maintain conversation history:
from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class ChatState(TypedDict):
    """Conversation state for the multi-turn chat graph.

    Each field carries a reducer as ``Annotated`` metadata. Both use
    ``operator.add``: list concatenation for ``messages`` and integer
    addition for ``turn_count``.
    """
    # Transcript lines; chat_node returns a one-element list with its reply.
    messages: Annotated[List[str], operator.add]
    # Turn counter; chat_node returns 1 for every turn it handles.
    turn_count: Annotated[int, operator.add]

def chat_node(state: ChatState) -> dict:
    """Build the bot's reply to the most recent message.

    Uses "Hello" as the quoted text when the message list is empty. The
    returned update holds the single reply line and a turn count of 1.
    """
    transcript = state["messages"]
    if transcript:
        latest = transcript[-1]
    else:
        latest = "Hello"
    reply = f"Bot: You said '{latest}'. How can I help?"
    return {"messages": [reply], "turn_count": 1}

# Build the graph: START -> chat -> END
builder = StateGraph(ChatState)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)

# Compile with checkpointer
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Conversation thread: the same config (and thread_id) is reused on every turn
config = {"configurable": {"thread_id": "conversation-1"}}

# Turn 1
result1 = graph.invoke(
    {"messages": ["User: Hello!"], "turn_count": 0},
    config=config
)
print(f"Turn 1 - Messages: {len(result1['messages'])}")  # 2 messages

# Turn 2 - continues from previous state
# Only the new user message is passed; the graph loads the previous state
# and merges this input into it
result2 = graph.invoke(
    {"messages": ["User: What's the weather?"]},
    config=config
)
print(f"Turn 2 - Messages: {len(result2['messages'])}")  # 4 messages
print(f"Total turns: {result2['turn_count']}")  # 2
When using checkpointers, you only need to provide new state changes. The graph automatically loads and merges with the previous state.

Multiple Threads

Handle multiple independent conversations:
from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class SessionState(TypedDict):
    """State schema for the per-user session graph."""
    # Name that respond_node interpolates into its greeting.
    user: str
    # Message list; annotated with operator.add as its reducer.
    messages: Annotated[List[str], operator.add]

def respond_node(state: SessionState) -> dict:
    """Return an update that adds a greeting addressed to ``user``."""
    greeting = f"Hello {state['user']}!"
    return {"messages": [greeting]}

# Build the graph: START -> respond -> END
builder = StateGraph(SessionState)
builder.add_node("respond", respond_node)
builder.add_edge(START, "respond")
builder.add_edge("respond", END)

# One checkpointer and one compiled graph serve both users; the thread_id in
# each config is what keeps their histories apart
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# User Alice's thread
alice_config = {"configurable": {"thread_id": "alice-session"}}
alice_result = graph.invoke(
    {"user": "Alice", "messages": ["Hi!"]},
    config=alice_config
)
print(f"Alice: {alice_result['messages']}")

# User Bob's thread (completely independent)
bob_config = {"configurable": {"thread_id": "bob-session"}}
bob_result = graph.invoke(
    {"user": "Bob", "messages": ["Hey there!"]},
    config=bob_config
)
print(f"Bob: {bob_result['messages']}")

# Each thread has its own state - they don't interfere

Time Travel

Access any point in execution history:
from typing import Annotated
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class StepState(TypedDict):
    """State schema for the time-travel example graph.

    Each field carries a reducer as ``Annotated`` metadata. Both use
    ``operator.add``: integer addition for ``step`` and list concatenation
    for ``history``.
    """
    # Step counter; step_node returns 1 for every step it records.
    step: Annotated[int, operator.add]
    # Step labels; step_node returns a one-element list per step.
    history: Annotated[list, operator.add]

def step_node(state: StepState) -> dict:
    """Record one step.

    The returned ``step`` value is always 1; the ``history`` entry is
    labelled with the incoming step number plus one.
    """
    label = f"Step {state['step'] + 1}"
    return {"step": 1, "history": [label]}

# Build a multi-step graph: START -> step1 -> step2 -> step3 -> END
# (all three nodes run the same step_node function)
builder = StateGraph(StepState)
builder.add_node("step1", step_node)
builder.add_node("step2", step_node)
builder.add_node("step3", step_node)
builder.add_edge(START, "step1")
builder.add_edge("step1", "step2")
builder.add_edge("step2", "step3")
builder.add_edge("step3", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Execute
config = {"configurable": {"thread_id": "time-travel-demo"}}
result = graph.invoke({"step": 0, "history": []}, config=config)
print(f"Final: step={result['step']}, history={result['history']}")

# Get state history once and materialize it, so the same checkpoints can be
# both counted and iterated without querying the checkpointer a second time
history = list(graph.get_state_history(config, limit=10))

print(f"\nFound {len(history)} checkpoints:")
for i, checkpoint in enumerate(history, start=1):
    print(f"  Checkpoint {i}:")
    print(f"    State: {checkpoint.values}")
    print(f"    Next nodes: {checkpoint.next}")
    print(f"    Checkpoint ID: {checkpoint.config['configurable']['checkpoint_id']}")

Forking Execution

Create alternative timelines by resuming from historical checkpoints:
from typing import Annotated
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class ExperimentState(TypedDict):
    """State schema for the forking example graph."""
    # Plain int with no reducer annotation; step_node returns the old value plus one.
    step: int
    # Labels of executed steps; annotated with operator.add as its reducer.
    path: Annotated[list, operator.add]

def step_node(state: ExperimentState) -> dict:
    """Advance ``step`` by one and record the new step number in ``path``."""
    next_step = state["step"] + 1
    return {"step": next_step, "path": [f"step-{next_step}"]}

# Build the graph: START -> process -> END
builder = StateGraph(ExperimentState)
builder.add_node("process", step_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Execute main timeline: three runs on the same "main" thread
config = {"configurable": {"thread_id": "main"}}
graph.invoke({"step": 0, "path": ["start"]}, config=config)
result1 = graph.invoke({"step": 0, "path": []}, config=config)
result2 = graph.invoke({"step": 0, "path": []}, config=config)
print(f"Main timeline: {result2['path']}")

# Get history to find fork point
history = list(graph.get_state_history(config))
print(f"Found {len(history)} checkpoints")

# Fork from an earlier checkpoint
if len(history) > 1:
    # NOTE(review): treating the last element as the oldest assumes the
    # history is returned newest-first - confirm against get_state_history
    fork_checkpoint = history[-1]  # Oldest checkpoint
    
    # The fork runs under its own thread_id while pointing at a checkpoint_id
    # taken from the "main" thread's history
    fork_config = {
        "configurable": {
            "thread_id": "experiment-1",
            "checkpoint_id": fork_checkpoint.config['configurable']['checkpoint_id']
        }
    }
    
    # Continue from that point with different input
    forked_result = graph.invoke(
        {"step": 0, "path": ["forked"]},
        config=fork_config
    )
    print(f"Forked timeline: {forked_result['path']}")

Getting Current State

Inspect the current state of a thread:
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class InspectState(TypedDict):
    """State schema for the state-inspection example graph."""
    # Text payload; process_node returns it upper-cased.
    data: str
    # Flag that process_node sets to True.
    processed: bool

def process_node(state: InspectState) -> dict:
    """Return an update with ``data`` upper-cased and ``processed`` set."""
    shouted = state["data"].upper()
    return {"data": shouted, "processed": True}

# Build the graph: START -> process -> END
builder = StateGraph(InspectState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Run once so the thread has a state to inspect
config = {"configurable": {"thread_id": "inspect-demo"}}
graph.invoke({"data": "hello", "processed": False}, config=config)

# Get current state
current_state = graph.get_state(config)
# Guard before reading attributes, in case get_state returned a falsy result
if current_state:
    print(f"Current values: {current_state.values}")
    print(f"Next nodes: {current_state.next}")
    print(f"Thread ID: {current_state.config['configurable']['thread_id']}")

Updating State Manually

Modify state outside of graph execution:
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class EditableState(TypedDict):
    """State schema for the manual state-update example graph."""
    # Counter; increment_node returns it increased by one.
    counter: int
    # Free-form text; increment_node never writes it.
    notes: str

def increment_node(state: EditableState) -> dict:
    """Return a state update that raises ``counter`` by one."""
    bumped = state["counter"] + 1
    return {"counter": bumped}

# Build the graph: START -> increment -> END
builder = StateGraph(EditableState)
builder.add_node("increment", increment_node)
builder.add_edge(START, "increment")
builder.add_edge("increment", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Every call below uses this config, so they all act on the same thread
config = {"configurable": {"thread_id": "editable-demo"}}

# Run once - counter goes from 0 to 1
graph.invoke({"counter": 0, "notes": ""}, config=config)
print(f"After run: counter={graph.get_state(config).values['counter']}")  # 1

# Manually update state to 100
graph.update_state(
    config,
    values={"counter": 100, "notes": "Manually set to 100"},
    as_node="admin"  # Optional: track who made the change
)
print(f"After update: counter={graph.get_state(config).values['counter']}")  # 100

# Re-run the graph from START with the updated state
# Pass an empty dict {} to restart from START with current state
# (passing None instead would only return the current state here, because
# the previous run already reached END)
result = graph.invoke({}, config=config)
print(f"After re-run: counter={result['counter']}")  # 101
Note on resuming execution:
  • Pass {} (empty dict) to restart the graph from START with the current checkpoint state
  • Pass None to resume from where execution left off - but if the graph already reached END, it just returns the current state without re-executing

Durability Modes

Control when checkpoints are saved:
| Mode | Behavior | Use Case |
|---|---|---|
| sync | Save before continuing | Maximum safety, slower |
| async | Save in background | Balance of safety and speed (default) |
| exit | Save only on completion | Maximum speed, less safe |
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class DurableState(TypedDict):
    """State schema for the durability-mode example graph."""
    # Number that compute_node returns doubled.
    value: int

def compute_node(state: DurableState) -> dict:
    """Return a state update that doubles ``value``."""
    doubled = state["value"] * 2
    return {"value": doubled}

# Build the graph: START -> compute -> END
builder = StateGraph(DurableState)
builder.add_node("compute", compute_node)
builder.add_edge(START, "compute")
builder.add_edge("compute", END)

# The same builder and checkpointer are compiled three times below; only the
# durability argument differs between the resulting graphs
checkpointer = MemorySaver()

# Sync - guaranteed persistence at each step
# Best for critical workflows where data loss is unacceptable
graph_sync = builder.compile(
    checkpointer=checkpointer,
    durability="sync"
)

# Async - background persistence (default)
# Good balance for most applications
graph_async = builder.compile(
    checkpointer=checkpointer,
    durability="async"
)

# Exit - only persist at the end
# Best for high-throughput, less critical workflows
graph_exit = builder.compile(
    checkpointer=checkpointer,
    durability="exit"
)

# Use the appropriate mode for your use case
# (only the sync graph is actually invoked in this example)
config = {"configurable": {"thread_id": "durable-demo"}}
result = graph_sync.invoke({"value": 5}, config=config)
print(f"Result: {result['value']}")  # 10

Next Steps