Overview
StateGraph automatically saves execution state at every step, enabling powerful features:
- 🔄 Resume from failures - Restart where you left off
- 🕐 Time travel - Access any historical state
- 🌳 Fork execution - Create alternative timelines
- 💾 Multi-session - Continue conversations across sessions
Checkpointers
Checkpointers store graph state. Choose based on your needs:
| Checkpointer | Storage | Use Case | Persistence |
|---|---|---|---|
| MemorySaver | In-memory | Development, testing | Lost on restart |
| SqliteCheckpointer | SQLite file | Production, local apps | Survives restarts |
Using MemorySaver
Best for development and testing:
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver
class SimpleState(TypedDict):
    """State shared by the nodes of the counter example."""

    count: int  # running counter
    message: str  # text derived from the counter


def increment_node(state: SimpleState) -> dict:
    """Return an update that raises ``count`` by one."""
    bumped = state["count"] + 1
    return {"count": bumped}


def message_node(state: SimpleState) -> dict:
    """Return an update whose message reports the current count."""
    text = f"Count is now {state['count']}"
    return {"message": text}


# Wire the nodes together: START -> increment -> message -> END
workflow = StateGraph(SimpleState)
workflow.add_node("increment", increment_node)
workflow.add_node("message", message_node)
workflow.add_edge(START, "increment")
workflow.add_edge("increment", "message")
workflow.add_edge("message", END)

# Attach an in-memory checkpointer at compile time
graph = workflow.compile(checkpointer=MemorySaver())

# The thread_id is what ties this run to a persisted thread
thread_config = {"configurable": {"thread_id": "my-thread-1"}}
initial_state = {"count": 0, "message": ""}
result = graph.invoke(initial_state, config=thread_config)
print(result)  # {'count': 1, 'message': 'Count is now 1'}
Using SqliteCheckpointer
Best for production with persistence across restarts:
import sqlite3
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, SqliteCheckpointer
class PersistentState(TypedDict):
    """State used by the SQLite-backed example."""

    user_id: str
    data: str
    processed: bool


def process_node(state: PersistentState) -> dict:
    """Return an update with the data prefixed and ``processed`` set to True."""
    processed_text = f"Processed: {state['data']}"
    return {"data": processed_text, "processed": True}


# Single-node graph: START -> process -> END
workflow = StateGraph(PersistentState)
workflow.add_node("process", process_node)
workflow.add_edge(START, "process")
workflow.add_edge("process", END)

# check_same_thread=False is required for async compatibility,
# because StateGraph uses asyncio internally
conn = sqlite3.connect("graph_checkpoints.db", check_same_thread=False)

# Compile with a checkpointer backed by that connection
graph = workflow.compile(checkpointer=SqliteCheckpointer(conn))

# Run the graph - state is persisted to SQLite
thread_config = {"configurable": {"thread_id": "user-session-123"}}
initial_state = {"user_id": "user-123", "data": "raw input", "processed": False}
result = graph.invoke(initial_state, config=thread_config)
print(result)  # {'user_id': 'user-123', 'data': 'Processed: raw input', 'processed': True}

# State survives program restarts!
# On the next run, you can resume from the last checkpoint
Important: Always use `check_same_thread=False` when creating SQLite connections for StateGraph. This is required because the graph uses asyncio internally, which may execute checkpoint operations on different threads.
Threads
Threads organize independent execution histories. Each thread has its own state and checkpoint history.
Multi-Turn Conversations
Build chatbots that maintain conversation history:
from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver
class ChatState(TypedDict):
    """Conversation state; each field is annotated with a function that combines updates."""

    messages: Annotated[List[str], operator.add]  # lists are concatenated
    turn_count: Annotated[int, operator.add]  # integers are summed


def chat_node(state: ChatState) -> dict:
    """Build a bot reply that echoes the most recent message.

    Falls back to "Hello" when there are no messages yet, and reports
    one additional turn.
    """
    past_messages = state["messages"]
    if past_messages:
        last_message = past_messages[-1]
    else:
        last_message = "Hello"
    reply = f"Bot: You said '{last_message}'. How can I help?"
    return {"messages": [reply], "turn_count": 1}


# Single-node graph: START -> chat -> END
workflow = StateGraph(ChatState)
workflow.add_node("chat", chat_node)
workflow.add_edge(START, "chat")
workflow.add_edge("chat", END)

# A checkpointer is what lets the second turn see the first
graph = workflow.compile(checkpointer=MemorySaver())

# Both turns run on the same conversation thread
conversation = {"configurable": {"thread_id": "conversation-1"}}

# Turn 1
first_turn = {"messages": ["User: Hello!"], "turn_count": 0}
result1 = graph.invoke(first_turn, config=conversation)
print(f"Turn 1 - Messages: {len(result1['messages'])}")  # 2 messages

