Overview
StateGraph provides comprehensive reliability features:
- 🔄 Retry Policies - Automatic retry with exponential backoff
- 💾 Cache Policies - Avoid re-executing expensive operations
- 🛡️ Durability Modes - Control when state is persisted
- 🔁 Failure Recovery - Resume from the last successful checkpoint
Retry Policies
Retry policies handle transient failures automatically.
Basic Retry
Copy
import random
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, RetryPolicy

class FetchState(TypedDict):
    # URL to fetch, the payload that was fetched, and whether the fetch succeeded.
    url: str
    data: str
    success: bool

def unstable_fetch(state: FetchState) -> dict:
    """Simulate an unstable API call that might fail."""
    # Simulate 30% failure rate
    if random.random() < 0.3:
        raise ConnectionError("Simulated network failure")
    return {"data": f"Data from {state['url']}", "success": True}

# Build the graph
builder = StateGraph(FetchState)

# Add node with retry policy
builder.add_node(
    "fetch_data",
    unstable_fetch,
    retry_policy=RetryPolicy(
        max_attempts=3,        # give up after the 3rd failed attempt
        initial_interval=1.0,  # seconds to wait before the first retry
        backoff_factor=2.0,    # multiply the wait after each failure
        max_interval=30.0,     # cap the wait between retries at 30s
        jitter=True            # add random variation to the intervals
    )
)
builder.add_edge(START, "fetch_data")
builder.add_edge("fetch_data", END)
graph = builder.compile()

# Execute - will retry up to 3 times on failure
result = graph.invoke({"url": "https://api.example.com", "data": "", "success": False})
print(f"Success: {result['success']}, Data: {result['data']}")
Retry Configuration
| Parameter | Default | Description |
|---|---|---|
| max_attempts | 3 | Maximum number of attempts |
| initial_interval | 0.5 | Seconds to wait before first retry |
| backoff_factor | 2.0 | Multiplier for wait time |
| max_interval | 128.0 | Maximum seconds between retries |
| jitter | True | Add random variation to intervals |
| retry_on | Exception | Which exceptions trigger retry |
Selective Retry
Only retry specific exception types:
Copy
import random
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, RetryPolicy

class ApiState(TypedDict):
    # Endpoint to call and the response text it produced.
    endpoint: str
    response: str

# Track attempt count for demonstration
attempt_counter = {"count": 0}

def call_api(state: ApiState) -> dict:
    """Make an API call that might fail on first attempts."""
    attempt_counter["count"] += 1
    # Fail first 2 attempts, succeed on 3rd
    if attempt_counter["count"] < 3:
        raise ConnectionError(f"Network unreachable (attempt {attempt_counter['count']})")
    return {"response": f"Success on attempt {attempt_counter['count']}!"}

builder = StateGraph(ApiState)

# Retry only on ConnectionError (up to 3 attempts)
builder.add_node(
    "api_call",
    call_api,
    retry_policy=RetryPolicy(
        max_attempts=3,
        initial_interval=0.1,  # Short interval for demo
        retry_on=ConnectionError  # any other exception type propagates immediately
    )
)
builder.add_edge(START, "api_call")
builder.add_edge("api_call", END)
graph = builder.compile()

# Execute - will retry on ConnectionError and succeed on 3rd attempt
try:
    result = graph.invoke({"endpoint": "https://api.example.com", "response": ""})
    print(f"Result: {result['response']}")  # Success on attempt 3!
except Exception as e:
    print(f"Failed after retries: {e}")
Multiple Exception Types
Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, RetryPolicy

class MultiRetryState(TypedDict):
    data: str
    result: str

# Module-level counter so the node can raise a different error on each attempt.
retry_count = {"value": 0}

