Skip to main content

Overview

StateGraph provides comprehensive reliability features:
  • 🔄 Retry Policies - Automatic retry with exponential backoff
  • 💾 Cache Policies - Avoid re-executing expensive operations
  • 🛡️ Durability Modes - Control when state is persisted
  • 🔁 Failure Recovery - Resume from the last successful checkpoint

Retry Policies

Retry policies handle transient failures automatically.

Basic Retry

import random
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, RetryPolicy

class FetchState(TypedDict):
    url: str
    data: str
    success: bool


def unstable_fetch(state: FetchState) -> dict:
    """Simulate an unstable API call that might fail.

    Roughly 30% of calls raise ConnectionError; the rest return the
    fetched data plus a success flag.
    """
    failed = random.random() < 0.3  # ~30% simulated failure rate
    if failed:
        raise ConnectionError("Simulated network failure")
    return {"data": f"Data from {state['url']}", "success": True}

# Build the graph
builder = StateGraph(FetchState)

# Add node with retry policy.
# Waits grow exponentially: ~1s, then ~2s (initial_interval doubled by
# backoff_factor, capped at max_interval), with jitter spreading retries.
builder.add_node(
    "fetch_data",
    unstable_fetch,
    retry_policy=RetryPolicy(
        max_attempts=3,
        initial_interval=1.0,
        backoff_factor=2.0,
        max_interval=30.0,
        jitter=True
    )
)

# Linear flow: START -> fetch_data -> END
builder.add_edge(START, "fetch_data")
builder.add_edge("fetch_data", END)

graph = builder.compile()

# Execute - will retry up to 3 times on failure
result = graph.invoke({"url": "https://api.example.com", "data": "", "success": False})
print(f"Success: {result['success']}, Data: {result['data']}")

Retry Configuration

| Parameter        | Default   | Description                        |
|------------------|-----------|------------------------------------|
| max_attempts     | 3         | Maximum number of attempts         |
| initial_interval | 0.5       | Seconds to wait before first retry |
| backoff_factor   | 2.0       | Multiplier for wait time           |
| max_interval     | 128.0     | Maximum seconds between retries    |
| jitter           | True      | Add random variation to intervals  |
| retry_on         | Exception | Which exceptions trigger retry     |

Selective Retry

Only retry specific exception types:
import random
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, RetryPolicy

class ApiState(TypedDict):
    endpoint: str
    response: str


# Module-level counter so retries of the node are observable.
attempt_counter = {"count": 0}


def call_api(state: ApiState) -> dict:
    """Make an API call that might fail on first attempts.

    The first two calls raise ConnectionError; the third succeeds.
    """
    attempt_counter["count"] += 1
    attempt = attempt_counter["count"]

    if attempt >= 3:
        return {"response": f"Success on attempt {attempt}!"}

    raise ConnectionError(f"Network unreachable (attempt {attempt})")

builder = StateGraph(ApiState)

# Retry only on ConnectionError (up to 3 attempts)
# retry_on narrows the policy: any other exception type propagates at once.
builder.add_node(
    "api_call",
    call_api,
    retry_policy=RetryPolicy(
        max_attempts=3,
        initial_interval=0.1,  # Short interval for demo
        retry_on=ConnectionError
    )
)

builder.add_edge(START, "api_call")
builder.add_edge("api_call", END)

graph = builder.compile()

# Execute - will retry on ConnectionError and succeed on 3rd attempt
try:
    result = graph.invoke({"endpoint": "https://api.example.com", "response": ""})
    print(f"Result: {result['response']}")  # Success on attempt 3!
except Exception as e:
    # Reached only if every attempt raised (or a non-retried error occurred).
    print(f"Failed after retries: {e}")
Retry on multiple exception types:
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, RetryPolicy

class MultiRetryState(TypedDict):
    data: str
    result: str


# Shared counter: lets the node fail differently on successive attempts.
retry_count = {"value": 0}


def flaky_operation(state: MultiRetryState) -> dict:
    """Operation that fails with different errors.

    Attempt 1 raises ConnectionError, attempt 2 raises TimeoutError,
    and every later attempt succeeds.
    """
    retry_count["value"] += 1
    attempt = retry_count["value"]

    if attempt == 1:
        raise ConnectionError("Network issue")
    if attempt == 2:
        raise TimeoutError("Request timed out")

    return {"result": f"Completed after {attempt} attempts"}