# Turn 2 - only the new message is passed; it continues from the previous state
second_turn = {"messages": ["User: What's the weather?"]}
result2 = graph.invoke(second_turn, config=conversation)
print(f"Turn 2 - Messages: {len(result2['messages'])}")  # 4 messages
print(f"Total turns: {result2['turn_count']}")  # 2
When using checkpointers, you only need to provide new state changes. The graph automatically loads and merges with the previous state.
Multiple Threads
Handle multiple independent conversations:
from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver
class SessionState(TypedDict):
    """Per-user session state."""

    user: str
    messages: Annotated[List[str], operator.add]  # lists are concatenated


def respond_node(state: SessionState) -> dict:
    """Return an update containing a greeting addressed to the session's user."""
    greeting = f"Hello {state['user']}!"
    return {"messages": [greeting]}


# Single-node graph: START -> respond -> END
workflow = StateGraph(SessionState)
workflow.add_node("respond", respond_node)
workflow.add_edge(START, "respond")
workflow.add_edge("respond", END)

# One compiled graph and one checkpointer serve both users
graph = workflow.compile(checkpointer=MemorySaver())

# Alice's thread
alice_config = {"configurable": {"thread_id": "alice-session"}}
alice_input = {"user": "Alice", "messages": ["Hi!"]}
alice_result = graph.invoke(alice_input, config=alice_config)
print(f"Alice: {alice_result['messages']}")

# Bob's thread (completely independent of Alice's)
bob_config = {"configurable": {"thread_id": "bob-session"}}
bob_input = {"user": "Bob", "messages": ["Hey there!"]}
bob_result = graph.invoke(bob_input, config=bob_config)
print(f"Bob: {bob_result['messages']}")

# Each thread keeps its own state - they don't interfere
Time Travel
Access any point in execution history:
from typing import Annotated
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver
class StepState(TypedDict):
    """State for the time-travel example; both fields combine updates by addition."""

    step: Annotated[int, operator.add]  # integers are summed
    history: Annotated[list, operator.add]  # lists are concatenated


def step_node(state: StepState) -> dict:
    """Record one step.

    Returns an update that adds 1 to ``step`` and appends a label for the
    step that is being completed.
    """
    current_step = state["step"] + 1
    return {"step": 1, "history": [f"Step {current_step}"]}


