Overview
StateGraph provides comprehensive reliability features to handle failures gracefully and optimize performance:
- 🔄 Retry Policies - Automatic retry with exponential backoff
- 💾 Cache Policies - Avoid re-executing expensive operations
- 🛡️ Durability Modes - Control when state is persisted
- 🔁 Failure Recovery - Resume from the last successful checkpoint
Retry Policies
Retry policies handle transient failures automatically.
Basic Retry
from upsonic.graphv2 import StateGraph, RetryPolicy
def unstable_node(state: MyState) -> dict:
"""A node that might fail."""
response = requests.get("https://api.example.com/data")
response.raise_for_status()
return {"data": response.json()}
builder = StateGraph(MyState)
# Add node with retry policy
builder.add_node(
"fetch_data",
unstable_node,
retry_policy=RetryPolicy(
max_attempts=3, # Try up to 3 times
initial_interval=1.0, # Wait 1 second before first retry
backoff_factor=2.0, # Double wait time after each retry
max_interval=30.0, # Cap wait time at 30 seconds
jitter=True # Add randomness to prevent thundering herd
)
)
Retry Configuration
| Parameter | Default | Description |
|---|---|---|
| max_attempts | 3 | Maximum number of attempts (including first) |
| initial_interval | 0.5 | Seconds to wait before first retry |
| backoff_factor | 2.0 | Multiplier for wait time after each retry |
| max_interval | 128.0 | Maximum seconds between retries |
| jitter | True | Add random variation to intervals |
| retry_on | Exception | Which exceptions trigger retry |
Exponential Backoff
Retry intervals grow exponentially:
# With backoff_factor=2.0 and initial_interval=1.0:
# Attempt 1: immediate
# Attempt 2: wait 1.0s (1.0 * 2^0)
# Attempt 3: wait 2.0s (1.0 * 2^1)
# Attempt 4: wait 4.0s (1.0 * 2^2)
# Attempt 5: wait 8.0s (1.0 * 2^3)
retry_policy = RetryPolicy(
max_attempts=5,
initial_interval=1.0,
backoff_factor=2.0,
jitter=False # Disable for predictable timing
)
Jitter adds randomness (±50%) to prevent multiple failed requests from retrying simultaneously.
Selective Retry
Only retry specific exceptions:
from upsonic.graphv2 import RetryPolicy
# Retry only on connection errors
retry_policy = RetryPolicy(
max_attempts=3,
retry_on=ConnectionError
)
# Retry on multiple exception types
retry_policy = RetryPolicy(
max_attempts=3,
retry_on=(ConnectionError, TimeoutError, RequestException)
)
# Custom retry logic
def should_retry(exception: Exception) -> bool:
"""Custom logic to decide if should retry."""
if isinstance(exception, HTTPException):
# Retry on 5xx errors, not 4xx
return 500 <= exception.status_code < 600
return isinstance(exception, (ConnectionError, TimeoutError))
retry_policy = RetryPolicy(
max_attempts=3,
retry_on=should_retry
)
Retry Examples
Example 1: API Call with Retry
def api_node(state: State) -> dict:
"""Call external API - retries on failure."""
response = requests.post(
"https://api.example.com/process",
json={"data": state["input"]}
)
response.raise_for_status()
return {"result": response.json()}
builder.add_node(
"api_call",
api_node,
retry_policy=RetryPolicy(max_attempts=5, initial_interval=2.0)
)
Example 2: Database Query with Retry
def db_node(state: State) -> dict:
"""Query database - retries on connection loss."""
result = db.query(state["query"])
return {"data": result}
builder.add_node(
"query_db",
db_node,
retry_policy=RetryPolicy(
max_attempts=3,
retry_on=(OperationalError, TimeoutError)
)
)
Cache Policies
Cache policies avoid re-executing expensive operations.
Basic Caching
from upsonic.graphv2 import CachePolicy, InMemoryCache
def expensive_node(state: State) -> dict:
"""Expensive computation - results are cached."""
result = complex_calculation(state["input"])
return {"output": result}
builder = StateGraph(State)
# Add node with cache policy
builder.add_node(
"compute",
expensive_node,
cache_policy=CachePolicy(
ttl=300 # Cache for 5 minutes (seconds)
)
)
# Compile with cache
cache = InMemoryCache()
graph = builder.compile(cache=cache)
# First call - executes and caches
result1 = graph.invoke({"input": "test"})
# Second call with same input - uses cache
result2 = graph.invoke({"input": "test"}) # Instant!
# Different input - executes again
result3 = graph.invoke({"input": "other"})
Cache keys are automatically generated from the node’s input state. Same input = cache hit.
Cache Configuration
from upsonic.graphv2 import CachePolicy
# Cache forever (no TTL)
cache_policy = CachePolicy(ttl=None)
# Cache for 1 hour
cache_policy = CachePolicy(ttl=3600)
# Custom cache key function
def custom_key(state: dict) -> str:
"""Generate cache key from specific fields."""
return f"{state['user_id']}:{state['query']}"
cache_policy = CachePolicy(
ttl=600,
key_func=custom_key
)
Cache Implementations
In-Memory Cache - Fast but lost on restart:
from upsonic.graphv2 import InMemoryCache
cache = InMemoryCache()
graph = builder.compile(cache=cache)
SQLite Cache - Persistent across restarts:
import sqlite3
from upsonic.graphv2 import SqliteCache
conn = sqlite3.connect("cache.db")
cache = SqliteCache(conn)
graph = builder.compile(cache=cache)
Cache Examples
Example 1: ML Model Inference
def inference_node(state: State) -> dict:
"""Run ML inference - cache results."""
prediction = model.predict(state["features"])
return {"prediction": prediction}
builder.add_node(
"inference",
inference_node,
cache_policy=CachePolicy(ttl=3600) # 1 hour
)
Example 2: External Data Fetch
def fetch_user_data(state: State) -> dict:
"""Fetch user data - rarely changes."""
user = api.get_user(state["user_id"])
return {"user": user}
builder.add_node(
"get_user",
fetch_user_data,
cache_policy=CachePolicy(ttl=86400) # 24 hours
)
Example 3: Computation Pipeline
# Fast-changing data - short TTL
builder.add_node(
"get_stock_price",
get_stock_price,
cache_policy=CachePolicy(ttl=60) # 1 minute
)
# Slow-changing data - long TTL
builder.add_node(
"get_company_profile",
get_company_profile,
cache_policy=CachePolicy(ttl=86400) # 1 day
)
# Static computation - cache forever
builder.add_node(
"calculate_statistics",
calculate_statistics,
cache_policy=CachePolicy(ttl=None)
)
Combined Retry and Cache
Use both for maximum reliability and performance:
from upsonic.graphv2 import RetryPolicy, CachePolicy
def robust_node(state: State) -> dict:
"""Retries on failure, caches on success."""
data = fetch_expensive_data(state["query"])
return {"data": data}
builder.add_node(
"fetch",
robust_node,
retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0),
cache_policy=CachePolicy(ttl=600)
)
# First call with "query_A":
# - Attempt 1: fails → retry after 1s
# - Attempt 2: succeeds → cached for 10 minutes
# Second call with "query_A":
# - Cache hit → instant result
# Third call with "query_B":
# - Cache miss → fetches and caches
Order of Operations: Retry happens first, then successful results are cached.
Durability Modes
Control when checkpoints are saved.
Durability Options
| Mode | When Saved | Performance | Safety | Use Case |
|---|---|---|---|---|
| sync | Before each step | Slowest | Maximum | Critical workflows |
| async | Background | Fast | High | Most workflows (default) |
| exit | On completion | Fastest | Minimum | Stateless tasks |
Sync Durability
Maximum safety - state guaranteed persisted before proceeding:
from upsonic.graphv2 import MemorySaver
checkpointer = MemorySaver()
graph = builder.compile(
checkpointer=checkpointer,
durability="sync"
)
# After each node:
# 1. Execute node
# 2. Save checkpoint (blocks)
# 3. Continue to next node
Pros:
- ✅ State guaranteed saved before next step
- ✅ No data loss on crash
- ✅ Perfect for financial transactions, critical operations
Cons:
- ❌ Slower due to synchronous I/O
- ❌ Not suitable for high-throughput scenarios
Async Durability (Default)
Balanced approach - saves in background:
graph = builder.compile(
checkpointer=checkpointer,
durability="async" # Default
)
# After each node:
# 1. Execute node
# 2. Queue checkpoint save (non-blocking)
# 3. Continue immediately
Pros:
- ✅ Good performance
- ✅ State eventually saved
- ✅ Suitable for most workflows
Cons:
- ⚠️ Small window where state might be lost if process crashes
- ⚠️ Not for absolutely critical operations
Exit Durability
Fastest - saves only at completion:
graph = builder.compile(
checkpointer=checkpointer,
durability="exit"
)
# Checkpoints saved only when graph completes
# No intermediate persistence
Pros:
- ✅ Maximum speed
- ✅ Good for read-only or idempotent operations
Cons:
- ❌ No persistence during execution
- ❌ Crash loses all progress
- ❌ Can’t use with interrupts
Choosing Durability Mode
# Critical financial workflow - use sync
payment_graph = builder.compile(
checkpointer=checkpointer,
durability="sync"
)
# Normal agent workflow - use async (default)
agent_graph = builder.compile(
checkpointer=checkpointer,
durability="async"
)
# Read-only analytics - use exit
analytics_graph = builder.compile(
checkpointer=checkpointer,
durability="exit"
)
# Stateless transformation - no checkpointer needed
transform_graph = builder.compile() # No persistence
Failure Recovery
Resume execution from the last successful checkpoint.
Basic Recovery
from upsonic.graphv2 import StateGraph, MemorySaver
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "job-123"}}
# First attempt - fails at some point
try:
result = graph.invoke(initial_state, config=config)
except Exception as e:
print(f"Failed: {e}")
# Check what was saved
checkpoint = checkpointer.get("job-123")
print(f"Progress saved: {checkpoint.state}")
# Fix the issue (update data, fix network, etc.)
# Resume from last checkpoint
result = graph.invoke(updated_state, config=config)
print(f"Completed: {result}")
Resume without providing new state:
config = {"configurable": {"thread_id": "job-123"}}
# First attempt fails
try:
graph.invoke({"step": 1}, config=config)
except Exception:
pass
# Simply retry - uses checkpoint automatically
result = graph.invoke(None, config=config)
Passing None as input resumes from the checkpoint without modifying state.
Partial Execution Recovery
class ProcessingState(TypedDict):
files: List[str]
processed: Annotated[List[str], operator.add]
failed: Annotated[List[str], operator.add]
def process_files(state: ProcessingState) -> dict:
"""Process files - some might fail."""
results = []
failures = []
for file in state["files"]:
try:
result = process_file(file)
results.append(result)
except Exception as e:
failures.append(file)
# Don't crash - continue with others
return {
"processed": results,
"failed": failures
}
# Later, retry only failed files
if result["failed"]:
retry_result = graph.invoke(
{"files": result["failed"]},
config=config
)
Circuit Breaker Pattern
Prevent cascading failures:
class CircuitBreakerState(TypedDict):
failures: Annotated[int, lambda a, b: a + b]
circuit_open: bool
data: str
def protected_node(state: CircuitBreakerState) -> dict:
"""Node with circuit breaker."""
# Check circuit breaker
if state.get("circuit_open", False):
return {
"data": "Circuit open - using fallback",
"circuit_open": True
}
try:
# Risky operation
result = risky_operation(state["data"])
# Reset failures on success
return {"data": result, "failures": -state["failures"]}
except Exception:
new_failures = state.get("failures", 0) + 1
# Open circuit after 3 failures
if new_failures >= 3:
return {
"failures": 1,
"circuit_open": True,
"data": "Circuit opened - too many failures"
}
# Still trying
return {"failures": 1}
Best Practices
1. Retry Transient Failures Only
# ✅ Good - retry network issues
retry_policy = RetryPolicy(
retry_on=(ConnectionError, TimeoutError, HTTPError)
)
# ❌ Bad - retry programming errors
retry_policy = RetryPolicy(
retry_on=Exception # Will retry TypeError, ValueError, etc.
)
2. Set Appropriate TTLs
# ✅ Good - TTL matches data volatility
builder.add_node(
"get_stock_price",
get_price,
cache_policy=CachePolicy(ttl=60) # 1 min for live data
)
builder.add_node(
"get_historical_data",
get_history,
cache_policy=CachePolicy(ttl=86400) # 1 day for historical
)
# ❌ Bad - caching volatile data too long
builder.add_node(
"get_stock_price",
get_price,
cache_policy=CachePolicy(ttl=86400) # Price outdated!
)
3. Choose Right Durability
# ✅ Critical operation - sync
builder.compile(checkpointer=cp, durability="sync")
# ✅ Normal workflow - async
builder.compile(checkpointer=cp, durability="async")
# ✅ Stateless job - exit or no checkpointer
builder.compile(checkpointer=cp, durability="exit")
# or
builder.compile() # No persistence
4. Log Retry Attempts
import logging
def monitored_node(state: State) -> dict:
"""Node with retry monitoring."""
try:
result = risky_operation()
return {"result": result}
except Exception as e:
logging.warning(f"Node failed: {e}")
raise # Let retry policy handle it
builder.add_node(
"monitored",
monitored_node,
retry_policy=RetryPolicy(max_attempts=3)
)
5. Test Failure Scenarios
def test_retry_behavior():
"""Test that retries work as expected."""
attempts = []
def flaky_node(state):
attempts.append(1)
if len(attempts) < 3:
raise ConnectionError("Simulated failure")
return {"result": "success"}
builder.add_node(
"flaky",
flaky_node,
retry_policy=RetryPolicy(max_attempts=5)
)
graph = builder.compile()
result = graph.invoke({"input": "test"})
assert len(attempts) == 3, "Should retry twice then succeed"
assert result["result"] == "success"
Complete Example: Robust Pipeline
from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import (
StateGraph, START, END,
RetryPolicy, CachePolicy,
InMemoryCache, MemorySaver
)
class RobustState(TypedDict):
user_id: str
user_data: dict
recommendations: List[str]
enriched: Annotated[List[dict], operator.add]
# Node 1: Fetch user data (retry + cache)
def fetch_user(state: RobustState) -> dict:
user = api.get_user(state["user_id"]) # Might fail
return {"user_data": user}
# Node 2: Generate recommendations (retry only)
def generate_recs(state: RobustState) -> dict:
recs = ml_model.recommend(state["user_data"]) # Might fail
return {"recommendations": recs}
# Node 3: Enrich data (cache only - deterministic)
def enrich(state: RobustState) -> dict:
enriched = [
{"id": rec, "details": get_details(rec)}
for rec in state["recommendations"]
]
return {"enriched": enriched}
# Build robust pipeline
builder = StateGraph(RobustState)
builder.add_node(
"fetch_user",
fetch_user,
retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0),
cache_policy=CachePolicy(ttl=300) # Cache user data for 5 min
)
builder.add_node(
"generate_recs",
generate_recs,
retry_policy=RetryPolicy(max_attempts=5, initial_interval=2.0)
)
builder.add_node(
"enrich",
enrich,
cache_policy=CachePolicy(ttl=600) # Cache enriched data for 10 min
)
builder.add_edge(START, "fetch_user")
builder.add_edge("fetch_user", "generate_recs")
builder.add_edge("generate_recs", "enrich")
builder.add_edge("enrich", END)
# Compile with all resilience features
cache = InMemoryCache()
checkpointer = MemorySaver()
graph = builder.compile(
cache=cache,
checkpointer=checkpointer,
durability="async" # Good balance
)
# Execute with recovery
config = {"configurable": {"thread_id": "user-123-session"}}
try:
result = graph.invoke(
{"user_id": "user-123", "user_data": {}, "recommendations": [], "enriched": []},
config=config
)
print(f"Success: {len(result['enriched'])} recommendations")
except Exception as e:
print(f"Failed: {e}")
# Will auto-retry on next invoke with same config
Next Steps