> ## Documentation Index
> Fetch the complete documentation index at: https://docs.upsonic.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Persistence & Time Travel

> Master checkpointing, state history, and time travel

## Overview

StateGraph automatically saves execution state at every step, enabling powerful features:

* 🔄 **Resume from failures** - Restart where you left off
* 🕐 **Time travel** - Access any historical state
* 🌳 **Fork execution** - Create alternative timelines
* 💾 **Multi-session** - Continue conversations across sessions

## Checkpointers

Checkpointers store graph state. Choose based on your needs:

| Checkpointer         | Storage     | Use Case               | Persistence       |
| -------------------- | ----------- | ---------------------- | ----------------- |
| `MemorySaver`        | In-memory   | Development, testing   | Lost on restart   |
| `SqliteCheckpointer` | SQLite file | Production, local apps | Survives restarts |

### Using MemorySaver

Best for development and testing:

```python theme={null}
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class SimpleState(TypedDict):
    count: int
    message: str

def increment_node(state: SimpleState) -> dict:
    """Increment the counter."""
    return {"count": state["count"] + 1}

def message_node(state: SimpleState) -> dict:
    """Create a message based on count."""
    return {"message": f"Count is now {state['count']}"}

# Build the graph
builder = StateGraph(SimpleState)
builder.add_node("increment", increment_node)
builder.add_node("message", message_node)
builder.add_edge(START, "increment")
builder.add_edge("increment", "message")
builder.add_edge("message", END)

# Compile with MemorySaver
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Execute with thread_id for persistence
config = {"configurable": {"thread_id": "my-thread-1"}}
result = graph.invoke({"count": 0, "message": ""}, config=config)
print(result)  # {'count': 1, 'message': 'Count is now 1'}
```

### Using SqliteCheckpointer

Best for production with persistence across restarts:

```python theme={null}
import sqlite3
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, SqliteCheckpointer

class PersistentState(TypedDict):
    user_id: str
    data: str
    processed: bool

def process_node(state: PersistentState) -> dict:
    """Process user data."""
    return {"data": f"Processed: {state['data']}", "processed": True}

# Build the graph
builder = StateGraph(PersistentState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)

# Create SQLite connection with check_same_thread=False for async compatibility
# This is required because StateGraph uses asyncio internally
conn = sqlite3.connect("graph_checkpoints.db", check_same_thread=False)
checkpointer = SqliteCheckpointer(conn)

# Compile with SqliteCheckpointer
graph = builder.compile(checkpointer=checkpointer)

# Execute - state is persisted to SQLite
config = {"configurable": {"thread_id": "user-session-123"}}
result = graph.invoke(
    {"user_id": "user-123", "data": "raw input", "processed": False},
    config=config
)
print(result)  # {'user_id': 'user-123', 'data': 'Processed: raw input', 'processed': True}

# State survives program restarts!
# On next run, you can resume from the last checkpoint
```

<Warning>
  **Important:** Always use `check_same_thread=False` when creating SQLite connections for StateGraph. This is required because the graph uses asyncio internally which may execute checkpoint operations on different threads.
</Warning>

## Threads

Threads organize independent execution histories. Each thread has its own state and checkpoint history.

### Multi-Turn Conversations

Build chatbots that maintain conversation history:

```python theme={null}
from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class ChatState(TypedDict):
    messages: Annotated[List[str], operator.add]
    turn_count: Annotated[int, lambda a, b: a + b]

def chat_node(state: ChatState) -> dict:
    """Respond to the latest message."""
    history = state["messages"]
    last_message = history[-1] if history else "Hello"
    response = f"Bot: You said '{last_message}'. How can I help?"
    return {
        "messages": [response],
        "turn_count": 1
    }

# Build the graph
builder = StateGraph(ChatState)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)

# Compile with checkpointer
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Conversation thread
config = {"configurable": {"thread_id": "conversation-1"}}

# Turn 1
result1 = graph.invoke(
    {"messages": ["User: Hello!"], "turn_count": 0},
    config=config
)
print(f"Turn 1 - Messages: {len(result1['messages'])}")  # 2 messages

# Turn 2 - continues from previous state
result2 = graph.invoke(
    {"messages": ["User: What's the weather?"]},
    config=config
)
print(f"Turn 2 - Messages: {len(result2['messages'])}")  # 4 messages
print(f"Total turns: {result2['turn_count']}")  # 2
```