# A tuple in retry_on makes any listed exception type trigger a retry.
builder = StateGraph(MultiRetryState)
builder.add_node(
    "operation",
    flaky_operation,
    retry_policy=RetryPolicy(
        max_attempts=5,
        initial_interval=0.1,
        retry_on=(ConnectionError, TimeoutError)  # Both trigger retry
    )
)
builder.add_edge(START, "operation")
builder.add_edge("operation", END)

graph = builder.compile()

# Execute - retries on both ConnectionError and TimeoutError
result = graph.invoke({"data": "test", "result": ""})
print(f"Result: {result['result']}")  # Completed after 3 attempts

Custom Retry Logic

Use a function to determine whether to retry:
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, RetryPolicy

class SmartRetryState(TypedDict):
    request_id: str
    result: str


def should_retry_exception(exception: Exception) -> bool:
    """Custom logic to determine if we should retry.

    Transient network errors and HTTP 5xx responses are retried;
    4xx client errors and unrecognized exceptions are not.
    """
    # Network problems are usually transient - worth retrying.
    if isinstance(exception, (ConnectionError, TimeoutError)):
        return True

    # Inspect HTTP status when the exception carries a response object.
    response = getattr(exception, 'response', None)
    if response is not None:
        status_code = getattr(response, 'status_code', 0)
        if 500 <= status_code < 600:
            return True  # server errors: retry
        if 400 <= status_code < 500:
            return False  # client errors: retrying won't help

    # Default: don't retry unknown errors.
    return False


# Track attempts for demonstration
smart_attempts = {"count": 0}


def make_request(state: SmartRetryState) -> dict:
    """Make a request that might fail.

    The very first call raises a transient ConnectionError; all later
    calls succeed.
    """
    smart_attempts["count"] += 1
    first_call = smart_attempts["count"] == 1

    if first_call:
        raise ConnectionError("Temporary network issue")

    return {"result": f"Success! Request {state['request_id']} completed."}

builder = StateGraph(SmartRetryState)
# retry_on also accepts a callable: it receives the raised exception and
# returns True when the attempt should be retried.
builder.add_node(
    "request",
    make_request,
    retry_policy=RetryPolicy(
        max_attempts=5,
        initial_interval=0.1,
        retry_on=should_retry_exception  # Custom function
    )
)
builder.add_edge(START, "request")
builder.add_edge("request", END)

graph = builder.compile()

# Execute - custom retry logic determines what to retry
result = graph.invoke({"request_id": "req-123", "result": ""})
print(f"Result: {result['result']}")  # Success! Request req-123 completed.
print(f"Total attempts: {smart_attempts['count']}")  # 2

Cache Policies

Cache policies avoid re-executing expensive operations.

Basic Caching

import time
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, CachePolicy, InMemoryCache

class ComputeState(TypedDict):
    input: str
    output: str


def complex_calculation(input_value: str) -> str:
    """Simulate an expensive computation."""
    time.sleep(0.1)  # stand-in for real processing time
    return f"processed_{input_value}"


def expensive_node(state: ComputeState) -> dict:
    """Expensive computation - results are cached."""
    return {"output": complex_calculation(state["input"])}

# Build the graph
builder = StateGraph(ComputeState)

# Add node with cache policy (5 minute TTL)
builder.add_node(
    "compute",
    expensive_node,
    cache_policy=CachePolicy(ttl=300)
)

builder.add_edge(START, "compute")
builder.add_edge("compute", END)

# Compile with cache - the cache instance must be supplied at compile time
# for cache_policy to have any effect.
cache = InMemoryCache()
graph = builder.compile(cache=cache)

# First call - executes and caches
print("First call (should be slow)...")
start = time.time()
result1 = graph.invoke({"input": "test", "output": ""})
print(f"Took {time.time() - start:.3f}s - Result: {result1['output']}")

# Second call with same input - uses cache
print("\nSecond call (should be instant)...")
start = time.time()
result2 = graph.invoke({"input": "test", "output": ""})
print(f"Took {time.time() - start:.3f}s - Result: {result2['output']}")