def flaky_operation(state: MultiRetryState) -> dict:
    """Operation that fails with different errors."""
    retry_count["value"] += 1
    # Attempt 1: network error; attempt 2: timeout; attempt 3: success.
    if retry_count["value"] == 1:
        raise ConnectionError("Network issue")
    elif retry_count["value"] == 2:
        raise TimeoutError("Request timed out")
    return {"result": f"Completed after {retry_count['value']} attempts"}

builder = StateGraph(MultiRetryState)
builder.add_node(
    "operation",
    flaky_operation,
    retry_policy=RetryPolicy(
        max_attempts=5,
        initial_interval=0.1,
        retry_on=(ConnectionError, TimeoutError)  # Both trigger retry
    )
)
builder.add_edge(START, "operation")
builder.add_edge("operation", END)
graph = builder.compile()

# Execute - retries on both ConnectionError and TimeoutError
result = graph.invoke({"data": "test", "result": ""})
print(f"Result: {result['result']}")  # Completed after 3 attempts
Custom Retry Logic
Use a function to determine whether to retry:
Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, RetryPolicy

class SmartRetryState(TypedDict):
    request_id: str
    result: str

# retry_on also accepts a predicate: given the raised exception,
# return True to retry, False to fail immediately.
def should_retry_exception(exception: Exception) -> bool:
    """Custom logic to determine if we should retry."""
    # Retry on connection issues
    if isinstance(exception, (ConnectionError, TimeoutError)):
        return True
    # Check for HTTP status codes if available
    if hasattr(exception, 'response'):
        status_code = getattr(exception.response, 'status_code', 0)
        # Retry on 5xx server errors, but not 4xx client errors
        if 500 <= status_code < 600:
            return True
        if 400 <= status_code < 500:
            return False
    # Default: don't retry unknown errors
    return False

# Track attempts for demonstration
smart_attempts = {"count": 0}

def make_request(state: SmartRetryState) -> dict:
    """Make a request that might fail."""
    smart_attempts["count"] += 1
    # Simulate transient failure on first attempt
    if smart_attempts["count"] == 1:
        raise ConnectionError("Temporary network issue")
    return {"result": f"Success! Request {state['request_id']} completed."}

builder = StateGraph(SmartRetryState)
builder.add_node(
    "request",
    make_request,
    retry_policy=RetryPolicy(
        max_attempts=5,
        initial_interval=0.1,
        retry_on=should_retry_exception  # Custom function
    )
)
builder.add_edge(START, "request")
builder.add_edge("request", END)
graph = builder.compile()

# Execute - custom retry logic determines what to retry
result = graph.invoke({"request_id": "req-123", "result": ""})
print(f"Result: {result['result']}")  # Success! Request req-123 completed.
print(f"Total attempts: {smart_attempts['count']}")  # 2
Cache Policies
Cache policies avoid re-executing expensive operations.
Basic Caching
Copy
import time
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, CachePolicy, InMemoryCache

class ComputeState(TypedDict):
    input: str
    output: str

def complex_calculation(input_value: str) -> str:
    """Simulate an expensive computation."""
    time.sleep(0.1)  # Simulate processing time
    return f"processed_{input_value}"

def expensive_node(state: ComputeState) -> dict:
    """Expensive computation - results are cached."""
    result = complex_calculation(state["input"])
    return {"output": result}

# Build the graph
builder = StateGraph(ComputeState)

# Add node with cache policy (5 minute TTL)
builder.add_node(
    "compute",
    expensive_node,
    cache_policy=CachePolicy(ttl=300)  # TTL is in seconds
)
builder.add_edge(START, "compute")
builder.add_edge("compute", END)

# Compile with cache
cache = InMemoryCache()
graph = builder.compile(cache=cache)

# First call - executes and caches
print("First call (should be slow)...")
start = time.time()
result1 = graph.invoke({"input": "test", "output": ""})
print(f"Took {time.time() - start:.3f}s - Result: {result1['output']}")

