
Overview

In the Upsonic framework, DurableExecution provides fault-tolerant execution for AI agents by automatically saving execution state at each pipeline step. This enables agents to recover from failures, resume from interruptions, and maintain consistency across restarts. The DurableExecution system serves as a reliability layer that manages:
  • Automatic checkpoint creation after each pipeline step
  • State persistence across multiple storage backends
  • Execution recovery from the point of failure
  • Error tracking and debugging information
  • Execution history and analytics
  • Cleanup of old execution data

Key Concepts

How Durable Execution Works

When an agent executes a task with durable execution enabled:
  1. Checkpoint Creation: After each successful pipeline step, the system saves a checkpoint containing:
    • Task state (description, configuration, response format)
    • Execution context (messages, agent state, step information)
    • Current step index and name
    • Execution status (paused, failed, completed)
  2. Failure Handling: When a step fails:
    • The system saves a checkpoint at the failed step with status="failed"
    • Error details are preserved in the checkpoint
    • Execution metadata is updated
  3. Recovery: To resume execution:
    • Load the checkpoint from storage
    • Reconstruct the task and execution context
    • Retry the failed step (if status="failed") or continue from the next step
    • Complete remaining pipeline steps
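
For illustration, the checkpoint created by the steps above can be pictured as a single record per execution. The field names below are assumptions used only to make the structure concrete; they are not the actual Upsonic checkpoint schema.

# Hypothetical checkpoint payload (illustrative field names, not the real schema)
checkpoint = {
    "execution_id": "20251027123456-a1b2c3d4",
    "step_index": 1,
    "step_name": "storage_connection",
    "status": "success",          # success | failed | paused | completed
    "task_state": {"description": "Process customer data and generate report"},
    "execution_context": {"messages": [], "agent_state": {}},
    "error": None,                # populated when a step fails
    "timestamp": "2025-10-27T12:34:56Z",
}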

Checkpoint Strategy

The system uses an overwrite strategy - each execution has ONE checkpoint that is continuously updated:
Step 0 ✅ → Checkpoint: {step: 0, status: "success"}    OVERWRITE
Step 1 ✅ → Checkpoint: {step: 1, status: "success"}    OVERWRITE
Step 2 ✅ → Checkpoint: {step: 2, status: "success"}    OVERWRITE
Step 3 ❌ → Checkpoint: {step: 3, status: "failed"}     OVERWRITE
Status Values:
  • success: Individual step completed successfully (intermediate state)
  • failed: Step failed with error
  • paused: Execution paused (e.g., human-in-the-loop)
  • completed: All steps finished successfully (final state)
Result: ONE file/record per execution_id with the latest state.
Complete Example Flow:
Successful execution (all 19 steps):
Step 0 ✅ → {step: 0, status: "success"}     OVERWRITE
Step 1 ✅ → {step: 1, status: "success"}     OVERWRITE
...
Step 18 ✅ → {step: 18, status: "success"}   OVERWRITE
Finalize → {step: 18, status: "completed"}  OVERWRITE (final)
Failed execution (fails at step 8):
Step 0 ✅ → {step: 0, status: "success"}     OVERWRITE
Step 1 ✅ → {step: 1, status: "success"}     OVERWRITE
...
Step 7 ✅ → {step: 7, status: "success"}     OVERWRITE
Step 8 ❌ → {step: 8, status: "failed"}      OVERWRITE (final)
Recovery from failed execution:
Load checkpoint → {step: 8, status: "failed"}
Retry step 8 ✅ → {step: 8, status: "success"}   OVERWRITE
Step 9 ✅ → {step: 9, status: "success"}         OVERWRITE
...
Step 18 ✅ → {step: 18, status: "success"}       OVERWRITE
Finalize → {step: 18, status: "completed"}      OVERWRITE (final)
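
Conceptually, the overwrite strategy amounts to saving every checkpoint under the same execution_id key, so the newest state replaces the previous one. A minimal sketch of that idea (not the actual Upsonic storage implementation):

from typing import Optional

# Sketch of the overwrite idea: one record per execution_id, replaced on every save.
class OverwriteStore:
    def __init__(self):
        self._states = {}

    def save(self, execution_id: str, checkpoint: dict) -> None:
        # Saving under the same key overwrites the previous checkpoint
        self._states[execution_id] = checkpoint

    def load(self, execution_id: str) -> Optional[dict]:
        return self._states.get(execution_id)

store = OverwriteStore()
store.save("exec-1", {"step": 0, "status": "success"})
store.save("exec-1", {"step": 1, "status": "success"})   # overwrites step 0
print(store.load("exec-1"))  # {'step': 1, 'status': 'success'}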

DurableExecution Attributes

Core Attributes

Attribute | Type | Description
execution_id | str | Unique identifier for this execution (auto-generated)
storage | DurableExecutionStorage | Storage backend for checkpoint persistence
auto_cleanup | bool | Automatically clean up on completion (default: True)
debug | bool | Enable debug logging (default: False)

Storage Backend Options

Backend | Use Case | Persistence | Performance
InMemoryDurableStorage | Testing, temporary executions | No | Fastest
FileDurableStorage | Development, single-node systems | Yes | Fast
SQLiteDurableStorage | Small to medium applications | Yes | Fast
RedisDurableStorage | Distributed, high-scale systems | Yes | Very fast

Creating Durable Executions

Durable executions are created by attaching a DurableExecution instance to a Task. The system automatically handles checkpoint management throughout the execution lifecycle.

Basic Usage

from upsonic import Task, Agent
from upsonic.durable import DurableExecution, FileDurableStorage

# Create storage backend
storage = FileDurableStorage(path="./checkpoints")

# Create durable execution
durable = DurableExecution(storage=storage)

# Create task with durable execution
task = Task(
    "Process customer data and generate report",
    durable_execution=durable
)

# Get the unique execution ID
print(f"Execution ID: {task.durable_execution_id}")
# Output: Execution ID: 20251027123456-a1b2c3d4

# Execute the task
agent = Agent("openai/gpt-4o-mini", name="DataProcessor")
try:
    result = agent.do(task)
    print(f"Success: {result}")
except Exception as e:
    print(f"Failed: {e}")
    print(f"Checkpoint saved at: {task.durable_execution_id}")

Recovery from Failure

# Resume from a failed execution
execution_id = "20251027123456-a1b2c3d4"

# Create a new agent instance (simulating restart)
agent = Agent("openai/gpt-4o-mini", name="DataProcessor")

# Resume from the checkpoint
result = agent.continue_durable(execution_id, storage)
print(f"Recovered and completed: {result}")

Storage Backends

In-Memory Storage

Fast, non-persistent storage for testing and development.
from upsonic.durable import InMemoryDurableStorage

storage = InMemoryDurableStorage()
durable = DurableExecution(storage=storage)

# Use with task
task = Task("Quick test task", durable_execution=durable)
Characteristics:
  • ⚡ Fastest performance
  • ❌ No persistence (lost on restart)
  • ✅ Perfect for testing
  • ✅ No setup required

File-Based Storage

Simple, human-readable JSON files for single-node applications.
from upsonic.durable import FileDurableStorage

# Create storage with custom path
storage = FileDurableStorage(path="./execution_states")

durable = DurableExecution(
    storage=storage,
    auto_cleanup=False,  # Keep files after completion
    debug=True
)

task = Task("Data processing", durable_execution=durable)
Configuration Options:
Parameter | Type | Description | Default
path | str | Directory path for checkpoint files | "./durable_states"
Characteristics:
  • 📁 Human-readable JSON format
  • ✅ Easy debugging and inspection
  • ✅ Simple backup and restore
  • ⚠️ Not suitable for distributed systems

SQLite Storage

Queryable, efficient storage for small to medium applications.
from upsonic.durable import SQLiteDurableStorage

# Create storage with custom database
storage = SQLiteDurableStorage(
    db_path="./executions.db",
    table_name="durable_executions"
)

durable = DurableExecution(storage=storage)
task = Task("Analytics task", durable_execution=durable)
Configuration Options:
Parameter | Type | Description | Default
db_path | str | SQLite database file path | "./durable_executions.db"
table_name | str | Table name for checkpoints | "durable_executions"
Characteristics:
  • 🗄️ Single-file database
  • ✅ ACID transactions
  • ✅ Queryable execution history
  • ✅ Efficient for thousands of executions

Redis Storage

Distributed, high-performance storage for production systems.
from upsonic.durable import RedisDurableStorage

# Create Redis storage
storage = RedisDurableStorage(
    host="localhost",
    port=6379,
    db=0,
    password="your_password",  # Optional
    prefix="upsonic:durable:"  # Key prefix
)

durable = DurableExecution(storage=storage)
task = Task("Distributed task", durable_execution=durable)
Configuration Options:
Parameter | Type | Description | Default
host | str | Redis server host | "localhost"
port | int | Redis server port | 6379
db | int | Redis database number | 0
password | Optional[str] | Redis password | None
prefix | str | Key prefix | "durable:state:"
ttl | Optional[int] | Time-to-live in seconds | None (no expiration)
Characteristics:
  • ⚡ Very fast in-memory performance
  • ✅ Distributed architecture support
  • ✅ Built-in TTL for automatic cleanup
  • ✅ Scalable to millions of executions
Docker Setup:
# Start Redis with Docker
docker run -d --name upsonic-redis -p 6379:6379 redis:latest

Recovery and Continuation

Basic Recovery

from upsonic import Agent
from upsonic.durable import FileDurableStorage

storage = FileDurableStorage(path="./checkpoints")

# Resume execution
execution_id = "20251027123456-a1b2c3d4"
agent = Agent("openai/gpt-4o-mini", name="Worker")

result = agent.continue_durable(execution_id, storage)
print(f"Resumed: {result}")

Recovery with Debug Mode

# Enable debug mode to see recovery details
agent = Agent("openai/gpt-4o-mini", name="Worker", debug=True)

# This will show:
# - Checkpoint information
# - Failed step details
# - Steps to be executed
# - Recovery progress
result = agent.continue_durable(execution_id, storage, debug=True)

Handling Different Failure Scenarios

# Check execution status before recovery
exec_info = DurableExecution.get_execution_info(execution_id, storage)

if exec_info:
    status = exec_info.get('status')
    error = exec_info.get('error')
    
    if status == "failed":
        print(f"Execution failed at step {exec_info['step_index']}")
        print(f"Error: {error}")
        print(f"Retrying step: {exec_info['step_name']}")
        
        # Retry the failed execution
        agent = Agent("openai/gpt-4o-mini")
        result = agent.continue_durable(execution_id, storage)
        
    elif status == "paused":
        print("Execution is paused, resuming...")
        agent = Agent("openai/gpt-4o-mini")
        result = agent.continue_durable(execution_id, storage)
        
    elif status == "completed":
        print("Execution already completed!")
else:
    print(f"No execution found: {execution_id}")

Execution Management

Listing Executions

from upsonic.durable import DurableExecution

# List all executions
all_executions = DurableExecution.list_all_executions(storage)
for exec_data in all_executions:
    print(f"ID: {exec_data['execution_id']}")
    print(f"Status: {exec_data['status']}")
    print(f"Step: {exec_data['step_name']}")
    print(f"Timestamp: {exec_data['timestamp']}")
    print("---")

# List by status
failed_executions = DurableExecution.list_all_executions(
    storage, 
    status="failed"
)

completed_executions = DurableExecution.list_all_executions(
    storage,
    status="completed"
)

# List executions in progress (last step was successful)
in_progress_executions = DurableExecution.list_all_executions(
    storage,
    status="success"
)

Getting Execution Information

# Get detailed execution information
durable = DurableExecution(storage=storage)
durable.execution_id = execution_id

exec_info = durable.get_execution_info()

if exec_info:
    print(f"Status: {exec_info['status']}")
    print(f"Step Index: {exec_info['step_index']}")
    print(f"Step Name: {exec_info['step_name']}")
    print(f"Timestamp: {exec_info['timestamp']}")
    
    if exec_info['error']:
        print(f"Error: {exec_info['error']}")

Storage Statistics

# Get storage statistics
stats = storage.get_stats()

print(f"Backend: {stats['backend']}")
print(f"Total executions: {stats['total_executions']}")
print(f"By status: {stats['by_status']}")

# Example output:
# Backend: file
# Total executions: 15
# By status: {'success': 2, 'failed': 3, 'completed': 10, 'paused': 0}
# 
# Note: 'success' means execution is in progress (last step succeeded)

Cleanup Operations

import asyncio

# Cleanup old executions (older than 7 days)
deleted_count = asyncio.run(
    storage.cleanup_old_executions_async(older_than_days=7)
)
print(f"Deleted {deleted_count} old executions")

# Delete specific execution
success = asyncio.run(
    storage.delete_state_async(execution_id)
)
if success:
    print(f"Deleted execution: {execution_id}")

Practical Examples

Banking Transaction with Recovery

from upsonic import Task, Agent
from upsonic.durable import DurableExecution, SQLiteDurableStorage

# Create storage for banking transactions
banking_storage = SQLiteDurableStorage(
    db_path="./banking_transactions.db",
    table_name="transactions"
)

# Define transaction task
def process_transaction(amount: float, from_account: str, to_account: str):
    # Create durable execution for critical transaction
    durable = DurableExecution(
        storage=banking_storage,
        auto_cleanup=False,  # Keep records for audit
        debug=True
    )
    
    task = Task(
        f"Transfer ${amount} from {from_account} to {to_account}. "
        "Verify balances, initiate transfer, and confirm completion.",
        durable_execution=durable
    )
    
    execution_id = task.durable_execution_id
    print(f"Transaction ID: {execution_id}")
    
    agent = Agent("openai/gpt-4o-mini", name="BankingAgent")
    
    try:
        # Attempt transaction
        result = agent.do(task)
        print(f"✅ Transaction completed: {result}")
        return execution_id, "success"
        
    except Exception as e:
        print(f"❌ Transaction failed: {e}")
        print(f"💾 State saved for recovery: {execution_id}")
        return execution_id, "failed"

# Execute transaction
exec_id, status = process_transaction(1000.0, "ACC001", "ACC002")

# If failed, retry later
if status == "failed":
    print("\nRetrying transaction...")
    agent = Agent("openai/gpt-4o-mini", name="BankingAgent")
    result = agent.continue_durable(exec_id, banking_storage)
    print(f"✅ Transaction recovered: {result}")
Key Benefits:
  • ✅ No duplicate charges (transaction resumes from exact point)
  • ✅ Complete audit trail of all steps
  • ✅ Automatic recovery from network failures
  • ✅ Maintains transaction consistency

Long-Running Data Processing

from upsonic import Task, Agent
from upsonic.durable import DurableExecution, RedisDurableStorage

# Create Redis storage for distributed processing
processing_storage = RedisDurableStorage(
    host="localhost",
    port=6379,
    db=0,
    prefix="data_processing:"
)

def process_large_dataset(dataset_path: str, batch_size: int = 1000):
    """Process a large dataset with automatic checkpoint recovery."""
    
    durable = DurableExecution(
        storage=processing_storage,
        auto_cleanup=False,
        debug=True
    )
    
    task = Task(
        f"Process dataset at {dataset_path} in batches of {batch_size}. "
        "Validate data, transform records, and store results.",
        durable_execution=durable
    )
    
    agent = Agent("openai/gpt-4o-mini", name="DataProcessor")
    
    print(f"Starting processing: {task.durable_execution_id}")
    print(f"Dataset: {dataset_path}")
    
    try:
        result = agent.do(task)
        print(f"✅ Processing complete: {result}")
        
    except Exception as e:
        print(f"❌ Processing interrupted: {e}")
        print(f"💾 Progress saved: {task.durable_execution_id}")
        print("\nRun the following to resume:")
        print(f"  resume_processing('{task.durable_execution_id}')")
        
    return task.durable_execution_id

def resume_processing(execution_id: str):
    """Resume processing from checkpoint."""
    
    # Check where it failed
    exec_info = DurableExecution.get_execution_info(execution_id, processing_storage)
    
    if exec_info:
        print(f"Resuming from step {exec_info['step_index']}: {exec_info['step_name']}")
        if exec_info['error']:
            print(f"Previous error: {exec_info['error']}")
    
    # Resume execution
    agent = Agent("openai/gpt-4o-mini", name="DataProcessor")
    result = agent.continue_durable(execution_id, processing_storage)
    
    print(f"✅ Processing resumed and completed: {result}")
    return result

# Usage
exec_id = process_large_dataset("/data/large_dataset.csv", batch_size=5000)

# If interrupted, resume later
# resume_processing(exec_id)
Key Benefits:
  • ✅ Process millions of records safely
  • ✅ Resume from interruption point (not restart from zero)
  • ✅ Distributed processing support via Redis
  • ✅ Progress tracking and monitoring

Multi-Step Workflow with Tools

from upsonic import Task, Agent, tool
from upsonic.durable import DurableExecution, FileDurableStorage

# Define workflow tools

@tool
def fetch_data(source: str) -> dict:
    """Fetch data from external source."""
    print(f"Fetching from {source}...")
    return {"data": "sample_data", "source": source}

@tool
def validate_data(data: dict) -> bool:
    """Validate fetched data."""
    print(f"Validating data from {data.get('source')}...")
    return True

@tool
def transform_data(data: dict) -> dict:
    """Transform data to required format."""
    print(f"Transforming data...")
    return {"transformed": data}

@tool
def store_results(data: dict) -> str:
    """Store processed data."""
    print(f"Storing results...")
    return "storage_id_123"


# Setup workflow with durable execution
workflow_storage = FileDurableStorage(path="./workflows")

durable = DurableExecution(
    storage=workflow_storage,
    auto_cleanup=False,
    debug=True
)

task = Task(
    "Complete ETL workflow: "
    "1. Fetch data from 'api.example.com' "
    "2. Validate the data "
    "3. Transform to JSON format "
    "4. Store in database",
    tools=[fetch_data, validate_data, transform_data, store_results],
    durable_execution=durable
)

workflow_id = task.durable_execution_id
print(f"Workflow ID: {workflow_id}")

# Execute workflow
agent = Agent("openai/gpt-4o-mini", name="WorkflowAgent")

try:
    result = agent.do(task)
    print(f"✅ Workflow completed: {result}")
    
except Exception as e:
    print(f"❌ Workflow failed at some step: {e}")
    
    # Get execution details
    exec_info = DurableExecution.get_execution_info(workflow_id, workflow_storage)
    print(f"\nFailed at: {exec_info['step_name']}")
    print(f"Completed steps: {exec_info['step_index']}")
    
    # Resume workflow
    print("\nResuming workflow...")
    result = agent.continue_durable(workflow_id, workflow_storage)
    print(f"✅ Workflow recovered: {result}")
Key Benefits:
  • ✅ Complex multi-step workflows with tool calls
  • ✅ Each step is checkpointed
  • ✅ Tool results are preserved
  • ✅ Resume from exact tool call

Monitoring and Analytics

import asyncio
from datetime import datetime, timedelta
from upsonic.durable import DurableExecution, SQLiteDurableStorage

# Create storage for monitoring
monitoring_storage = SQLiteDurableStorage(
    db_path="./execution_analytics.db"
)

async def get_execution_analytics(storage):
    """Get comprehensive execution analytics."""
    
    # Get storage statistics
    stats = storage.get_stats()
    
    print("="*60)
    print("EXECUTION ANALYTICS")
    print("="*60)
    
    print(f"\n📊 Overall Statistics:")
    print(f"  Backend: {stats['backend']}")
    print(f"  Total Executions: {stats['total_executions']}")
    
    print(f"\n📈 By Status:")
    for status, count in stats['by_status'].items():
        percentage = (count / stats['total_executions'] * 100) if stats['total_executions'] > 0 else 0
        print(f"  {status.upper()}: {count} ({percentage:.1f}%)")
    
    # List failed executions for investigation
    failed = DurableExecution.list_all_executions(storage, status="failed")
    
    if failed:
        print(f"\n❌ Failed Executions ({len(failed)}):")
        for exec_data in failed[:5]:  # Show first 5
            exec_id = exec_data['execution_id'][:20] + "..."
            print(f"  • {exec_id}")
            print(f"    Failed at: {exec_data['step_name']}")
            if exec_data['error']:
                error_msg = exec_data['error'][:60] + "..."
                print(f"    Error: {error_msg}")
    
    # List in-progress executions (status="success" means still executing)
    in_progress = DurableExecution.list_all_executions(storage, status="success")
    
    if in_progress:
        print(f"\n🏃 Currently In Progress ({len(in_progress)}):")
        for exec_data in in_progress[:5]:
            exec_id = exec_data['execution_id'][:20] + "..."
            print(f"  • {exec_id}")
            print(f"    Last completed step: {exec_data['step_name']}")
            print(f"    Started: {exec_data['timestamp']}")
    
    # Cleanup old completed executions
    print(f"\n🧹 Cleanup:")
    deleted = await storage.cleanup_old_executions_async(older_than_days=7)
    print(f"  Deleted {deleted} executions older than 7 days")

# Run analytics
asyncio.run(get_execution_analytics(monitoring_storage))

Execution Recovery Dashboard

from upsonic import Agent
from upsonic.durable import DurableExecution, RedisDurableStorage

def create_recovery_dashboard(storage):
    """Interactive dashboard for managing failed executions."""
    
    print("="*60)
    print("DURABLE EXECUTION RECOVERY DASHBOARD")
    print("="*60)
    
    # Get all failed executions
    failed = DurableExecution.list_all_executions(storage, status="failed")
    
    if not failed:
        print("\n✅ No failed executions found!")
        return
    
    print(f"\n❌ Found {len(failed)} failed executions:\n")
    
    for idx, exec_data in enumerate(failed, 1):
        exec_id = exec_data['execution_id']
        step = exec_data['step_name']
        error = exec_data.get('error', 'Unknown error')
        timestamp = exec_data.get('timestamp', 'Unknown time')
        
        print(f"{idx}. Execution: {exec_id[:30]}...")
        print(f"   Failed at step: {step}")
        print(f"   Error: {error[:100]}...")
        print(f"   Time: {timestamp}")
        print()
    
    # Recovery menu
    print("Options:")
    print("  1. Retry a specific execution")
    print("  2. Retry all failed executions")
    print("  3. Delete failed executions")
    print("  0. Exit")
    
    choice = input("\nSelect option: ")
    
    if choice == "1":
        idx = int(input("Enter execution number: ")) - 1
        execution_id = failed[idx]['execution_id']
        retry_execution(execution_id, storage)
        
    elif choice == "2":
        retry_all_failed(failed, storage)
        
    elif choice == "3":
        delete_failed(failed, storage)

def retry_execution(execution_id: str, storage):
    """Retry a specific failed execution."""
    print(f"\n🔄 Retrying execution: {execution_id}")
    
    agent = Agent("openai/gpt-4o-mini", name="RecoveryAgent")
    
    try:
        result = agent.continue_durable(execution_id, storage)
        print(f"✅ Recovery successful!")
        print(f"Result: {result[:100]}...")
        
    except Exception as e:
        print(f"❌ Recovery failed: {e}")

def retry_all_failed(failed_list: list, storage):
    """Retry all failed executions."""
    print(f"\n🔄 Retrying {len(failed_list)} failed executions...")
    
    agent = Agent("openai/gpt-4o-mini", name="RecoveryAgent")
    success_count = 0
    
    for exec_data in failed_list:
        execution_id = exec_data['execution_id']
        try:
            agent.continue_durable(execution_id, storage)
            success_count += 1
            print(f"  ✅ {execution_id[:20]}... recovered")
        except Exception as e:
            print(f"  ❌ {execution_id[:20]}... failed: {str(e)[:50]}")
    
    print(f"\n📊 Summary: {success_count}/{len(failed_list)} executions recovered")

def delete_failed(failed_list: list, storage):
    """Delete failed executions."""
    import asyncio
    
    confirm = input(f"Delete {len(failed_list)} failed executions? (yes/no): ")
    
    if confirm.lower() == "yes":
        for exec_data in failed_list:
            execution_id = exec_data['execution_id']
            asyncio.run(storage.delete_state_async(execution_id))
        print(f"✅ Deleted {len(failed_list)} executions")

# Usage
storage = RedisDurableStorage(host="localhost", port=6379, db=0)
create_recovery_dashboard(storage)

Best Practices

Storage Selection

  1. Development: Use FileDurableStorage for easy debugging
  2. Testing: Use InMemoryDurableStorage for fast tests
  3. Production (Single Node): Use SQLiteDurableStorage for reliability
  4. Production (Distributed): Use RedisDurableStorage for scalability
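
One way to encode this guidance is a small factory keyed by deployment environment. The helper below is illustrative and not part of the Upsonic API; the environment labels are assumptions, and the constructor arguments mirror the storage examples earlier in this page.

from upsonic.durable import (
    InMemoryDurableStorage,
    FileDurableStorage,
    SQLiteDurableStorage,
    RedisDurableStorage,
)

# Illustrative helper (not an Upsonic API) applying the selection guidance above
def storage_for_environment(env: str):
    if env == "testing":
        return InMemoryDurableStorage()
    if env == "development":
        return FileDurableStorage(path="./checkpoints")
    if env == "production-single-node":
        return SQLiteDurableStorage(db_path="./executions.db")
    if env == "production-distributed":
        return RedisDurableStorage(host="localhost", port=6379, db=0)
    raise ValueError(f"Unknown environment: {env}")

storage = storage_for_environment("development")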

Cleanup Strategy

# Enable auto-cleanup for short-lived tasks
durable = DurableExecution(
    storage=storage,
    auto_cleanup=True  # Clean up after successful completion
)

# Disable auto-cleanup for audit trails
durable = DurableExecution(
    storage=storage,
    auto_cleanup=False  # Keep all execution records
)

# Periodic cleanup of old executions
import asyncio
from datetime import timedelta

async def periodic_cleanup():
    while True:
        deleted = await storage.cleanup_old_executions_async(older_than_days=30)
        print(f"Cleaned up {deleted} old executions")
        await asyncio.sleep(86400)  # Once per day

Error Handling

# Comprehensive error handling
from upsonic import Task, Agent
from upsonic.durable import DurableExecution, FileDurableStorage

def execute_with_retry(task_description: str, max_retries: int = 3):
    """Execute task with automatic retry on failure."""
    
    storage = FileDurableStorage(path="./executions")
    durable = DurableExecution(storage=storage, debug=True)
    
    task = Task(task_description, durable_execution=durable)
    execution_id = task.durable_execution_id
    
    agent = Agent("openai/gpt-4o-mini")
    
    for attempt in range(max_retries):
        try:
            if attempt == 0:
                # First attempt
                result = agent.do(task)
            else:
                # Retry from checkpoint
                print(f"Retry attempt {attempt}/{max_retries}")
                result = agent.continue_durable(execution_id, storage)
            
            print(f"✅ Success on attempt {attempt + 1}")
            return result
            
        except Exception as e:
            print(f"❌ Attempt {attempt + 1} failed: {e}")
            
            if attempt == max_retries - 1:
                print(f"💾 Execution saved for manual recovery: {execution_id}")
                raise
            
            # Wait before retry
            import time
            time.sleep(2 ** attempt)  # Exponential backoff

# Usage
result = execute_with_retry("Process critical data", max_retries=3)

Monitoring and Alerting

import asyncio
from upsonic.durable import DurableExecution

async def monitor_executions(storage, alert_threshold: int = 5):
    """Monitor for excessive failures and alert."""
    
    while True:
        stats = storage.get_stats()
        failed_count = stats['by_status'].get('failed', 0)
        
        if failed_count >= alert_threshold:
            # Get recent failures
            failed = DurableExecution.list_all_executions(
                storage, 
                status="failed",
                limit=10
            )
            
            print(f"🚨 ALERT: {failed_count} failed executions detected!")
            print("Recent failures:")
            
            for exec_data in failed:
                print(f"  • {exec_data['execution_id']}")
                print(f"    Step: {exec_data['step_name']}")
                print(f"    Error: {exec_data.get('error', 'N/A')[:60]}")
            
            # Send alert (email, Slack, PagerDuty, etc.)
            # send_alert(f"High failure rate detected: {failed_count} failures")
        
        await asyncio.sleep(300)  # Check every 5 minutes

Performance Optimization

# Use appropriate storage for scale
from upsonic.durable import FileDurableStorage, SQLiteDurableStorage, RedisDurableStorage

def get_storage_for_scale(scale: str):
    """Select storage based on application scale."""
    
    if scale == "small":
        # < 100 executions/day
        return FileDurableStorage(path="./checkpoints")
        
    elif scale == "medium":
        # 100-10,000 executions/day
        return SQLiteDurableStorage(db_path="./executions.db")
        
    elif scale == "large":
        # > 10,000 executions/day or distributed
        return RedisDurableStorage(
            host="redis.example.com",
            port=6379,
            db=0,
            ttl=86400  # 24-hour TTL for automatic cleanup
        )

# Configure cleanup based on retention requirements
storage = SQLiteDurableStorage(db_path="./executions.db")

# High-frequency cleanup for short retention
async def aggressive_cleanup():
    await storage.cleanup_old_executions_async(older_than_days=1)

# Conservative cleanup for audit requirements
async def audit_compliant_cleanup():
    await storage.cleanup_old_executions_async(older_than_days=365)

Complete Example

import asyncio
from upsonic import Task, Agent, tool
from upsonic.durable import DurableExecution, SQLiteDurableStorage

# Define tools for the task
@tool
def verify_tool(data_source: str) -> bool:
    """Verify data source is accessible."""
    print(f"Verifying {data_source}...")
    return True

@tool
def process_tool(record_count: int) -> dict:
    """Process specified number of records."""
    print(f"Processing {record_count} records...")
    return {"processed": record_count, "status": "success"}

@tool
def report_tool(data: dict) -> str:
    """Generate final report."""
    print(f"Generating report from {data['processed']} records...")
    return f"Report generated for {data['processed']} records"


async def main():
    print("="*60)
    print("COMPREHENSIVE DURABLE EXECUTION EXAMPLE")
    print("="*60)
    
    # Setup storage
    storage = SQLiteDurableStorage(
        db_path="./comprehensive_example.db",
        table_name="executions"
    )
    
    # Create durable execution
    durable = DurableExecution(
        storage=storage,
        auto_cleanup=False,
        debug=True
    )
    
    # Create task with tools
    task = Task(
        "Complete data processing workflow: "
        "1. Verify data source 'database.csv' is accessible "
        "2. Process 1000 records "
        "3. Generate final report",
        tools=[verify_tool, process_tool, report_tool],
        durable_execution=durable
    )
    
    execution_id = task.durable_execution_id
    print(f"\n🆔 Execution ID: {execution_id}")
    print(f"📁 Checkpoint storage: comprehensive_example.db")
    
    # Create agent
    agent = Agent("openai/gpt-4o-mini", name="DataProcessor", debug=True)
    
    # Attempt execution
    print("\n" + "="*60)
    print("PHASE 1: INITIAL EXECUTION")
    print("="*60)
    
    try:
        result = agent.do(task)
        print(f"\n✅ Execution completed successfully!")
        print(f"Result: {result}")
        
    except Exception as e:
        print(f"\n❌ Execution failed: {e}")
        
        # Get execution details
        print("\n" + "="*60)
        print("CHECKPOINT INSPECTION")
        print("="*60)
        
        exec_info = durable.get_execution_info()
        
        if exec_info:
            print(f"Status: {exec_info['status']}")
            print(f"Failed at step: {exec_info['step_index']} ({exec_info['step_name']})")
            print(f"Error: {exec_info.get('error', 'N/A')}")
            print(f"Timestamp: {exec_info['timestamp']}")
        
        # Attempt recovery
        print("\n" + "="*60)
        print("PHASE 2: RECOVERY")
        print("="*60)
        
        print("\n🔄 Attempting recovery...")
        
        try:
            # Create new agent (simulating system restart)
            recovery_agent = Agent("openai/gpt-4o-mini", name="RecoveryAgent", debug=True)
            
            # Resume from checkpoint
            result = recovery_agent.continue_durable(execution_id, storage)
            
            print(f"\n✅ Recovery successful!")
            print(f"Result: {result}")
            
        except Exception as recovery_error:
            print(f"\n❌ Recovery failed: {recovery_error}")
            raise
    
    # Display analytics
    print("\n" + "="*60)
    print("EXECUTION ANALYTICS")
    print("="*60)
    
    stats = storage.get_stats()
    print(f"Backend: {stats['backend']}")
    print(f"Total executions: {stats['total_executions']}")
    print(f"By status: {stats['by_status']}")
    
    # List all executions
    all_executions = DurableExecution.list_all_executions(storage)
    print(f"\nAll executions in storage: {len(all_executions)}")
    
    for exec_data in all_executions[:5]:
        print(f"  • {exec_data['execution_id'][:30]}...")
        print(f"    Status: {exec_data['status']}")
        print(f"    Step: {exec_data['step_name']}")
    
    # Cleanup
    print("\n" + "="*60)
    print("CLEANUP")
    print("="*60)
    
    deleted = await storage.cleanup_old_executions_async(older_than_days=30)
    print(f"Cleaned up {deleted} old executions")
    
    print("\n✅ Example completed!")

if __name__ == "__main__":
    asyncio.run(main())

Integration Patterns

With Agent Pipelines

# Durable execution works seamlessly with agent pipeline
from upsonic import Agent, Task
from upsonic.durable import DurableExecution, FileDurableStorage

storage = FileDurableStorage(path="./pipeline_checkpoints")

# Each pipeline step is automatically checkpointed
durable = DurableExecution(storage=storage, debug=True)

task = Task(
    "Multi-step analysis with validation, processing, and reporting",
    durable_execution=durable
)

agent = Agent("openai/gpt-4o-mini", name="PipelineAgent")
result = agent.do(task)

# Checkpoints created at each step:
# - initialization
# - storage_connection  
# - cache_check
# - user_policy
# - llm_manager
# - model_selection
# - validation
# - tool_setup
# - message_build
# - model_execution
# - response_processing
# - ... (19 steps total)

Summary

Durable Execution in Upsonic provides enterprise-grade reliability for AI agents:
  • Automatic Checkpointing: Every step is saved automatically
  • Multiple Storage Backends: File, SQLite, Redis, In-Memory
  • Seamless Recovery: Resume from exact failure point
  • No Data Loss: Complete state preservation
  • Production Ready: Tested across all storage backends
  • Distributed Support: Redis-based coordination
  • Easy Integration: Simple API, minimal code changes
Perfect for mission-critical applications like banking transactions, long-running data processing, multi-step workflows, and distributed agent systems.