# Third call with different input - executes again (different cache key)
print("\nThird call with different input (should be slow)...")
start = time.time()
result3 = graph.invoke({"input": "different", "output": ""})
print(f"Took {time.time() - start:.3f}s - Result: {result3['output']}")

# Verify cache hit for original input - the earlier entry is still live
print("\nFourth call with original input (should be instant)...")
start = time.time()
result4 = graph.invoke({"input": "test", "output": ""})
print(f"Took {time.time() - start:.3f}s - Result: {result4['output']}")
Cache keys are automatically generated from the node’s input state. Same input = cache hit.

Cache Configuration

from upsonic.graphv2 import CachePolicy

# Cache forever (no TTL) - entries never expire
cache_forever = CachePolicy(ttl=None)

# Cache for 1 hour (ttl is measured in seconds)
cache_1h = CachePolicy(ttl=3600)

# Cache for 5 minutes
cache_5m = CachePolicy(ttl=300)

# Cache for 60 seconds (good for frequently changing data)
cache_1m = CachePolicy(ttl=60)

SQLite Cache for Persistence

Use SQLite cache to persist cached results across restarts:
import sqlite3
import time
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, CachePolicy, SqliteCache

class PersistentCacheState(TypedDict):
    query: str
    result: str


# Execution counter: proves cache hits skip this function entirely.
query_execution_count = {"value": 0}


def database_query(state: PersistentCacheState) -> dict:
    """Simulate an expensive database query."""
    query_execution_count["value"] += 1
    run = query_execution_count["value"]
    print(f"  [Executing query #{run}...]")
    time.sleep(0.3)  # Simulate query time
    return {"result": f"Results for: {state['query']} (execution #{run})"}

builder = StateGraph(PersistentCacheState)
builder.add_node(
    "query",
    database_query,
    cache_policy=CachePolicy(ttl=3600)  # Cache for 1 hour
)
builder.add_edge(START, "query")
builder.add_edge("query", END)

# Use SQLite cache with check_same_thread=False for async compatibility
# (the graph runs on asyncio, so the connection may be touched from a
# different thread than the one that created it).
conn = sqlite3.connect("query_cache.db", check_same_thread=False)
cache = SqliteCache(conn)
graph = builder.compile(cache=cache)

# First call - executes the query (slow)
print("First call (executes query):")
start = time.time()
result1 = graph.invoke({"query": "SELECT * FROM users", "result": ""})
print(f"  Took {time.time() - start:.3f}s")
print(f"  Result: {result1['result']}")

# Second call with same input - cache hit (instant)
print("\nSecond call (cache hit):")
start = time.time()
result2 = graph.invoke({"query": "SELECT * FROM users", "result": ""})
print(f"  Took {time.time() - start:.3f}s")
print(f"  Result: {result2['result']}")

# Different query - cache miss (slow again)
print("\nThird call with different query (cache miss):")
start = time.time()
result3 = graph.invoke({"query": "SELECT * FROM orders", "result": ""})
print(f"  Took {time.time() - start:.3f}s")
print(f"  Result: {result3['result']}")

# Verify total executions - the cached second call never ran the node
print(f"\nTotal query executions: {query_execution_count['value']}")  # 2 (not 3!)

# Cache persists across program restarts since it's stored in SQLite
Important: Always use check_same_thread=False when creating SQLite connections for StateGraph. This is required because the graph uses asyncio internally.

Combined Retry and Cache

Use both for maximum reliability and performance:
import random
import time
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, RetryPolicy, CachePolicy, InMemoryCache

class RobustState(TypedDict):
    api_key: str
    data: str
    success: bool


def robust_api_call(state: RobustState) -> dict:
    """API call with retries and caching.

    Fails with ConnectionError about 20% of the time; otherwise returns
    the fetched payload after a short simulated delay.
    """
    if random.random() < 0.2:  # occasional transient failure
        raise ConnectionError("Temporary network issue")

    time.sleep(0.2)  # simulate a slow response

    return {
        "data": f"Data fetched with key: {state['api_key']}",
        "success": True,
    }

builder = StateGraph(RobustState)