<Info>
  When using checkpointers, you only need to provide **new** state changes. The graph automatically loads and merges with the previous state.
</Info>

### Multiple Threads

Handle multiple independent conversations:

```python theme={null}
from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class SessionState(TypedDict):
    user: str
    messages: Annotated[List[str], operator.add]

def respond_node(state: SessionState) -> dict:
    """Generate a response."""
    return {"messages": [f"Hello {state['user']}!"]}

# Build the graph
builder = StateGraph(SessionState)
builder.add_node("respond", respond_node)
builder.add_edge(START, "respond")
builder.add_edge("respond", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# User Alice's thread
alice_config = {"configurable": {"thread_id": "alice-session"}}
alice_result = graph.invoke(
    {"user": "Alice", "messages": ["Hi!"]},
    config=alice_config
)
print(f"Alice: {alice_result['messages']}")

# User Bob's thread (completely independent)
bob_config = {"configurable": {"thread_id": "bob-session"}}
bob_result = graph.invoke(
    {"user": "Bob", "messages": ["Hey there!"]},
    config=bob_config
)
print(f"Bob: {bob_result['messages']}")

# Each thread has its own state - they don't interfere
```

## Time Travel

Access any point in execution history:

```python theme={null}
from typing import Annotated
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class StepState(TypedDict):
    step: Annotated[int, lambda a, b: a + b]
    history: Annotated[list, operator.add]

def step_node(state: StepState) -> dict:
    """Record each step."""
    current_step = state["step"] + 1
    return {"step": 1, "history": [f"Step {current_step}"]}

# Build a multi-step graph
builder = StateGraph(StepState)
builder.add_node("step1", step_node)
builder.add_node("step2", step_node)
builder.add_node("step3", step_node)
builder.add_edge(START, "step1")
builder.add_edge("step1", "step2")
builder.add_edge("step2", "step3")
builder.add_edge("step3", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Execute
config = {"configurable": {"thread_id": "time-travel-demo"}}
result = graph.invoke({"step": 0, "history": []}, config=config)
print(f"Final: step={result['step']}, history={result['history']}")

# Get state history
history = graph.get_state_history(config, limit=10)

print(f"\nFound {len(list(history))} checkpoints:")
for i, checkpoint in enumerate(graph.get_state_history(config, limit=10)):
    print(f"  Checkpoint {i + 1}:")
    print(f"    State: {checkpoint.values}")
    print(f"    Next nodes: {checkpoint.next}")
    print(f"    Checkpoint ID: {checkpoint.config['configurable']['checkpoint_id']}")
```

## Forking Execution

Create alternative timelines by resuming from historical checkpoints:

```python theme={null}
from typing import Annotated
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class ExperimentState(TypedDict):
    step: int
    path: Annotated[list, operator.add]

def step_node(state: ExperimentState) -> dict:
    """Track the execution path."""
    return {"step": state["step"] + 1, "path": [f"step-{state['step'] + 1}"]}

# Build the graph
builder = StateGraph(ExperimentState)
builder.add_node("process", step_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Execute main timeline
config = {"configurable": {"thread_id": "main"}}
graph.invoke({"step": 0, "path": ["start"]}, config=config)
result1 = graph.invoke({"step": 0, "path": []}, config=config)
result2 = graph.invoke({"step": 0, "path": []}, config=config)
print(f"Main timeline: {result2['path']}")

# Get history to find fork point
history = list(graph.get_state_history(config))
print(f"Found {len(history)} checkpoints")

# Fork from an earlier checkpoint
if len(history) > 1:
    fork_checkpoint = history[-1]  # Oldest checkpoint
    
    fork_config = {
        "configurable": {
            "thread_id": "experiment-1",
            "checkpoint_id": fork_checkpoint.config['configurable']['checkpoint_id']
        }
    }
    
    # Continue from that point with different input
    forked_result = graph.invoke(
        {"step": 0, "path": ["forked"]},
        config=fork_config
    )
    print(f"Forked timeline: {forked_result['path']}")
```