# Second call with same input - uses cache
print("\nSecond call (should be instant)...")
start = time.time()
result2 = graph.invoke({"input": "test", "output": ""})
print(f"Took {time.time() - start:.3f}s - Result: {result2['output']}")

# Third call with different input - executes again
print("\nThird call with different input (should be slow)...")
start = time.time()
result3 = graph.invoke({"input": "different", "output": ""})
print(f"Took {time.time() - start:.3f}s - Result: {result3['output']}")

# Verify cache hit for original input
print("\nFourth call with original input (should be instant)...")
start = time.time()
result4 = graph.invoke({"input": "test", "output": ""})
print(f"Took {time.time() - start:.3f}s - Result: {result4['output']}")
Cache keys are automatically generated from the node’s input state. Same input = cache hit.
Cache Configuration
Copy
from upsonic.graphv2 import CachePolicy

# TTL (time-to-live) is given in seconds; ttl=None means entries never expire.

# Cache forever (no TTL)
cache_forever = CachePolicy(ttl=None)

# Cache for 1 hour
cache_1h = CachePolicy(ttl=3600)

# Cache for 5 minutes
cache_5m = CachePolicy(ttl=300)

# Cache for 60 seconds (good for frequently changing data)
cache_1m = CachePolicy(ttl=60)
SQLite Cache for Persistence
Use a SQLite cache to persist cached results across restarts:
Copy
import sqlite3
import time
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, CachePolicy, SqliteCache

class PersistentCacheState(TypedDict):
    query: str
    result: str

# Track execution for demonstration
query_execution_count = {"value": 0}

def database_query(state: PersistentCacheState) -> dict:
    """Simulate an expensive database query."""
    query_execution_count["value"] += 1
    print(f" [Executing query #{query_execution_count['value']}...]")
    time.sleep(0.3)  # Simulate query time
    return {"result": f"Results for: {state['query']} (execution #{query_execution_count['value']})"}

builder = StateGraph(PersistentCacheState)
builder.add_node(
    "query",
    database_query,
    cache_policy=CachePolicy(ttl=3600)  # Cache for 1 hour
)
builder.add_edge(START, "query")
builder.add_edge("query", END)

# Use SQLite cache with check_same_thread=False for async compatibility
conn = sqlite3.connect("query_cache.db", check_same_thread=False)
cache = SqliteCache(conn)
graph = builder.compile(cache=cache)

# First call - executes the query (slow)
print("First call (executes query):")
start = time.time()
result1 = graph.invoke({"query": "SELECT * FROM users", "result": ""})
print(f" Took {time.time() - start:.3f}s")
print(f" Result: {result1['result']}")

# Second call with same input - cache hit (instant)
print("\nSecond call (cache hit):")
start = time.time()
result2 = graph.invoke({"query": "SELECT * FROM users", "result": ""})
print(f" Took {time.time() - start:.3f}s")
print(f" Result: {result2['result']}")

# Different query - cache miss (slow again)
print("\nThird call with different query (cache miss):")
start = time.time()
result3 = graph.invoke({"query": "SELECT * FROM orders", "result": ""})
print(f" Took {time.time() - start:.3f}s")
print(f" Result: {result3['result']}")

# Verify total executions
print(f"\nTotal query executions: {query_execution_count['value']}")  # 2 (not 3!)
# Cache persists across program restarts since it's stored in SQLite
# NOTE(review): the "2" above assumes a fresh query_cache.db; on a re-run the
# persisted cache can satisfy every call, giving 0 executions. Consider closing
# conn when the program exits.
Important: Always use check_same_thread=False when creating SQLite connections for StateGraph. This is required because the graph uses asyncio internally.
Combined Retry and Cache
Use both for maximum reliability and performance:
Copy
import random
import time
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, RetryPolicy, CachePolicy, InMemoryCache

class RobustState(TypedDict):
    api_key: str
    data: str
    success: bool

