# Persistence & Checkpointing

Overview

StateGraph automatically saves execution state at every step, enabling powerful features:
  • 🔄 Resume from failures - Restart where you left off
  • 🕐 Time travel - Access any historical state
  • 🌳 Fork execution - Create alternative timelines
  • 💾 Multi-session - Continue conversations across sessions
  • 🧵 Isolated threads - Independent execution histories

Checkpointers

Checkpointers store graph state. Choose based on your needs:
| Checkpointer | Storage | Use Case | Persistence |
| --- | --- | --- | --- |
| MemorySaver | In-memory | Development, testing | Lost on restart |
| SqliteCheckpointer | SQLite file | Production, local apps | Survives restarts |

Using MemorySaver

Perfect for development and testing:
from upsonic.graphv2 import StateGraph, MemorySaver

builder = StateGraph(MyState)
# ... add nodes and edges ...

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

Using SqliteCheckpointer

For persistent storage across restarts:
import sqlite3
from upsonic.graphv2 import StateGraph, SqliteCheckpointer

# Create or open database
conn = sqlite3.connect("graph_checkpoints.db")

checkpointer = SqliteCheckpointer(conn)
graph = builder.compile(checkpointer=checkpointer)
SqliteCheckpointer automatically creates necessary tables on first use.

Threads

Threads organize independent execution histories. Each thread has its own state and checkpoint history.

Basic Threading

from upsonic.graphv2 import StateGraph, MemorySaver

graph = builder.compile(checkpointer=MemorySaver())

# Thread 1 - User conversation
config1 = {"configurable": {"thread_id": "user-123"}}
result1 = graph.invoke(initial_state, config=config1)

# Thread 2 - Different user
config2 = {"configurable": {"thread_id": "user-456"}}
result2 = graph.invoke(initial_state, config=config2)

# Threads are completely isolated
Use thread IDs that make sense for your domain: user IDs, session IDs, conversation IDs, etc.

Multi-Turn Conversations

Each invocation with the same thread continues from the last checkpoint:
from typing import Annotated, List
from typing_extensions import TypedDict
import operator

class ChatState(TypedDict):
    """Conversation state carried across turns of the chat graph.

    The ``Annotated`` reducers control how a node's returned delta is
    merged into the checkpointed state.
    """
    # Full message history; operator.add concatenates new messages onto it.
    messages: Annotated[List[str], operator.add]
    # Running turn counter; the lambda sums the stored and returned values.
    turn_count: Annotated[int, lambda a, b: a + b]

def chat_node(state: ChatState) -> dict:
    """Produce one assistant reply from the accumulated conversation.

    Sends the full message history to the LLM and returns the state
    delta: the new assistant message plus one more turn.
    """
    reply = model.invoke(state["messages"])
    return {
        "messages": [f"Assistant: {reply}"],
        "turn_count": 1,
    }

# Build graph
builder = StateGraph(ChatState)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Conversation thread
config = {"configurable": {"thread_id": "conversation-1"}}

# Turn 1
result1 = graph.invoke(
    {"messages": ["User: Hello"], "turn_count": 0},
    config=config
)
# messages = ["User: Hello", "Assistant: Hi! How can I help?"]
# turn_count = 1

# Turn 2 - continues from previous state
result2 = graph.invoke(
    {"messages": ["User: What's the weather?"]},
    config=config
)
# messages = ["User: Hello", "Assistant: Hi! How can I help?", "User: What's the weather?", "Assistant: I don't have weather data"]
# turn_count = 2

# Turn 3
result3 = graph.invoke(
    {"messages": ["User: Thank you"]},
    config=config
)
# turn_count = 3
When using checkpointers, you only need to provide new state changes. The graph automatically loads and merges with the previous state.

Getting Current State

Inspect the current state of a thread:
# Get current state
state = graph.get_state(config)

if state:
    print(f"Thread ID: {state.config['configurable']['thread_id']}")
    print(f"Checkpoint ID: {state.config['configurable']['checkpoint_id']}")
    print(f"Current values: {state.values}")
    print(f"Next nodes: {state.next}")
StateSnapshot fields:
  • values - Current state dictionary
  • next - List of nodes to execute next
  • config - Thread and checkpoint IDs
  • metadata - Additional checkpoint metadata

Time Travel

Access any point in execution history:

Viewing History

# Get last 10 checkpoints
history = graph.get_state_history(config, limit=10)

for i, checkpoint in enumerate(history):
    print(f"Checkpoint {i + 1}:")
    print(f"  ID: {checkpoint.config['configurable']['checkpoint_id']}")
    print(f"  State: {checkpoint.values}")
    print(f"  Timestamp: {checkpoint.metadata.get('timestamp')}")
