
Overview

StateGraph provides comprehensive reliability features:
  • πŸ”„ Retry Policies - Automatic retry with exponential backoff
  • πŸ’Ύ Cache Policies - Avoid re-executing expensive operations
  • πŸ›‘οΈ Durability Modes - Control when state is persisted
  • πŸ” Failure Recovery - Resume from the last successful checkpoint

Retry Policies

Retry policies handle transient failures automatically.

Basic Retry

from upsonic.graphv2 import StateGraph, RetryPolicy

def unstable_node(state: MyState) -> dict:
    """A node that might fail."""
    # Simulate API call
    import random
    if random.random() < 0.3:
        raise ConnectionError("Simulated failure")
    return {"data": "success"}

builder = StateGraph(MyState)

# Add node with retry policy
builder.add_node(
    "fetch_data",
    unstable_node,
    retry_policy=RetryPolicy(
        max_attempts=3,
        initial_interval=1.0,
        backoff_factor=2.0,
        max_interval=30.0,
        jitter=True
    )
)

Retry Configuration

Parameter        | Default   | Description
max_attempts     | 3         | Maximum number of attempts
initial_interval | 0.5       | Seconds to wait before the first retry
backoff_factor   | 2.0       | Multiplier applied to the wait time after each failed attempt
max_interval     | 128.0     | Maximum seconds between retries
jitter           | True      | Add random variation to intervals
retry_on         | Exception | Exception type(s) that trigger a retry
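Under these parameters, the wait before each retry grows geometrically and is capped at max_interval. A minimal sketch of how the intervals could be computed (illustrative only, not the library's internals; the ±25% jitter range is a hypothetical choice):

```python
import random

def retry_intervals(max_attempts=3, initial_interval=0.5,
                    backoff_factor=2.0, max_interval=128.0, jitter=False):
    """Illustrative wait times before each retry (one fewer than attempts)."""
    intervals = []
    for n in range(max_attempts - 1):
        # Geometric growth, capped at max_interval
        wait = min(initial_interval * backoff_factor ** n, max_interval)
        if jitter:
            # Hypothetical jitter: up to +/-25% random variation
            wait *= random.uniform(0.75, 1.25)
        intervals.append(wait)
    return intervals

print(retry_intervals(max_attempts=4, initial_interval=1.0))  # -> [1.0, 2.0, 4.0]
```

With jitter enabled, concurrent retries spread out instead of hammering a recovering service in lockstep.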

Selective Retry

# Retry only on connection errors
retry_policy = RetryPolicy(
    max_attempts=3,
    retry_on=ConnectionError
)

# Retry on multiple exception types
retry_policy = RetryPolicy(
    max_attempts=3,
    retry_on=(ConnectionError, TimeoutError)
)
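The retry_on filter means only matching exceptions are re-attempted; anything else propagates immediately. A plain-Python sketch of that behavior (an illustration of the semantics, not the library's implementation):

```python
def run_with_retry(fn, max_attempts=3, retry_on=Exception):
    """Call fn, re-attempting only when it raises an exception in retry_on."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # attempts exhausted: surface the last error
        # Exceptions outside retry_on are not caught and propagate at once.

calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

print(run_with_retry(flaky, max_attempts=3, retry_on=ConnectionError))  # -> ok
```

A ValueError raised by the same function would escape on the first attempt, since it is not listed in retry_on.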

Cache Policies

Cache policies avoid re-executing expensive operations.

Basic Caching

from upsonic.graphv2 import CachePolicy, InMemoryCache

def expensive_node(state: State) -> dict:
    """Expensive computation - results are cached."""
    result = complex_calculation(state["input"])
    return {"output": result}

builder = StateGraph(State)

# Add node with cache policy
builder.add_node(
    "compute",
    expensive_node,
    cache_policy=CachePolicy(ttl=300)  # Cache for 5 minutes
)

# Compile with cache
cache = InMemoryCache()
graph = builder.compile(cache=cache)

# First call - executes and caches
result1 = graph.invoke({"input": "test"})

# Second call with same input - uses cache
result2 = graph.invoke({"input": "test"})  # Instant!
Cache keys are automatically generated from the node’s input state. Same input = cache hit.
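One way such input-derived keys can work, sketched with a hash over a canonically serialized state (a hypothetical scheme for illustration; the library's actual key derivation may differ):

```python
import hashlib
import json

def cache_key(node_name: str, state: dict) -> str:
    """Derive a deterministic key from a node name and its input state."""
    # sort_keys makes the serialization canonical, so equal states
    # produce the same key regardless of dict insertion order.
    payload = json.dumps(state, sort_keys=True)
    return hashlib.sha256(f"{node_name}:{payload}".encode()).hexdigest()

k1 = cache_key("compute", {"input": "test", "flag": True})
k2 = cache_key("compute", {"flag": True, "input": "test"})
print(k1 == k2)  # -> True: same logical state, same key, cache hit
```

Any difference in the input state, or a different node name, yields a different key and therefore a cache miss.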

Cache Configuration

# Cache forever (no TTL)
cache_policy = CachePolicy(ttl=None)

# Cache for 1 hour
cache_policy = CachePolicy(ttl=3600)
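A minimal TTL-aware in-memory cache capturing these semantics (a sketch of the concept, not InMemoryCache itself):

```python
import time

class TTLCache:
    """Tiny illustrative cache: entries expire after ttl seconds (None = never)."""

    def __init__(self):
        self._store = {}  # key -> (value, expires_at or None)

    def set(self, key, value, ttl=None):
        expires_at = time.monotonic() + ttl if ttl is not None else None
        self._store[key] = (value, expires_at)

    def get(self, key, default=None):
        if key not in self._store:
            return default
        value, expires_at = self._store[key]
        if expires_at is not None and time.monotonic() >= expires_at:
            del self._store[key]  # expired: evict lazily on read
            return default
        return value

cache = TTLCache()
cache.set("report", {"total": 42}, ttl=3600)
print(cache.get("report"))  # -> {'total': 42}
```

With ttl=None the entry never expires, matching the "cache forever" configuration above.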

Combined Retry and Cache

Use both for maximum reliability and performance:
builder.add_node(
    "fetch",
    robust_node,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0),
    cache_policy=CachePolicy(ttl=600)
)
Order of Operations: Retry happens first, then successful results are cached.
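That ordering can be pictured as wrapping the node first in retry, then in cache: a cache hit short-circuits the retries entirely, and only a successful (possibly retried) result gets stored. A toy sketch under those assumptions:

```python
def with_retry(fn, max_attempts=3):
    def wrapped(state):
        for attempt in range(1, max_attempts + 1):
            try:
                return fn(state)
            except Exception:
                if attempt == max_attempts:
                    raise
    return wrapped

def with_cache(fn, store):
    def wrapped(state):
        key = repr(sorted(state.items()))
        if key in store:          # hit: no call, no retries
            return store[key]
        result = fn(state)        # miss: run (with retries), then cache
        store[key] = result
        return result
    return wrapped

store = {}
calls = []

def node(state):
    calls.append(state["x"])
    return {"y": state["x"] * 2}

robust = with_cache(with_retry(node), store)
robust({"x": 3})
robust({"x": 3})
print(len(calls))  # -> 1: the second call was served from the cache
```

Wrapping in the opposite order would re-run the retry loop on every call, even for cached inputs.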

Failure Recovery

Resume execution from the last successful checkpoint:
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "job-123"}}

# First attempt - fails at some point
try:
    result = graph.invoke(initial_state, config=config)
except Exception as e:
    print(f"Failed: {e}")

# Fix the issue and resume
result = graph.invoke(updated_state, config=config)
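The resume behavior can be pictured with a toy checkpointer: each node's successful output is saved per thread_id, and a re-invoke skips steps that already completed (an illustration of the concept, not how the library's checkpointers are implemented):

```python
class CheckpointedPipeline:
    """Toy resume-from-checkpoint: completed steps are skipped on re-run."""

    def __init__(self, steps):
        self.steps = steps         # list of (name, fn) pairs
        self.checkpoints = {}      # thread_id -> {step name: saved state}

    def invoke(self, state, thread_id):
        done = self.checkpoints.setdefault(thread_id, {})
        for name, fn in self.steps:
            if name in done:
                state = done[name]  # resume: reuse the checkpointed state
                continue
            state = fn(state)
            done[name] = state      # checkpoint only after success
        return state

runs = []

def step_a(s):
    runs.append("a")
    return {**s, "a": True}

broken = {"flag": True}

def step_b(s):
    runs.append("b")
    if broken["flag"]:
        raise RuntimeError("transient outage")
    return {**s, "b": True}

pipe = CheckpointedPipeline([("a", step_a), ("b", step_b)])
try:
    pipe.invoke({}, thread_id="job-123")
except RuntimeError:
    pass

broken["flag"] = False   # "fix the issue", then resume the same thread
result = pipe.invoke({}, thread_id="job-123")
print(runs)  # -> ['a', 'b', 'b']: step_a ran only once
```

Because step_a's output was checkpointed before the failure, the second invoke on the same thread_id resumes at step_b instead of starting over.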

Best Practices

1. Retry Transient Failures Only

# βœ… Good - retry network issues
retry_policy = RetryPolicy(
    retry_on=(ConnectionError, TimeoutError)
)

2. Set Appropriate TTLs

# βœ… Good - TTL matches data volatility
builder.add_node(
    "get_stock_price",
    get_price,
    cache_policy=CachePolicy(ttl=60)  # 1 min for live data
)

3. Choose Right Durability

# Critical operation - sync
builder.compile(checkpointer=cp, durability="sync")

# Normal workflow - async (default)
builder.compile(checkpointer=cp, durability="async")

Next Steps