def robust_api_call(state: RobustState) -> dict:
    """API call with retries and caching."""
    # Simulate occasional failures
    if random.random() < 0.2:
        raise ConnectionError("Temporary network issue")
    # Simulate slow response
    time.sleep(0.2)
    return {
        "data": f"Data fetched with key: {state['api_key']}",
        "success": True
    }

builder = StateGraph(RobustState)

# Add node with BOTH retry and cache policies:
# retries absorb transient failures, the cache skips repeat work.
builder.add_node(
    "fetch",
    robust_api_call,
    retry_policy=RetryPolicy(
        max_attempts=3,
        initial_interval=1.0,
        backoff_factor=2.0
    ),
    cache_policy=CachePolicy(ttl=600)  # Cache for 10 minutes
)
builder.add_edge(START, "fetch")
builder.add_edge("fetch", END)

cache = InMemoryCache()
graph = builder.compile(cache=cache)

# Execute - retries on failure, caches successful results.
# (No checkpointer is compiled here, so no thread_id config is needed;
# the previous version defined one but never used it.)
result = graph.invoke(
    {"api_key": "my-key", "data": "", "success": False}
)
print(f"Success: {result['success']}, Data: {result['data']}")
Order of Operations: Retry happens first, then successful results are cached.
Failure Recovery
Resume execution from the last successful checkpoint:
Copy
from typing import Annotated
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class RecoveryState(TypedDict):
    step: int
    # operator.add reducer: each node's list is appended to the running history.
    completed_steps: Annotated[list, operator.add]
    should_fail: bool

# Track failures for demo
failure_count = {"step2": 0}

def step1_node(state: RecoveryState) -> dict:
    """First step - always succeeds."""
    return {"step": 1, "completed_steps": ["step1"]}

def step2_node(state: RecoveryState) -> dict:
    """Second step - fails on first attempt, succeeds on retry."""
    failure_count["step2"] += 1
    if state["should_fail"] and failure_count["step2"] == 1:
        raise RuntimeError("Simulated failure in step 2")
    return {"step": 2, "completed_steps": ["step2"]}

def step3_node(state: RecoveryState) -> dict:
    """Third step - always succeeds."""
    return {"step": 3, "completed_steps": ["step3"]}

# Build the graph
builder = StateGraph(RecoveryState)
builder.add_node("step1", step1_node)
builder.add_node("step2", step2_node)
builder.add_node("step3", step3_node)
builder.add_edge(START, "step1")
builder.add_edge("step1", "step2")
builder.add_edge("step2", "step3")
builder.add_edge("step3", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "recovery-demo"}}

# First attempt - will fail at step2
print("First attempt...")
try:
    result = graph.invoke(
        {"step": 0, "completed_steps": [], "should_fail": True},
        config=config
    )
except RuntimeError as e:
    print(f"Failed: {e}")

# Check what was completed
state = graph.get_state(config)
if state:
    print(f"Completed steps before failure: {state.values.get('completed_steps', [])}")

# Resume from the checkpoint: re-invoke with the same thread_id, supplying
# only the state update that fixes the problem (should_fail=False).
print("\nResuming from checkpoint...")
result = graph.invoke(
    {"should_fail": False},  # Fix the issue
    config=config
)
print(f"Final result: {result}")
print(f"All completed steps: {result['completed_steps']}")
Best Practices
1. Retry Transient Failures Only
Don’t retry permanent failures:
Copy
from upsonic.graphv2 import RetryPolicy

# ✅ Good - retry network issues
network_retry = RetryPolicy(
    max_attempts=3,
    retry_on=(ConnectionError, TimeoutError)
)

# ❌ Bad - retrying auth failures won't help
bad_retry = RetryPolicy(
    max_attempts=3,
    retry_on=Exception  # Too broad!
)

