Overview
StateGraph automatically saves execution state at every step, enabling powerful features:
- 🔄 Resume from failures - Restart where you left off
- 🕐 Time travel - Access any historical state
- 🌳 Fork execution - Create alternative timelines
- 💾 Multi-session - Continue conversations across sessions
- 🧵 Isolated threads - Independent execution histories
Checkpointers
Checkpointers store graph state. Choose based on your needs:
| Checkpointer | Storage | Use Case | Persistence |
|---|---|---|---|
| MemorySaver | In-memory | Development, testing | Lost on restart |
| SqliteCheckpointer | SQLite file | Production, local apps | Survives restarts |
Using MemorySaver
Perfect for development and testing:
from upsonic.graphv2 import StateGraph, MemorySaver
builder = StateGraph(MyState)
# ... add nodes and edges ...
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
Using SqliteCheckpointer
For persistent storage across restarts:
import sqlite3
from upsonic.graphv2 import StateGraph, SqliteCheckpointer
# Create or open database
conn = sqlite3.connect("graph_checkpoints.db")
checkpointer = SqliteCheckpointer(conn)
graph = builder.compile(checkpointer=checkpointer)
SqliteCheckpointer automatically creates necessary tables on first use.
Threads
Threads organize independent execution histories. Each thread has its own state and checkpoint history.
Basic Threading
from upsonic.graphv2 import StateGraph, MemorySaver
graph = builder.compile(checkpointer=MemorySaver())
# Thread 1 - User conversation
config1 = {"configurable": {"thread_id": "user-123"}}
result1 = graph.invoke(initial_state, config=config1)
# Thread 2 - Different user
config2 = {"configurable": {"thread_id": "user-456"}}
result2 = graph.invoke(initial_state, config=config2)
# Threads are completely isolated
Use thread IDs that make sense for your domain: user IDs, session IDs, conversation IDs, etc.
Multi-Turn Conversations
Each invocation with the same thread continues from the last checkpoint:
from typing import Annotated, List
from typing_extensions import TypedDict
import operator
class ChatState(TypedDict):
messages: Annotated[List[str], operator.add]
turn_count: Annotated[int, lambda a, b: a + b]
def chat_node(state: ChatState) -> dict:
# Get conversation history
history = state["messages"]
# Generate response using LLM
response = model.invoke(history)
return {
"messages": [f"Assistant: {response}"],
"turn_count": 1
}
# Build graph
builder = StateGraph(ChatState)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# Conversation thread
config = {"configurable": {"thread_id": "conversation-1"}}
# Turn 1
result1 = graph.invoke(
{"messages": ["User: Hello"], "turn_count": 0},
config=config
)
# messages = ["User: Hello", "Assistant: Hi! How can I help?"]
# turn_count = 1
# Turn 2 - continues from previous state
result2 = graph.invoke(
{"messages": ["User: What's the weather?"]},
config=config
)
# messages = ["User: Hello", "Assistant: Hi! How can I help?", "User: What's the weather?", "Assistant: I don't have weather data"]
# turn_count = 2
# Turn 3
result3 = graph.invoke(
{"messages": ["User: Thank you"]},
config=config
)
# turn_count = 3
When using checkpointers, you only need to provide new state changes. The graph automatically loads and merges with the previous state.
Getting Current State
Inspect the current state of a thread:
# Get current state
state = graph.get_state(config)
if state:
print(f"Thread ID: {state.config['configurable']['thread_id']}")
print(f"Checkpoint ID: {state.config['configurable']['checkpoint_id']}")
print(f"Current values: {state.values}")
print(f"Next nodes: {state.next}")
StateSnapshot fields:
values - Current state dictionary
next - List of nodes to execute next
config - Thread and checkpoint IDs
metadata - Additional checkpoint metadata
Time Travel
Access any point in execution history:
Viewing History
# Get last 10 checkpoints
history = graph.get_state_history(config, limit=10)
for i, checkpoint in enumerate(history):
print(f"Checkpoint {i + 1}:")
print(f" ID: {checkpoint.config['configurable']['checkpoint_id']}")
print(f" State: {checkpoint.values}")
print(f" Timestamp: {checkpoint.metadata.get('timestamp')}")
History is returned in reverse chronological order (newest first).
Complete Example
from upsonic.graphv2 import StateGraph, START, END, MemorySaver
from typing_extensions import TypedDict
from typing import Annotated
import operator
class ChatState(TypedDict):
messages: Annotated[list, operator.add]
turn_count: Annotated[int, lambda a, b: a + b]
topic: str
def chat_node(state: ChatState) -> dict:
# Simulate chat
messages = state.get("messages", [])
last_msg = messages[-1] if messages else ""
response = f"Response to: {last_msg}"
topic = last_msg.split()[0] if last_msg else "general"
return {
"messages": [response],
"turn_count": 1,
"topic": topic
}
# Build graph
builder = StateGraph(ChatState)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "chat-123"}}
# Turn 1
graph.invoke({"messages": ["Tell me about Python"], "turn_count": 0, "topic": ""}, config)
# Turn 2
graph.invoke({"messages": ["What about JavaScript?"]}, config)
# Turn 3
graph.invoke({"messages": ["How do they compare?"]}, config)
# View history
history = graph.get_state_history(config, limit=5)
print(f"Found {len(history)} checkpoints:")
for i, checkpoint in enumerate(history):
print(f"\nCheckpoint {i + 1}:")
print(f" Turn count: {checkpoint.values['turn_count']}")
print(f" Topic: {checkpoint.values.get('topic')}")
print(f" Messages: {len(checkpoint.values.get('messages', []))}")
Forking Execution
Create alternative timelines by resuming from historical checkpoints:
from upsonic.graphv2 import StateGraph, MemorySaver
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "experiment-1"}}
# Execute a few steps
graph.invoke({"step": 1}, config)
graph.invoke({"step": 2}, config)
graph.invoke({"step": 3}, config)
# Get history
history = graph.get_state_history(config)
# Fork from step 1 (last checkpoint in history)
fork_checkpoint = history[-1] # Oldest checkpoint
fork_config = {
"configurable": {
"thread_id": "experiment-1",
"checkpoint_id": fork_checkpoint.config['configurable']['checkpoint_id']
}
}
# Continue from that point with different input
result = graph.invoke({"alternative_input": "data"}, config=fork_config)
# This creates a new branch in the execution tree
Use Cases for Forking:
- A/B testing different approaches
- Exploring alternative conversation paths
- Debugging specific decision points
- Trying different parameters
Manual State Updates
Update state manually without executing nodes:
# Update state as if a specific node modified it
graph.update_state(
config=config,
values={"status": "manually_updated", "flag": True},
as_node="some_node" # Optional: attribute update to a specific node
)
# Check updated state
updated = graph.get_state(config)
print(updated.values)
Manual updates create a new checkpoint but don’t execute any nodes. Use carefully as they can break workflow logic.
Durability Modes
Control when checkpoints are saved:
| Mode | Behavior | Use Case |
|---|---|---|
| sync | Save before continuing | Maximum safety, slower |
| async | Save in background | Balance of safety and speed |
| exit | Save only on completion | Maximum speed, less safe |
# Sync - guaranteed persistence at each step
graph = builder.compile(
checkpointer=checkpointer,
durability="sync"
)
# Async - background persistence (default)
graph = builder.compile(
checkpointer=checkpointer,
durability="async"
)
# Exit - only persist at the end
graph = builder.compile(
checkpointer=checkpointer,
durability="exit"
)
Default is “async” - provides good balance between safety and performance.
Durability Trade-offs
# Sync - Maximum safety
# ✅ State guaranteed persisted before next step
# ❌ Slower due to synchronous writes
graph = builder.compile(checkpointer=checkpointer, durability="sync")
# Async - Balanced (default)
# ✅ Good performance
# ✅ State persisted (eventually)
# ⚠️ Small window where state might be lost if process crashes
graph = builder.compile(checkpointer=checkpointer, durability="async")
# Exit - Maximum speed
# ✅ Fastest execution
# ❌ No persistence until completion
# ❌ Failures lose all progress
graph = builder.compile(checkpointer=checkpointer, durability="exit")
Recovery from Failures
Automatically resume from the last checkpoint:
class State(TypedDict):
step: int
data: str
def safe_step(state: State) -> dict:
return {"step": 1, "data": "safe"}
def failing_step(state: State) -> dict:
if state["step"] < 2:
raise ValueError("Not ready")
return {"step": 2, "data": "recovered"}
builder = StateGraph(State)
builder.add_node("safe", safe_step)
builder.add_node("failing", failing_step)
builder.add_edge(START, "safe")
builder.add_edge("safe", "failing")
builder.add_edge("failing", END)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "recovery-test"}}
# First attempt - will fail
try:
graph.invoke({"step": 0, "data": ""}, config)
except ValueError:
print("First attempt failed")
# Check what was saved
checkpoint = checkpointer.get("recovery-test")
print(f"Saved state: {checkpoint.state}") # step=1, data="safe"
# Resume with updated input
result = graph.invoke({"step": 2}, config)
print(f"Recovered: {result}") # step=2, data="recovered"
Resume with None: You can resume without providing new input: graph.invoke(None, config=config)
Best Practices
1. Choose Meaningful Thread IDs
# ✅ Good - semantic IDs
config = {"configurable": {"thread_id": f"user-{user_id}-session-{session_id}"}}
# ❌ Bad - random IDs you can't track
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
2. Clean Up Old Threads
Periodically delete completed threads to save space:
# For MemorySaver
if thread_id in checkpointer._storage:
del checkpointer._storage[thread_id]
# For SqliteCheckpointer
cursor = conn.cursor()
cursor.execute("DELETE FROM checkpoints WHERE thread_id = ?", (thread_id,))
conn.commit()
3. Use Appropriate Durability
# For critical workflows - use sync
critical_graph = builder.compile(
checkpointer=checkpointer,
durability="sync"
)
# For high-throughput - use async (default)
fast_graph = builder.compile(
checkpointer=checkpointer,
durability="async"
)
# For stateless tasks - no checkpointer needed
stateless_graph = builder.compile()
4. Monitor Checkpoint Size
# Check state size
state = graph.get_state(config)
import sys
size_bytes = sys.getsizeof(str(state.values))
print(f"State size: {size_bytes} bytes")
# Keep state lean - don't store large objects
Complete Example: Multi-Session Chat
from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver
from upsonic.models import infer_model
from upsonic.messages import ModelRequest, UserPromptPart, SystemPromptPart
class ChatState(TypedDict):
messages: Annotated[List[str], operator.add]
turn_count: Annotated[int, lambda a, b: a + b]
last_topic: str
def chat_node(state: ChatState) -> dict:
messages = state.get("messages", [])
# Use LLM
model = infer_model("openai/gpt-4o-mini")
request = ModelRequest(parts=[
SystemPromptPart(content="You are a helpful assistant."),
UserPromptPart(content="\n".join(messages[-3:])) # Last 3 messages
])
response = model.invoke([request])
topic = messages[-1].split()[0] if messages else "general"
return {
"messages": [f"Assistant: {response}"],
"turn_count": 1,
"last_topic": topic
}
# Build persistent chat
builder = StateGraph(ChatState)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# Session 1
session_config = {"configurable": {"thread_id": "user-alice-chat"}}
graph.invoke(
{"messages": ["User: Hello"], "turn_count": 0, "last_topic": ""},
config=session_config
)
graph.invoke(
{"messages": ["User: Tell me about Python"]},
config=session_config
)
# ... user closes app ...
# Session 2 (later) - continues from where left off
graph.invoke(
{"messages": ["User: What about decorators?"]},
config=session_config
)
# View history
history = graph.get_state_history(session_config)
print(f"Total checkpoints: {len(history)}")
Next Steps