## Getting Current State

Inspect the current state of a thread:

```python theme={null}
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class InspectState(TypedDict):
    data: str
    processed: bool

def process_node(state: InspectState) -> dict:
    """Process the data."""
    return {"data": state["data"].upper(), "processed": True}

builder = StateGraph(InspectState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "inspect-demo"}}
graph.invoke({"data": "hello", "processed": False}, config=config)

# Get current state
current_state = graph.get_state(config)
if current_state:
    print(f"Current values: {current_state.values}")
    print(f"Next nodes: {current_state.next}")
    print(f"Thread ID: {current_state.config['configurable']['thread_id']}")
```

## Updating State Manually

Modify state outside of graph execution:

```python theme={null}
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class EditableState(TypedDict):
    counter: int
    notes: str

def increment_node(state: EditableState) -> dict:
    """Increment the counter."""
    return {"counter": state["counter"] + 1}

builder = StateGraph(EditableState)
builder.add_node("increment", increment_node)
builder.add_edge(START, "increment")
builder.add_edge("increment", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "editable-demo"}}

# Run once - counter goes from 0 to 1
graph.invoke({"counter": 0, "notes": ""}, config=config)
print(f"After run: counter={graph.get_state(config).values['counter']}")  # 1

# Manually update state to 100
graph.update_state(
    config,
    values={"counter": 100, "notes": "Manually set to 100"},
    as_node="admin"  # Optional: track who made the change
)
print(f"After update: counter={graph.get_state(config).values['counter']}")  # 100

# Re-run the graph from START with the updated state
# Pass an empty dict {} to restart from START with current state
result = graph.invoke({}, config=config)
print(f"After re-run: counter={result['counter']}")  # 101
```

<Info>
  **Note on resuming execution:**

  * Pass `{}` (empty dict) to restart the graph from START with the current checkpoint state
  * Pass `None` to resume from where execution left off - but if the graph already reached END, it just returns the current state without re-executing
</Info>

## Durability Modes

Control when checkpoints are saved:

| Mode    | Behavior                | Use Case                              |
| ------- | ----------------------- | ------------------------------------- |
| `sync`  | Save before continuing  | Maximum safety, slower                |
| `async` | Save in background      | Balance of safety and speed (default) |
| `exit`  | Save only on completion | Maximum speed, less safe              |

```python theme={null}
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class DurableState(TypedDict):
    value: int

def compute_node(state: DurableState) -> dict:
    """Perform computation."""
    return {"value": state["value"] * 2}

builder = StateGraph(DurableState)
builder.add_node("compute", compute_node)
builder.add_edge(START, "compute")
builder.add_edge("compute", END)

checkpointer = MemorySaver()

# Sync - guaranteed persistence at each step
# Best for critical workflows where data loss is unacceptable
graph_sync = builder.compile(
    checkpointer=checkpointer,
    durability="sync"
)

# Async - background persistence (default)
# Good balance for most applications
graph_async = builder.compile(
    checkpointer=checkpointer,
    durability="async"
)

# Exit - only persist at the end
# Best for high-throughput, less critical workflows
graph_exit = builder.compile(
    checkpointer=checkpointer,
    durability="exit"
)

# Use the appropriate mode for your use case
config = {"configurable": {"thread_id": "durable-demo"}}
result = graph_sync.invoke({"value": 5}, config=config)
print(f"Result: {result['value']}")  # 10
```

## Next Steps

* [Human-in-Loop](/concepts/stategraph/advanced/human-in-loop) - Learn about interrupts and approval workflows
* [Reliability](/concepts/stategraph/advanced/reliability) - Master retry and cache policies
* [Advanced Features](/concepts/stategraph/advanced/advanced-features) - Explore Send API and task decorators
