Overview
StateGraph provides comprehensive reliability features:
- Retry Policies - Automatic retry with exponential backoff
- Cache Policies - Avoid re-executing expensive operations
- Durability Modes - Control when state is persisted
- Failure Recovery - Resume from the last successful checkpoint
Retry Policies
Retry policies handle transient failures automatically.
Basic Retry
from upsonic.graphv2 import StateGraph, RetryPolicy
def unstable_node(state: MyState) -> dict:
    """A node that might fail."""
    # Simulate an unreliable API call
    import random
    if random.random() < 0.3:
        raise ConnectionError("Simulated failure")
    return {"data": "success"}
builder = StateGraph(MyState)
# Add node with retry policy
builder.add_node(
    "fetch_data",
    unstable_node,
    retry_policy=RetryPolicy(
        max_attempts=3,
        initial_interval=1.0,
        backoff_factor=2.0,
        max_interval=30.0,
        jitter=True
    )
)
Retry Configuration
| Parameter | Default | Description |
|---|---|---|
| max_attempts | 3 | Maximum number of attempts |
| initial_interval | 0.5 | Seconds to wait before the first retry |
| backoff_factor | 2.0 | Multiplier applied to the wait time after each attempt |
| max_interval | 128.0 | Maximum seconds between retries |
| jitter | True | Add random variation to intervals |
| retry_on | Exception | Which exception types trigger a retry |
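To make the interplay of these parameters concrete, here is a small sketch of how a capped exponential backoff schedule is typically computed from them. The exact formula inside `RetryPolicy` (in particular its jitter strategy) may differ; this assumes a simple additive jitter of up to 10%.

```python
import random

def backoff_interval(attempt: int,
                     initial_interval: float = 0.5,
                     backoff_factor: float = 2.0,
                     max_interval: float = 128.0,
                     jitter: bool = False) -> float:
    """Wait time before retry number `attempt` (1-based)."""
    interval = min(initial_interval * backoff_factor ** (attempt - 1), max_interval)
    if jitter:
        # Spread concurrent retries apart with up to 10% random variation
        interval += random.uniform(0, interval * 0.1)
    return interval

# With the defaults, the waits double each retry: 0.5, 1.0, 2.0, 4.0, ...
# and are capped at max_interval once the exponential would exceed 128.
```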
Selective Retry
# Retry only on connection errors
retry_policy = RetryPolicy(
    max_attempts=3,
    retry_on=ConnectionError
)

# Retry on multiple exception types
retry_policy = RetryPolicy(
    max_attempts=3,
    retry_on=(ConnectionError, TimeoutError)
)
Cache Policies
Cache policies avoid re-executing expensive operations.
Basic Caching
from upsonic.graphv2 import CachePolicy, InMemoryCache
def expensive_node(state: State) -> dict:
    """Expensive computation - results are cached."""
    result = complex_calculation(state["input"])
    return {"output": result}
builder = StateGraph(State)
# Add node with cache policy
builder.add_node(
    "compute",
    expensive_node,
    cache_policy=CachePolicy(ttl=300)  # Cache for 5 minutes
)
# Compile with cache
cache = InMemoryCache()
graph = builder.compile(cache=cache)
# First call - executes and caches
result1 = graph.invoke({"input": "test"})
# Second call with same input - uses cache
result2 = graph.invoke({"input": "test"}) # Instant!
Cache keys are automatically generated from the node's input state. Same input = cache hit.
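One plausible way to derive such a key (the library's actual scheme may differ) is to serialize the input state deterministically and hash it, so that two logically identical states always map to the same entry:

```python
import hashlib
import json

def cache_key(node_name: str, state: dict) -> str:
    """Deterministic key: sorted-key JSON of the state, hashed."""
    payload = json.dumps(state, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode()).hexdigest()
    return f"{node_name}:{digest}"

# Same logical state -> same key, regardless of dict insertion order
assert cache_key("compute", {"input": "test", "n": 1}) == \
       cache_key("compute", {"n": 1, "input": "test"})
```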
Cache Configuration
# Cache forever (no TTL)
cache_policy = CachePolicy(ttl=None)
# Cache for 1 hour
cache_policy = CachePolicy(ttl=3600)
Combined Retry and Cache
Use both for maximum reliability and performance:
builder.add_node(
"fetch",
robust_node,
retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0),
cache_policy=CachePolicy(ttl=600)
)
Order of Operations: Retry happens first, then successful results are cached.
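That ordering can be sketched as a toy executor (`execute_node` is illustrative, not the library's internals): check the cache first, otherwise run the node under retry, and cache only a successful result - failures are never cached.

```python
def execute_node(name, fn, state, cache, retry_on=Exception, max_attempts=3):
    """Cache lookup -> retried execution -> cache the success."""
    key = (name, tuple(sorted(state.items())))
    if key in cache:
        return cache[key]          # cache hit: skip execution entirely
    for attempt in range(1, max_attempts + 1):
        try:
            result = fn(state)
            break
        except retry_on:
            if attempt == max_attempts:
                raise              # exhausted: nothing is cached
    cache[key] = result            # only a success reaches this line
    return result
```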
Failure Recovery
Resume execution from the last successful checkpoint:
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "job-123"}}
# First attempt - fails at some point
try:
    result = graph.invoke(initial_state, config=config)
except Exception as e:
    print(f"Failed: {e}")

# Fix the issue and resume
result = graph.invoke(updated_state, config=config)
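The mechanism behind this can be sketched with a toy pipeline (this is an illustration of checkpoint-based resume, not StateGraph's internals): state is persisted after each successful node, keyed by `thread_id`, so a rerun skips everything that already completed.

```python
def run_pipeline(nodes, initial_state, thread_id, checkpoints):
    """Run nodes in order, checkpointing after each success.
    A rerun with the same thread_id resumes past completed nodes."""
    done, state = checkpoints.get(thread_id, (0, initial_state))
    for i in range(done, len(nodes)):
        state = {**state, **nodes[i](state)}
        checkpoints[thread_id] = (i + 1, state)  # persist progress
    return state
```

If the second node raises, the checkpoint still records the first node's output; after the underlying issue is fixed, invoking again with the same `thread_id` re-runs only the failed node onward.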
Best Practices
1. Retry Transient Failures Only
# ✅ Good - retry network issues
retry_policy = RetryPolicy(
    retry_on=(ConnectionError, TimeoutError)
)
2. Set Appropriate TTLs
# ✅ Good - TTL matches data volatility
builder.add_node(
    "get_stock_price",
    get_price,
    cache_policy=CachePolicy(ttl=60)  # 1 min for live data
)
3. Choose Right Durability
# Critical operation - sync
builder.compile(checkpointer=cp, durability="sync")
# Normal workflow - async (default)
builder.compile(checkpointer=cp, durability="async")
Next Steps