History is returned in reverse chronological order (newest first).

Complete Example

from upsonic.graphv2 import StateGraph, START, END, MemorySaver
from typing_extensions import TypedDict
from typing import Annotated
import operator

class ChatState(TypedDict):
    """State for the history-inspection example.

    ``Annotated`` reducers merge each node's returned delta into the
    checkpointed state: messages are appended, turn counts are summed.
    ``topic`` has no reducer, so the latest write wins.
    """
    messages: Annotated[list, operator.add]
    turn_count: Annotated[int, lambda a, b: a + b]
    topic: str

def chat_node(state: ChatState) -> dict:
    """Simulate one chat turn.

    Echoes the most recent message and derives the topic from its first
    word, falling back to "general" when there is no usable word.
    """
    messages = state.get("messages", [])
    last_msg = messages[-1] if messages else ""

    response = f"Response to: {last_msg}"
    # ``last_msg.split()`` is empty for "" AND for whitespace-only input;
    # guarding on the split result avoids the IndexError that the
    # truthiness check on ``last_msg`` alone allowed for "   ".
    words = last_msg.split()
    topic = words[0] if words else "general"

    return {
        "messages": [response],
        "turn_count": 1,
        "topic": topic
    }

# Build graph
builder = StateGraph(ChatState)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "chat-123"}}

# Turn 1
graph.invoke({"messages": ["Tell me about Python"], "turn_count": 0, "topic": ""}, config)

# Turn 2
graph.invoke({"messages": ["What about JavaScript?"]}, config)

# Turn 3
graph.invoke({"messages": ["How do they compare?"]}, config)

# View history
history = graph.get_state_history(config, limit=5)

print(f"Found {len(history)} checkpoints:")
for i, checkpoint in enumerate(history):
    print(f"\nCheckpoint {i + 1}:")
    print(f"  Turn count: {checkpoint.values['turn_count']}")
    print(f"  Topic: {checkpoint.values.get('topic')}")
    print(f"  Messages: {len(checkpoint.values.get('messages', []))}")

Forking Execution

Create alternative timelines by resuming from historical checkpoints:
from upsonic.graphv2 import StateGraph, MemorySaver

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "experiment-1"}}

# Execute a few steps
graph.invoke({"step": 1}, config)
graph.invoke({"step": 2}, config)
graph.invoke({"step": 3}, config)

# Get history
history = graph.get_state_history(config)

# Fork from step 1 (last checkpoint in history)
fork_checkpoint = history[-1]  # Oldest checkpoint

fork_config = {
    "configurable": {
        "thread_id": "experiment-1",
        "checkpoint_id": fork_checkpoint.config['configurable']['checkpoint_id']
    }
}

# Continue from that point with different input
result = graph.invoke({"alternative_input": "data"}, config=fork_config)

# This creates a new branch in the execution tree
Use Cases for Forking:
  • A/B testing different approaches
  • Exploring alternative conversation paths
  • Debugging specific decision points
  • Trying different parameters

Manual State Updates

Update state manually without executing nodes:
# Update state as if a specific node modified it
graph.update_state(
    config=config,
    values={"status": "manually_updated", "flag": True},
    as_node="some_node"  # Optional: attribute update to a specific node
)

# Check updated state
updated = graph.get_state(config)
print(updated.values)
Manual updates create a new checkpoint but don’t execute any nodes. Use carefully as they can break workflow logic.

Durability Modes

Control when checkpoints are saved:
| Mode | Behavior | Use Case |
| --- | --- | --- |
| sync | Save before continuing | Maximum safety, slower |
| async | Save in background | Balance of safety and speed |
| exit | Save only on completion | Maximum speed, less safe |
# Sync - guaranteed persistence at each step
graph = builder.compile(
    checkpointer=checkpointer,
    durability="sync"
)

# Async - background persistence (default)
graph = builder.compile(
    checkpointer=checkpointer,
    durability="async"
)

# Exit - only persist at the end
graph = builder.compile(
    checkpointer=checkpointer,
    durability="exit"
)
Default is “async” - provides good balance between safety and performance.

Durability Trade-offs

# Sync - Maximum safety
# ✅ State guaranteed persisted before next step
# ❌ Slower due to synchronous writes
graph = builder.compile(checkpointer=checkpointer, durability="sync")

# Async - Balanced (default)
# ✅ Good performance
# ✅ State persisted (eventually)
# ⚠️  Small window where state might be lost if process crashes
graph = builder.compile(checkpointer=checkpointer, durability="async")

# Exit - Maximum speed
# ✅ Fastest execution
# ❌ No persistence until completion
# ❌ Failures lose all progress
graph = builder.compile(checkpointer=checkpointer, durability="exit")

