Skip to main content

Overview

StateGraph provides comprehensive reliability features to handle failures gracefully and optimize performance:
  • 🔄 Retry Policies - Automatic retry with exponential backoff
  • 💾 Cache Policies - Avoid re-executing expensive operations
  • 🛡️ Durability Modes - Control when state is persisted
  • 🔁 Failure Recovery - Resume from the last successful checkpoint

Retry Policies

Retry policies handle transient failures automatically.

Basic Retry

from upsonic.graphv2 import StateGraph, RetryPolicy

def unstable_node(state: MyState) -> dict:
    """Fetch JSON from the example API; a node that might fail.

    Any error raised by ``requests`` (connection problems, timeouts, or a
    non-2xx status via ``raise_for_status``) propagates to the caller so the
    node's retry policy can decide whether to try again.

    Returns:
        A state update with the decoded JSON body under ``"data"``.
    """
    # Without an explicit timeout, requests waits indefinitely on a stalled
    # connection: the call would hang rather than fail, and the retry policy
    # would never get a chance to run.
    response = requests.get("https://api.example.com/data", timeout=10)
    response.raise_for_status()
    return {"data": response.json()}

builder = StateGraph(MyState)

# Add node with retry policy
builder.add_node(
    "fetch_data",
    unstable_node,
    retry_policy=RetryPolicy(
        max_attempts=3,           # Try up to 3 times
        initial_interval=1.0,     # Wait 1 second before first retry
        backoff_factor=2.0,       # Double wait time after each retry
        max_interval=30.0,        # Cap wait time at 30 seconds
        jitter=True               # Add randomness to prevent thundering herd
    )
)

Retry Configuration

| Parameter | Default | Description |
| --- | --- | --- |
| max_attempts | 3 | Maximum number of attempts (including the first) |
| initial_interval | 0.5 | Seconds to wait before the first retry |
| backoff_factor | 2.0 | Multiplier applied to the wait time after each retry |
| max_interval | 128.0 | Maximum seconds between retries |
| jitter | True | Add random variation to intervals |
| retry_on | Exception | Which exceptions trigger a retry |

Exponential Backoff

Retry intervals grow exponentially:
# With backoff_factor=2.0 and initial_interval=1.0:
# Attempt 1: immediate
# Attempt 2: wait 1.0s  (1.0 * 2^0)
# Attempt 3: wait 2.0s  (1.0 * 2^1)
# Attempt 4: wait 4.0s  (1.0 * 2^2)
# Attempt 5: wait 8.0s  (1.0 * 2^3)

retry_policy = RetryPolicy(
    max_attempts=5,
    initial_interval=1.0,
    backoff_factor=2.0,
    jitter=False  # Disable for predictable timing
)
Jitter adds randomness (±50%) to prevent multiple failed requests from retrying simultaneously.

Selective Retry

Only retry specific exceptions:
from upsonic.graphv2 import RetryPolicy

# Retry only on connection errors
retry_policy = RetryPolicy(
    max_attempts=3,
    retry_on=ConnectionError
)

# Retry on multiple exception types
retry_policy = RetryPolicy(
    max_attempts=3,
    retry_on=(ConnectionError, TimeoutError, RequestException)
)

# Custom retry logic
def should_retry(exception: Exception) -> bool:
    """Decide whether a failed attempt is worth retrying.

    HTTP errors are retried only for 5xx status codes (never 4xx); any other
    exception is retried only if it is a connection or timeout error.
    """
    if not isinstance(exception, HTTPException):
        # Non-HTTP failure: only network-level problems are retryable.
        return isinstance(exception, (ConnectionError, TimeoutError))
    # HTTP failure: server-side errors are retryable, client errors are not.
    return 500 <= exception.status_code < 600

retry_policy = RetryPolicy(
    max_attempts=3,
    retry_on=should_retry
)

Retry Examples

Example 1: API Call with Retry
def api_node(state: State) -> dict:
    """Call the external processing API with ``state["input"]``.

    Any error raised by ``requests`` (connection problems, timeouts, or a
    non-2xx status via ``raise_for_status``) propagates so the node's retry
    policy can retry the call.

    Returns:
        A state update with the decoded JSON body under ``"result"``.
    """
    # An explicit timeout turns a stalled connection into an exception the
    # retry policy can act on; without it the request could block forever.
    response = requests.post(
        "https://api.example.com/process",
        json={"data": state["input"]},
        timeout=10,
    )
    response.raise_for_status()
    return {"result": response.json()}

builder.add_node(
    "api_call",
    api_node,
    retry_policy=RetryPolicy(max_attempts=5, initial_interval=2.0)
)
Example 2: Database Query with Retry
def db_node(state: State) -> dict:
    """Run ``state["query"]`` through ``db.query`` and return it as ``"data"``.

    Exceptions from the query are not handled here; they propagate so the
    node's retry policy can act on them.
    """
    return {"data": db.query(state["query"])}

builder.add_node(
    "query_db",
    db_node,
    retry_policy=RetryPolicy(
        max_attempts=3,
        retry_on=(OperationalError, TimeoutError)
    )
)

Cache Policies

Cache policies avoid re-executing expensive operations.

Basic Caching

from upsonic.graphv2 import CachePolicy, InMemoryCache

def expensive_node(state: State) -> dict:
    """Compute ``complex_calculation(state["input"])`` and return it as ``"output"``.

    Intended to be registered with a cache policy so the calculation is not
    repeated for the same input.
    """
    return {"output": complex_calculation(state["input"])}

builder = StateGraph(State)

# Add node with cache policy
builder.add_node(
    "compute",
    expensive_node,
    cache_policy=CachePolicy(
        ttl=300  # Cache for 5 minutes (seconds)
    )
)

# Compile with cache
cache = InMemoryCache()
graph = builder.compile(cache=cache)

# First call - executes and caches
result1 = graph.invoke({"input": "test"})

# Second call with same input - uses cache
result2 = graph.invoke({"input": "test"})  # Instant!

# Different input - executes again
result3 = graph.invoke({"input": "other"})
Cache keys are automatically generated from the node’s input state. Same input = cache hit.

Cache Configuration

from upsonic.graphv2 import CachePolicy

# Cache forever (no TTL)
cache_policy = CachePolicy(ttl=None)

# Cache for 1 hour
cache_policy = CachePolicy(ttl=3600)

# Custom cache key function
def custom_key(state: dict) -> str:
    """Build the cache key ``<user_id>:<query>`` from the given state.

    Only these two fields take part in the key; other state fields are
    ignored.
    """
    user_id = state["user_id"]
    query = state["query"]
    return "{}:{}".format(user_id, query)

cache_policy = CachePolicy(
    ttl=600,
    key_func=custom_key
)

Cache Implementations

In-Memory Cache - Fast but lost on restart:
from upsonic.graphv2 import InMemoryCache

cache = InMemoryCache()
graph = builder.compile(cache=cache)
SQLite Cache - Persistent across restarts:
import sqlite3
from upsonic.graphv2 import SqliteCache

conn = sqlite3.connect("cache.db")
cache = SqliteCache(conn)
graph = builder.compile(cache=cache)

Cache Examples

Example 1: ML Model Inference
def inference_node(state: State) -> dict:
    """Return ``model.predict(state["features"])`` under ``"prediction"``.

    Intended to be registered with a cache policy so repeated inputs reuse
    the earlier prediction.
    """
    return {"prediction": model.predict(state["features"])}

builder.add_node(
    "inference",
    inference_node,
    cache_policy=CachePolicy(ttl=3600)  # 1 hour
)
Example 2: External Data Fetch
def fetch_user_data(state: State) -> dict:
    """Look up ``state["user_id"]`` via ``api.get_user`` and return it as ``"user"``."""
    return {"user": api.get_user(state["user_id"])}

builder.add_node(
    "get_user",
    fetch_user_data,
    cache_policy=CachePolicy(ttl=86400)  # 24 hours
)
Example 3: Computation Pipeline
# Fast-changing data - short TTL
builder.add_node(
    "get_stock_price",
    get_stock_price,
    cache_policy=CachePolicy(ttl=60)  # 1 minute
)

# Slow-changing data - long TTL
builder.add_node(
    "get_company_profile",
    get_company_profile,
    cache_policy=CachePolicy(ttl=86400)  # 1 day
)

# Static computation - cache forever
builder.add_node(
    "calculate_statistics",
    calculate_statistics,
    cache_policy=CachePolicy(ttl=None)
)

Combined Retry and Cache

Use both for maximum reliability and performance:
from upsonic.graphv2 import RetryPolicy, CachePolicy

def robust_node(state: State) -> dict:
    """Fetch data for ``state["query"]`` and return it under ``"data"``.

    Meant to be registered with both a retry policy and a cache policy:
    failures propagate so they can be retried, and a successful result can
    be cached.
    """
    return {"data": fetch_expensive_data(state["query"])}

builder.add_node(
    "fetch",
    robust_node,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0),
    cache_policy=CachePolicy(ttl=600)
)

# First call with "query_A":
#   - Attempt 1: fails → retry after 1s
#   - Attempt 2: succeeds → cached for 10 minutes
# Second call with "query_A":
#   - Cache hit → instant result
# Third call with "query_B":
#   - Cache miss → fetches and caches
Order of Operations: Retry happens first, then successful results are cached.

Durability Modes

Control when checkpoints are saved.

Durability Options

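| Mode | When Saved | Performance | Safety | Use Case |
| --- | --- | --- | --- | --- |
| sync | After each node, before the next step starts | Slowest | Maximum | Critical workflows |
| async | In the background after each node | Fast | High | Most workflows (default) |
| exit | On completion | Fastest | Minimum | Stateless tasks |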

Sync Durability

Maximum safety - state guaranteed persisted before proceeding:
from upsonic.graphv2 import MemorySaver

checkpointer = MemorySaver()

graph = builder.compile(
    checkpointer=checkpointer,
    durability="sync"
)

# After each node:
# 1. Execute node
# 2. Save checkpoint (blocks)
# 3. Continue to next node
Pros:
  • ✅ State guaranteed saved before next step
  • ✅ No data loss on crash
  • ✅ Perfect for financial transactions, critical operations
Cons:
  • ❌ Slower due to synchronous I/O
  • ❌ Not suitable for high-throughput scenarios

Async Durability (Default)

Balanced approach - saves in background:
graph = builder.compile(
    checkpointer=checkpointer,
    durability="async"  # Default
)

# After each node:
# 1. Execute node
# 2. Queue checkpoint save (non-blocking)
# 3. Continue immediately
Pros:
  • ✅ Good performance
  • ✅ State eventually saved
  • ✅ Suitable for most workflows
Cons:
  • ⚠️ Small window where state might be lost if process crashes
  • ⚠️ Not for absolutely critical operations

Exit Durability

Fastest - saves only at completion:
graph = builder.compile(
    checkpointer=checkpointer,
    durability="exit"
)

# Checkpoints saved only when graph completes
# No intermediate persistence
Pros:
  • ✅ Maximum speed
  • ✅ Good for read-only or idempotent operations
Cons:
  • ❌ No persistence during execution
  • ❌ Crash loses all progress
  • ❌ Can’t use with interrupts

Choosing Durability Mode

# Critical financial workflow - use sync
payment_graph = builder.compile(
    checkpointer=checkpointer,
    durability="sync"
)

# Normal agent workflow - use async (default)
agent_graph = builder.compile(
    checkpointer=checkpointer,
    durability="async"
)

# Read-only analytics - use exit
analytics_graph = builder.compile(
    checkpointer=checkpointer,
    durability="exit"
)

# Stateless transformation - no checkpointer needed
transform_graph = builder.compile()  # No persistence

Failure Recovery

Resume execution from the last successful checkpoint.

Basic Recovery

from upsonic.graphv2 import StateGraph, MemorySaver

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "job-123"}}

# First attempt - fails at some point
try:
    result = graph.invoke(initial_state, config=config)
except Exception as e:
    print(f"Failed: {e}")
    
    # Check what was saved
    checkpoint = checkpointer.get("job-123")
    print(f"Progress saved: {checkpoint.state}")

# Fix the issue (update data, fix network, etc.)

# Resume from last checkpoint
result = graph.invoke(updated_state, config=config)
print(f"Completed: {result}")

Resume Without Input

Resume without providing new state:
config = {"configurable": {"thread_id": "job-123"}}

# First attempt fails
try:
    graph.invoke({"step": 1}, config=config)
except Exception:
    pass

# Simply retry - uses checkpoint automatically
result = graph.invoke(None, config=config)
Passing None as input resumes from the checkpoint without modifying state.

Partial Execution Recovery

class ProcessingState(TypedDict):
    """State schema for the partial-execution recovery example."""
    # Files to process in this run.
    files: List[str]
    # Results for files that were processed successfully. Annotated with
    # operator.add - presumably so each node's list is concatenated onto the
    # existing value instead of replacing it (confirm against the library docs).
    processed: Annotated[List[str], operator.add]
    # Files whose processing raised; carries the same operator.add annotation.
    failed: Annotated[List[str], operator.add]

def process_files(state: ProcessingState) -> dict:
    """Run ``process_file`` over every entry in ``state["files"]``.

    A file whose processing raises is recorded rather than aborting the
    batch, so one bad file never stops the others from being handled.

    Returns:
        A dict with ``"processed"`` (results, in input order) and
        ``"failed"`` (the file entries that raised).
    """
    succeeded = []
    broken = []

    for path in state["files"]:
        try:
            outcome = process_file(path)
        except Exception:
            # Don't crash - note the failure and continue with the others.
            broken.append(path)
        else:
            succeeded.append(outcome)

    return {"processed": succeeded, "failed": broken}

# Later, retry only failed files
if result["failed"]:
    retry_result = graph.invoke(
        {"files": result["failed"]},
        config=config
    )

Circuit Breaker Pattern

Prevent cascading failures:
class CircuitBreakerState(TypedDict):
    """State schema for the circuit-breaker example."""
    # Failure counter. The annotated function adds two values, so nodes
    # return deltas (1 to increment, a negative value to reset) rather than
    # absolute counts.
    failures: Annotated[int, lambda a, b: a + b]
    # True once the breaker has tripped.
    circuit_open: bool
    # Input to the risky operation; nodes overwrite it with the result or a
    # status message.
    data: str

def protected_node(state: CircuitBreakerState) -> dict:
    """Run ``risky_operation`` behind a failure-counting circuit breaker.

    The returned ``"failures"`` value is a delta, not an absolute count,
    because the state schema combines it with the stored value by addition:
    ``1`` increments the counter and ``-prior`` resets it to zero.

    Returns:
        * the fallback payload when the circuit is already open;
        * ``{"data": result, "failures": -prior}`` on success;
        * ``{"failures": 1}`` on failure, plus ``circuit_open=True`` and a
          status message once the failure count reaches 3.
    """
    # Circuit already open: skip the risky call and serve the fallback.
    if state.get("circuit_open", False):
        return {
            "data": "Circuit open - using fallback",
            "circuit_open": True
        }

    # Read the counter once, tolerating a state that has no "failures" yet.
    # Indexing state["failures"] inside the try block would raise KeyError on
    # a fresh state and misreport a successful call as a failure.
    prior_failures = state.get("failures", 0)

    try:
        # Risky operation
        result = risky_operation(state["data"])
    except Exception:
        # Open circuit after 3 failures (counting this one).
        if prior_failures + 1 >= 3:
            return {
                "failures": 1,
                "circuit_open": True,
                "data": "Circuit opened - too many failures"
            }

        # Still trying
        return {"failures": 1}

    # Reset failures on success: the negative delta cancels the stored count.
    return {"data": result, "failures": -prior_failures}

Best Practices

1. Retry Transient Failures Only

# ✅ Good - retry network issues
retry_policy = RetryPolicy(
    retry_on=(ConnectionError, TimeoutError, HTTPError)
)

# ❌ Bad - retry programming errors
retry_policy = RetryPolicy(
    retry_on=Exception  # Will retry TypeError, ValueError, etc.
)

2. Set Appropriate TTLs

# ✅ Good - TTL matches data volatility
builder.add_node(
    "get_stock_price",
    get_price,
    cache_policy=CachePolicy(ttl=60)  # 1 min for live data
)

builder.add_node(
    "get_historical_data",
    get_history,
    cache_policy=CachePolicy(ttl=86400)  # 1 day for historical
)

# ❌ Bad - caching volatile data too long
builder.add_node(
    "get_stock_price",
    get_price,
    cache_policy=CachePolicy(ttl=86400)  # Price outdated!
)

3. Choose Right Durability

# ✅ Critical operation - sync
builder.compile(checkpointer=cp, durability="sync")

# ✅ Normal workflow - async
builder.compile(checkpointer=cp, durability="async")

# ✅ Stateless job - exit or no checkpointer
builder.compile(checkpointer=cp, durability="exit")
# or
builder.compile()  # No persistence

4. Log Retry Attempts

import logging

def monitored_node(state: State) -> dict:
    """Run ``risky_operation`` and log a warning whenever it fails.

    The exception is re-raised after logging, so the retry policy attached
    to the node still sees every failure.
    """
    try:
        outcome = risky_operation()
    except Exception as e:
        logging.warning(f"Node failed: {e}")
        # Re-raise unchanged: logging is only for visibility, the retry
        # policy remains responsible for handling the error.
        raise
    else:
        return {"result": outcome}

builder.add_node(
    "monitored",
    monitored_node,
    retry_policy=RetryPolicy(max_attempts=3)
)

5. Test Failure Scenarios

def test_retry_behavior():
    """Test that a flaky node is retried until it succeeds.

    The node fails on its first two calls and succeeds on the third, so with
    ``max_attempts=5`` the graph should finish after exactly three calls.
    """
    from upsonic.graphv2 import END, START, StateGraph

    attempts = []

    def flaky_node(state):
        attempts.append(1)
        if len(attempts) < 3:
            raise ConnectionError("Simulated failure")
        return {"result": "success"}

    # Build a fresh graph for this test: reusing a shared module-level
    # builder would leak nodes between tests.
    builder = StateGraph(State)
    builder.add_node(
        "flaky",
        flaky_node,
        retry_policy=RetryPolicy(
            max_attempts=5,
            initial_interval=0.01,  # Keep the test fast
            jitter=False            # Keep the timing deterministic
        )
    )
    # Wire the node into the graph so compile() has an entry and exit point.
    builder.add_edge(START, "flaky")
    builder.add_edge("flaky", END)

    graph = builder.compile()
    result = graph.invoke({"input": "test"})

    assert len(attempts) == 3, "Should retry twice then succeed"
    assert result["result"] == "success"

Complete Example: Robust Pipeline

from typing import Annotated, List
from typing_extensions import TypedDict
import operator

from upsonic.graphv2 import (
    StateGraph, START, END,
    RetryPolicy, CachePolicy,
    InMemoryCache, MemorySaver
)

class RobustState(TypedDict):
    """State shared by the nodes of the robust pipeline example."""
    # ID that the fetch_user node passes to api.get_user.
    user_id: str
    # Written by fetch_user; read by generate_recs.
    user_data: dict
    # Written by generate_recs; read by enrich.
    recommendations: List[str]
    # Written by enrich. Annotated with operator.add - presumably so updates
    # are concatenated onto the existing list (confirm against the library docs).
    enriched: Annotated[List[dict], operator.add]

# Node 1: Fetch user data (retry + cache)
def fetch_user(state: RobustState) -> dict:
    """Look up ``state["user_id"]`` via ``api.get_user`` and return it as ``"user_data"``.

    The API call might fail; the exception propagates to the caller.
    """
    return {"user_data": api.get_user(state["user_id"])}

# Node 2: Generate recommendations (retry only)
def generate_recs(state: RobustState) -> dict:
    """Return ``ml_model.recommend(state["user_data"])`` under ``"recommendations"``.

    The model call might fail; the exception propagates to the caller.
    """
    return {"recommendations": ml_model.recommend(state["user_data"])}

# Node 3: Enrich data (cache only - deterministic)
def enrich(state: RobustState) -> dict:
    """Attach details to every recommendation in ``state["recommendations"]``.

    Returns:
        A state update whose ``"enriched"`` list holds one
        ``{"id": ..., "details": ...}`` dict per recommendation, in order.
    """
    detailed = []
    for rec_id in state["recommendations"]:
        detailed.append({"id": rec_id, "details": get_details(rec_id)})
    return {"enriched": detailed}

# Build robust pipeline
builder = StateGraph(RobustState)

builder.add_node(
    "fetch_user",
    fetch_user,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0),
    cache_policy=CachePolicy(ttl=300)  # Cache user data for 5 min
)

builder.add_node(
    "generate_recs",
    generate_recs,
    retry_policy=RetryPolicy(max_attempts=5, initial_interval=2.0)
)

builder.add_node(
    "enrich",
    enrich,
    cache_policy=CachePolicy(ttl=600)  # Cache enriched data for 10 min
)

builder.add_edge(START, "fetch_user")
builder.add_edge("fetch_user", "generate_recs")
builder.add_edge("generate_recs", "enrich")
builder.add_edge("enrich", END)

# Compile with all resilience features
cache = InMemoryCache()
checkpointer = MemorySaver()

graph = builder.compile(
    cache=cache,
    checkpointer=checkpointer,
    durability="async"  # Good balance
)

# Execute with recovery
config = {"configurable": {"thread_id": "user-123-session"}}

try:
    result = graph.invoke(
        {"user_id": "user-123", "user_data": {}, "recommendations": [], "enriched": []},
        config=config
    )
    print(f"Success: {len(result['enriched'])} recommendations")
except Exception as e:
    print(f"Failed: {e}")
    # Progress is checkpointed under this thread_id, so invoking again with
    # the same config resumes from the last saved checkpoint.

Next Steps