# Add node with BOTH retry and cache policies: retries handle transient
# failures first, then the successful result is cached for 10 minutes.
builder.add_node(
    "fetch",
    robust_api_call,
    retry_policy=RetryPolicy(
        max_attempts=3,
        initial_interval=1.0,
        backoff_factor=2.0
    ),
    cache_policy=CachePolicy(ttl=600)  # Cache for 10 minutes
)

builder.add_edge(START, "fetch")
builder.add_edge("fetch", END)

# The cache instance must be supplied at compile time for cache_policy
# to take effect.
cache = InMemoryCache()
graph = builder.compile(cache=cache)

# Execute - retries on failure, caches successful results.
# NOTE(review): the original defined an unused `config` with a thread_id,
# but no checkpointer is configured and invoke() never received it, so the
# dead assignment has been removed.
result = graph.invoke(
    {"api_key": "my-key", "data": "", "success": False}
)
print(f"Success: {result['success']}, Data: {result['data']}")
Order of Operations: Retry happens first, then successful results are cached.

Failure Recovery

Resume execution from the last successful checkpoint:
from typing import Annotated
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class RecoveryState(TypedDict):
    step: int
    completed_steps: Annotated[list, operator.add]
    should_fail: bool


# Demo-only failure tracker: step2 fails once, then succeeds.
failure_count = {"step2": 0}


def step1_node(state: RecoveryState) -> dict:
    """First step - always succeeds."""
    return {"step": 1, "completed_steps": ["step1"]}


def step2_node(state: RecoveryState) -> dict:
    """Second step - fails on first attempt, succeeds on retry."""
    failure_count["step2"] += 1
    first_try = failure_count["step2"] == 1

    if first_try and state["should_fail"]:
        raise RuntimeError("Simulated failure in step 2")

    return {"step": 2, "completed_steps": ["step2"]}


def step3_node(state: RecoveryState) -> dict:
    """Third step - always succeeds."""
    return {"step": 3, "completed_steps": ["step3"]}

# Build the graph
builder = StateGraph(RecoveryState)
builder.add_node("step1", step1_node)
builder.add_node("step2", step2_node)
builder.add_node("step3", step3_node)
builder.add_edge(START, "step1")
builder.add_edge("step1", "step2")
builder.add_edge("step2", "step3")
builder.add_edge("step3", END)

# A checkpointer is required for recovery: it persists state between nodes.
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Both invocations share this thread_id, so they share checkpoint history.
config = {"configurable": {"thread_id": "recovery-demo"}}

# First attempt - will fail at step2
print("First attempt...")
try:
    result = graph.invoke(
        {"step": 0, "completed_steps": [], "should_fail": True},
        config=config
    )
except RuntimeError as e:
    print(f"Failed: {e}")
    
    # Check what was completed
    state = graph.get_state(config)
    if state:
        print(f"Completed steps before failure: {state.values.get('completed_steps', [])}")

# Resume from failure - re-invoke with the same thread_id; the checkpoint
# restores progress and the update below clears the failing flag.
print("\nResuming from checkpoint...")
result = graph.invoke(
    {"should_fail": False},  # Fix the issue
    config=config
)
print(f"Final result: {result}")
print(f"All completed steps: {result['completed_steps']}")

Best Practices

1. Retry Transient Failures Only

Don’t retry permanent failures:
from upsonic.graphv2 import RetryPolicy

# ✅ Good - retry network issues
# Transient network errors may well succeed on a later attempt.
network_retry = RetryPolicy(
    max_attempts=3,
    retry_on=(ConnectionError, TimeoutError)
)

# ❌ Bad - retrying auth failures won't help
# Exception matches everything, including permanent errors, so time is
# wasted on retries that can never succeed.
bad_retry = RetryPolicy(
    max_attempts=3,
    retry_on=Exception  # Too broad!
)

# ✅ Better - custom logic
def smart_retry(e: Exception) -> bool:
    """Decide whether an exception is worth retrying.

    Transient network errors and HTTP 5xx responses are retried;
    auth/validation errors and everything unrecognized are not.
    """
    if isinstance(e, (PermissionError, ValueError)):
        # Permanent failures: retrying won't change the outcome.
        return False
    if isinstance(e, (ConnectionError, TimeoutError)):
        # Transient network trouble: retry.
        return True
    response = getattr(e, 'response', None)
    if response is not None and hasattr(response, 'status_code'):
        # Server-side (5xx) errors are often transient; 4xx are not.
        return 500 <= response.status_code < 600
    return False