Recovery from Failures

Automatically resume from the last checkpoint:
class State(TypedDict):
    """Minimal state for the failure-recovery demo."""
    step: int
    data: str

def safe_step(state: State) -> dict:
    """First node: always succeeds and records its progress."""
    return {"step": 1, "data": "safe"}

def failing_step(state: State) -> dict:
    """Second node: succeeds only once the state has reached step >= 2."""
    if state["step"] >= 2:
        return {"step": 2, "data": "recovered"}
    raise ValueError("Not ready")

builder = StateGraph(State)
builder.add_node("safe", safe_step)
builder.add_node("failing", failing_step)
builder.add_edge(START, "safe")
builder.add_edge("safe", "failing")
builder.add_edge("failing", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "recovery-test"}}

# First attempt - will fail
try:
    graph.invoke({"step": 0, "data": ""}, config)
except ValueError:
    print("First attempt failed")

# Check what was saved
checkpoint = checkpointer.get("recovery-test")
print(f"Saved state: {checkpoint.state}")  # step=1, data="safe"

# Resume with updated input
result = graph.invoke({"step": 2}, config)
print(f"Recovered: {result}")  # step=2, data="recovered"
Resume with None: You can resume without providing new input: graph.invoke(None, config=config)

Best Practices

1. Choose Meaningful Thread IDs

# ✅ Good - semantic IDs
config = {"configurable": {"thread_id": f"user-{user_id}-session-{session_id}"}}

# ❌ Bad - random IDs you can't track
config = {"configurable": {"thread_id": str(uuid.uuid4())}}

2. Clean Up Old Threads

Periodically delete completed threads to save space:
# For MemorySaver
if thread_id in checkpointer._storage:
    del checkpointer._storage[thread_id]

# For SqliteCheckpointer
cursor = conn.cursor()
cursor.execute("DELETE FROM checkpoints WHERE thread_id = ?", (thread_id,))
conn.commit()

3. Use Appropriate Durability

# For critical workflows - use sync
critical_graph = builder.compile(
    checkpointer=checkpointer,
    durability="sync"
)

# For high-throughput - use async (default)
fast_graph = builder.compile(
    checkpointer=checkpointer,
    durability="async"
)

# For stateless tasks - no checkpointer needed
stateless_graph = builder.compile()

4. Monitor Checkpoint Size

# Check state size
state = graph.get_state(config)
import sys
size_bytes = sys.getsizeof(str(state.values))
print(f"State size: {size_bytes} bytes")

# Keep state lean - don't store large objects

Complete Example: Multi-Session Chat

from typing import Annotated, List
from typing_extensions import TypedDict
import operator

from upsonic.graphv2 import StateGraph, START, END, MemorySaver
from upsonic.models import infer_model
from upsonic.messages import ModelRequest, UserPromptPart, SystemPromptPart

class ChatState(TypedDict):
    """Persistent chat state for the multi-session example."""
    # Full conversation transcript; the reducer appends new messages.
    messages: Annotated[List[str], operator.add]
    # Total turns across all sessions; stored and new values are summed.
    turn_count: Annotated[int, lambda a, b: a + b]
    # First word of the most recent message (no reducer: last write wins).
    last_topic: str

def chat_node(state: ChatState) -> dict:
    """Run one LLM-backed chat turn over the persisted history.

    Prompts the model with a sliding window of the three most recent
    messages and reports the first word of the latest message as the
    conversation topic.
    """
    messages = state.get("messages", [])

    # Build the request from recent context only, to keep the prompt small.
    recent = "\n".join(messages[-3:])
    parts = [
        SystemPromptPart(content="You are a helpful assistant."),
        UserPromptPart(content=recent),
    ]
    model = infer_model("openai/gpt-4o-mini")
    response = model.invoke([ModelRequest(parts=parts)])

    if messages:
        topic = messages[-1].split()[0]
    else:
        topic = "general"

    return {
        "messages": [f"Assistant: {response}"],
        "turn_count": 1,
        "last_topic": topic,
    }

# Build persistent chat
builder = StateGraph(ChatState)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Session 1
session_config = {"configurable": {"thread_id": "user-alice-chat"}}

graph.invoke(
    {"messages": ["User: Hello"], "turn_count": 0, "last_topic": ""},
    config=session_config
)

graph.invoke(
    {"messages": ["User: Tell me about Python"]},
    config=session_config
)

# ... user closes app ...

# Session 2 (later) - continues from where left off
graph.invoke(
    {"messages": ["User: What about decorators?"]},
    config=session_config
)

# View history
history = graph.get_state_history(session_config)
print(f"Total turns: {len(history)}")

Next Steps