# Build a three-step graph: START -> step1 -> step2 -> step3 -> END
builder = StateGraph(StepState)
builder.add_node("step1", step_node)
builder.add_node("step2", step_node)
builder.add_node("step3", step_node)
builder.add_edge(START, "step1")
builder.add_edge("step1", "step2")
builder.add_edge("step2", "step3")
builder.add_edge("step3", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Execute
config = {"configurable": {"thread_id": "time-travel-demo"}}
result = graph.invoke({"step": 0, "history": []}, config=config)
print(f"Final: step={result['step']}, history={result['history']}")

# Fetch the state history once and reuse the list, instead of querying
# the checkpointer a second time just to iterate over the same entries
checkpoints = list(graph.get_state_history(config, limit=10))
print(f"\nFound {len(checkpoints)} checkpoints:")
for i, checkpoint in enumerate(checkpoints):
    print(f" Checkpoint {i + 1}:")
    print(f" State: {checkpoint.values}")
    print(f" Next nodes: {checkpoint.next}")
    print(f" Checkpoint ID: {checkpoint.config['configurable']['checkpoint_id']}")
Forking Execution
Create alternative timelines by resuming from historical checkpoints:
from typing import Annotated
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver
class ExperimentState(TypedDict):
    """State for the forking example."""

    step: int
    path: Annotated[list, operator.add]  # lists are concatenated


def step_node(state: ExperimentState) -> dict:
    """Track the execution path.

    Returns an update that sets ``step`` to the next step number and
    appends a matching ``step-N`` label to ``path``.
    """
    return {"step": state["step"] + 1, "path": [f"step-{state['step'] + 1}"]}


# Build the graph: START -> process -> END
builder = StateGraph(ExperimentState)
builder.add_node("process", step_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Execute the main timeline: three runs on the same thread.
# Only the final result is printed, so the earlier ones are not kept.
config = {"configurable": {"thread_id": "main"}}
graph.invoke({"step": 0, "path": ["start"]}, config=config)
graph.invoke({"step": 0, "path": []}, config=config)
main_result = graph.invoke({"step": 0, "path": []}, config=config)
print(f"Main timeline: {main_result['path']}")

# Get history to find a fork point
history = list(graph.get_state_history(config))
print(f"Found {len(history)} checkpoints")

# Fork from an earlier checkpoint
if len(history) > 1:
    # NOTE(review): assumes the history is ordered newest-first, so the
    # last entry is the oldest checkpoint - confirm against the API
    fork_checkpoint = history[-1]
    # NOTE(review): the checkpoint_id comes from thread "main" while the
    # thread_id is "experiment-1" - confirm the checkpointer supports this
    fork_config = {
        "configurable": {
            "thread_id": "experiment-1",
            "checkpoint_id": fork_checkpoint.config['configurable']['checkpoint_id'],
        }
    }

    # Continue from that point with different input
    forked_result = graph.invoke(
        {"step": 0, "path": ["forked"]},
        config=fork_config,
    )
    print(f"Forked timeline: {forked_result['path']}")
Getting Current State
Inspect the current state of a thread:
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver
class InspectState(TypedDict):
    """State for the state-inspection example."""

    data: str
    processed: bool


def process_node(state: InspectState) -> dict:
    """Return an update with the data upper-cased and ``processed`` set to True."""
    upper_cased = state["data"].upper()
    return {"data": upper_cased, "processed": True}


# Single-node graph: START -> process -> END
workflow = StateGraph(InspectState)
workflow.add_node("process", process_node)
workflow.add_edge(START, "process")
workflow.add_edge("process", END)

graph = workflow.compile(checkpointer=MemorySaver())

# Run once so the thread has a state to inspect
config = {"configurable": {"thread_id": "inspect-demo"}}
initial_state = {"data": "hello", "processed": False}
graph.invoke(initial_state, config=config)

# Look up the thread's current state and print it if one was returned
current_state = graph.get_state(config)
if current_state:
    print(f"Current values: {current_state.values}")
    print(f"Next nodes: {current_state.next}")
    print(f"Thread ID: {current_state.config['configurable']['thread_id']}")
Updating State Manually
Modify state outside of graph execution:
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver
class EditableState(TypedDict):
    """State for the manual-update example."""

    counter: int
    notes: str


def increment_node(state: EditableState) -> dict:
    """Return an update that raises ``counter`` by one."""
    bumped = state["counter"] + 1
    return {"counter": bumped}


# Single-node graph: START -> increment -> END
workflow = StateGraph(EditableState)
workflow.add_node("increment", increment_node)
workflow.add_edge(START, "increment")
workflow.add_edge("increment", END)

graph = workflow.compile(checkpointer=MemorySaver())

config = {"configurable": {"thread_id": "editable-demo"}}

# First run - counter goes from 0 to 1
initial_state = {"counter": 0, "notes": ""}
graph.invoke(initial_state, config=config)
print(f"After run: counter={graph.get_state(config).values['counter']}")  # 1

# Overwrite the stored state by hand, setting the counter to 100
manual_values = {"counter": 100, "notes": "Manually set to 100"}
graph.update_state(
    config,
    values=manual_values,
    as_node="admin",  # Optional: track who made the change
)
print(f"After update: counter={graph.get_state(config).values['counter']}")  # 100

# Re-run the graph from START on top of the updated state.
# An empty dict {} restarts from START with the current state.
result = graph.invoke({}, config=config)
print(f"After re-run: counter={result['counter']}")  # 101
Note on resuming execution:
- Pass `{}` (empty dict) to restart the graph from START with the current checkpoint state.
- Pass `None` to resume from where execution left off. If the graph has already reached END, it just returns the current state without re-executing.
Durability Modes
Control when checkpoints are saved:
| Mode | Behavior | Use Case |
|---|---|---|
| sync | Save before continuing | Maximum safety, slower |
| async | Save in background | Balance of safety and speed (default) |
| exit | Save only on completion | Maximum speed, less safe |
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver
class DurableState(TypedDict):
    """State for the durability-mode example."""

    value: int


def compute_node(state: DurableState) -> dict:
    """Return an update that doubles ``value``."""
    doubled = state["value"] * 2
    return {"value": doubled}


# Single-node graph: START -> compute -> END
workflow = StateGraph(DurableState)
workflow.add_node("compute", compute_node)
workflow.add_edge(START, "compute")
workflow.add_edge("compute", END)

# The same checkpointer is shared by all three compiled graphs below
checkpointer = MemorySaver()

# "sync": guaranteed persistence at each step.
# Best for critical workflows where data loss is unacceptable.
graph_sync = builder_sync = workflow.compile(checkpointer=checkpointer, durability="sync")

# "async": background persistence (the default).
# A good balance for most applications.
graph_async = workflow.compile(checkpointer=checkpointer, durability="async")

# "exit": persist only at the end.
# Best for high-throughput, less critical workflows.
graph_exit = workflow.compile(checkpointer=checkpointer, durability="exit")

# Pick whichever mode suits the use case; this run uses "sync"
config = {"configurable": {"thread_id": "durable-demo"}}
initial_state = {"value": 5}
result = graph_sync.invoke(initial_state, config=config)
print(f"Result: {result['value']}")  # 10
Next Steps