# ✅ Better - custom logic
def smart_retry(e: Exception) -> bool:
    """Return True only for failures that a retry could plausibly fix."""
    # Don't retry authentication or validation errors
    if isinstance(e, (PermissionError, ValueError)):
        return False
    # Retry network-related issues
    if isinstance(e, (ConnectionError, TimeoutError)):
        return True
    # Check for HTTP 5xx errors
    if hasattr(e, 'response') and hasattr(e.response, 'status_code'):
        return 500 <= e.response.status_code < 600
    return False

smart_retry_policy = RetryPolicy(
    max_attempts=3,
    retry_on=smart_retry
)
2. Set Appropriate TTLs
Match the cache TTL to data volatility:
Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, CachePolicy, InMemoryCache

class MarketState(TypedDict):
    symbol: str
    price: float
    forecast: str

def get_stock_price(state: MarketState) -> dict:
    """Get current stock price - changes frequently."""
    return {"price": 150.25}

def get_market_forecast(state: MarketState) -> dict:
    """Get market forecast - changes slowly."""
    return {"forecast": "Bullish outlook for Q4"}

builder = StateGraph(MarketState)

# ✅ Good - short TTL for live data
builder.add_node(
    "get_price",
    get_stock_price,
    cache_policy=CachePolicy(ttl=60)  # 1 minute for live prices
)

# ✅ Good - longer TTL for slow-changing data
builder.add_node(
    "get_forecast",
    get_market_forecast,
    cache_policy=CachePolicy(ttl=3600)  # 1 hour for forecasts
)

builder.add_edge(START, "get_price")
builder.add_edge("get_price", "get_forecast")
builder.add_edge("get_forecast", END)

cache = InMemoryCache()
graph = builder.compile(cache=cache)
3. Choose Right Durability
Match the durability mode to workflow criticality:
Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class WorkflowState(TypedDict):
    data: str
    result: str

def process_node(state: WorkflowState) -> dict:
    """Transform the input data into a result string."""
    return {"result": f"Processed: {state['data']}"}

builder = StateGraph(WorkflowState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)

checkpointer = MemorySaver()

# Critical financial operation - use sync
# Every checkpoint is guaranteed to be saved before continuing
graph_critical = builder.compile(
    checkpointer=checkpointer,
    durability="sync"
)

# Normal workflow - use async (default)
# Checkpoints saved in background, good balance
graph_normal = builder.compile(
    checkpointer=checkpointer,
    durability="async"
)

# High-throughput, non-critical - use exit
# Only save at the end, maximum speed
graph_fast = builder.compile(
    checkpointer=checkpointer,
    durability="exit"
)
4. Make Operations Idempotent
Ensure retried operations don’t cause side effects:
Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, RetryPolicy

class OrderState(TypedDict):
    order_id: str
    processed: bool
    result: str

# ❌ Bad - not idempotent
def bad_process_order(state: OrderState) -> dict:
    # This might charge the customer multiple times on retry!
    # (charge_customer is illustrative only; it is not defined in this example.)
    charge_customer(state["order_id"])
    return {"result": "charged", "processed": True}

# ✅ Good - idempotent with check
def good_process_order(state: OrderState) -> dict:
    """Process an order safely under retries: check first, then charge with a key."""
    # Check if already processed
    if is_already_processed(state["order_id"]):
        return {"result": "already processed", "processed": True}
    # Use idempotency key
    charge_with_idempotency_key(state["order_id"])
    return {"result": "charged", "processed": True}

def is_already_processed(order_id: str) -> bool:
    # Check database for existing charge (stubbed for the example)
    return False

def charge_with_idempotency_key(order_id: str):
    # Payment API with idempotency key prevents duplicate charges
    pass

builder = StateGraph(OrderState)
builder.add_node(
    "process",
    good_process_order,
    retry_policy=RetryPolicy(max_attempts=3)
)
builder.add_edge(START, "process")
builder.add_edge("process", END)
graph = builder.compile()
Next Steps
- Advanced Features - Explore Send API and tasks
- Persistence - Master checkpointing
- Quick Start - Build your first graph