# retry_on accepts a callable: it receives the raised exception and
# returns True when the attempt should be retried.
smart_retry_policy = RetryPolicy(
    max_attempts=3,
    retry_on=smart_retry
)

2. Set Appropriate TTLs

Match cache TTL to data volatility:
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, CachePolicy, InMemoryCache

class MarketState(TypedDict):
    symbol: str
    price: float
    forecast: str


def get_stock_price(state: MarketState) -> dict:
    """Get current stock price - changes frequently.

    Stubbed for the example: returns a fixed quote.
    """
    return {"price": 150.25}


def get_market_forecast(state: MarketState) -> dict:
    """Get market forecast - changes slowly.

    Stubbed for the example: returns a fixed outlook string.
    """
    return {"forecast": "Bullish outlook for Q4"}

# Match each node's TTL to how quickly its data goes stale.
builder = StateGraph(MarketState)

# ✅ Good - short TTL for live data
builder.add_node(
    "get_price",
    get_stock_price,
    cache_policy=CachePolicy(ttl=60)  # 1 minute for live prices
)

# ✅ Good - longer TTL for slow-changing data
builder.add_node(
    "get_forecast",
    get_market_forecast,
    cache_policy=CachePolicy(ttl=3600)  # 1 hour for forecasts
)

builder.add_edge(START, "get_price")
builder.add_edge("get_price", "get_forecast")
builder.add_edge("get_forecast", END)

cache = InMemoryCache()
graph = builder.compile(cache=cache)

3. Choose Right Durability

Match durability mode to workflow criticality:
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver

class WorkflowState(TypedDict):
    data: str
    result: str


def process_node(state: WorkflowState) -> dict:
    """Turn the incoming data into a result string."""
    payload = state['data']
    return {"result": f"Processed: {payload}"}

builder = StateGraph(WorkflowState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)

checkpointer = MemorySaver()

# Critical financial operation - use sync
# Every checkpoint is guaranteed to be saved before continuing
# (slowest mode, strongest durability).
graph_critical = builder.compile(
    checkpointer=checkpointer,
    durability="sync"
)

# Normal workflow - use async (default)
# Checkpoints saved in background, good balance
graph_normal = builder.compile(
    checkpointer=checkpointer,
    durability="async"
)

# High-throughput, non-critical - use exit
# Only save at the end, maximum speed
# (a mid-run crash loses all intermediate progress).
graph_fast = builder.compile(
    checkpointer=checkpointer,
    durability="exit"
)

4. Make Operations Idempotent

Ensure retried operations don’t cause side effects:
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, RetryPolicy

class OrderState(TypedDict):
    order_id: str
    processed: bool
    result: str


# ❌ Bad - not idempotent
def bad_process_order(state: OrderState) -> dict:
    """Charge the order with no guard - a retry may double-charge."""
    # This might charge the customer multiple times on retry!
    charge_customer(state["order_id"])
    return {"result": "charged", "processed": True}


# ✅ Good - idempotent with check
def good_process_order(state: OrderState) -> dict:
    """Charge the order at most once, even across retries."""
    order_id = state["order_id"]

    # Skip work that has already been done.
    if is_already_processed(order_id):
        return {"result": "already processed", "processed": True}

    # The idempotency key makes a repeated charge a server-side no-op.
    charge_with_idempotency_key(order_id)
    return {"result": "charged", "processed": True}


def is_already_processed(order_id: str) -> bool:
    """Check database for an existing charge (stubbed for the example)."""
    return False


def charge_with_idempotency_key(order_id: str):
    """Payment API call with an idempotency key (stubbed)."""
    pass

builder = StateGraph(OrderState)
# Retrying is safe here precisely because the node is idempotent.
builder.add_node(
    "process",
    good_process_order,
    retry_policy=RetryPolicy(max_attempts=3)
)
builder.add_edge(START, "process")
builder.add_edge("process", END)

graph = builder.compile()

Next Steps