Overview

What is Durable Execution

Durable Execution provides fault-tolerant execution for AI agents by automatically saving execution state at each pipeline step. This enables agents to recover from failures, resume from interruptions, and maintain consistency across restarts. The DurableExecution system manages:
  • Automatic checkpoint creation after each pipeline step
  • State persistence across multiple storage backends
  • Execution recovery from the point of failure
  • Error tracking and debugging information
  • Execution history and analytics
  • Cleanup of old execution data

How Durable Execution Works

When an agent executes a task with durable execution enabled:
  1. Checkpoint Creation: After each successful pipeline step, the system saves a checkpoint containing:
    • Task state (description, configuration, response format)
    • Execution context (messages, agent state, step information)
    • Current step index and name
    • Execution status (running, paused, failed, completed)
  2. Failure Handling: When a step fails:
    • The system saves a checkpoint at the failed step with status="failed"
    • Error details are preserved in the checkpoint
    • Execution metadata is updated
  3. Recovery: To resume execution:
    • Load the checkpoint from storage
    • Reconstruct the task and execution context
    • Retry the failed step (if status="failed") or continue from the next step
    • Complete remaining pipeline steps
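The three phases above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not Upsonic's implementation: `run_pipeline`, the dict-based `store`, and the step functions are hypothetical names, and the checkpoint fields simply mirror the ones listed above.

```python
from typing import Any, Callable, Dict, List, Tuple

# A step is a (name, function) pair; the function mutates a shared context dict.
Step = Tuple[str, Callable[[Dict[str, Any]], None]]


def run_pipeline(execution_id: str, steps: List[Step],
                 store: Dict[str, dict]) -> Dict[str, Any]:
    """Run steps in order, keeping one overwritten checkpoint per execution."""
    checkpoint = store.get(execution_id)
    if checkpoint is None:
        start, context = 0, {}
    elif checkpoint["status"] == "failed":
        # Retry the step that failed.
        start, context = checkpoint["step_index"], checkpoint["context"]
    else:
        # The recorded step succeeded, so continue with the next one.
        start, context = checkpoint["step_index"] + 1, checkpoint["context"]

    for index in range(start, len(steps)):
        name, func = steps[index]
        try:
            func(context)
        except Exception as exc:
            # Failure handling: record the failed step and the error, then re-raise.
            store[execution_id] = {
                "step_index": index, "step_name": name, "status": "failed",
                "error": str(exc), "context": dict(context),
            }
            raise
        is_last = index == len(steps) - 1
        store[execution_id] = {
            "step_index": index, "step_name": name,
            "status": "completed" if is_last else "running",
            "error": None, "context": dict(context),
        }
    return context


# Demo: the second step fails once, then succeeds when the run is resumed.
attempts = {"count": 0}


def load(ctx: Dict[str, Any]) -> None:
    ctx["rows"] = [1, 2, 3]


def flaky_sum(ctx: Dict[str, Any]) -> None:
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise RuntimeError("transient failure")
    ctx["total"] = sum(ctx["rows"])


def report(ctx: Dict[str, Any]) -> None:
    ctx["report"] = f"total={ctx['total']}"


steps = [("load", load), ("sum", flaky_sum), ("report", report)]
store: Dict[str, dict] = {}

try:
    run_pipeline("demo-1", steps, store)
except RuntimeError:
    failed_checkpoint = dict(store["demo-1"])

final_context = run_pipeline("demo-1", steps, store)
```

On the second call the "load" step is not re-run: the loop starts at the failed step index and reuses the context saved in the checkpoint.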

Checkpoint Strategy

The system uses an overwrite strategy. Each execution has exactly one checkpoint, which is overwritten after every step:
Step 0 ✅ → Checkpoint: {step: 0, status: "running"}    OVERWRITE
Step 1 ✅ → Checkpoint: {step: 1, status: "running"}    OVERWRITE
Step 2 ✅ → Checkpoint: {step: 2, status: "running"}    OVERWRITE
Step 3 ❌ → Checkpoint: {step: 3, status: "failed"}     OVERWRITE
Status Values:
  • running: A step is in progress, or the most recent step completed successfully (intermediate state)
  • failed: Step failed with error
  • paused: Execution paused (e.g., human-in-the-loop)
  • completed: All steps finished successfully (final state)
Result: ONE file/record per execution_id with the latest state.
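One way to get "one file per execution_id" on disk is to write each checkpoint to a temporary file and atomically swap it over the previous one. The sketch below illustrates that idea only; the `save_checkpoint` helper and the `<execution_id>.json` layout are assumptions for illustration, not a description of how FileDurableStorage stores its data.

```python
import json
import os
import tempfile
from pathlib import Path


def save_checkpoint(directory: Path, execution_id: str, state: dict) -> Path:
    """Overwrite the single checkpoint file for this execution (hypothetical layout)."""
    directory.mkdir(parents=True, exist_ok=True)
    target = directory / f"{execution_id}.json"
    # Write to a temp file in the same directory, then swap it into place,
    # so a crash mid-write never leaves a half-written checkpoint behind.
    fd, tmp_name = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(state, handle)
        os.replace(tmp_name, target)
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return target


# Demo: four saves for the same execution leave exactly one file.
demo_dir = Path(tempfile.mkdtemp())
for step, status in [(0, "running"), (1, "running"), (2, "running"), (3, "failed")]:
    checkpoint_path = save_checkpoint(demo_dir, "demo-exec", {"step": step, "status": status})

files = sorted(p.name for p in demo_dir.iterdir())
latest = json.loads(checkpoint_path.read_text(encoding="utf-8"))
```

The trade-off of overwriting is that intermediate states are not kept: only the latest step and status can be inspected after the fact.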

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| storage | DurableExecutionStorage | Storage backend for checkpoint persistence |
| execution_id | str | Unique identifier for this execution (auto-generated) |
| auto_cleanup | bool | Automatically clean up on completion (default: True) |
| debug | bool | Enable debug logging (default: False) |

Creating Durable Executions

Basic Usage

Durable executions are created by attaching a DurableExecution instance to a Task. The system automatically handles checkpoint management throughout the execution lifecycle.
from upsonic import Task, Agent
from upsonic.durable import DurableExecution, FileDurableStorage

# Create storage backend
storage = FileDurableStorage(path="./checkpoints")

# Create durable execution
durable = DurableExecution(storage=storage)

# Create task with durable execution
task = Task(
    "Process customer data and generate report",
    durable_execution=durable
)

# Get the unique execution ID
print(f"Execution ID: {task.durable_execution_id}")
# Example output (the ID is auto-generated): Execution ID: 20251027123456-a1b2c3d4

# Execute the task
agent = Agent("anthropic/claude-sonnet-4-6", name="DataProcessor")
try:
    result = agent.do(task)
    print(f"Success: {result}")
except Exception as e:
    print(f"Failed: {e}")
    print(f"Checkpoint saved at: {task.durable_execution_id}")

Recovery from Failure

# Resume from a failed execution
execution_id = "20251027123456-a1b2c3d4"

# Create a new agent instance (simulating restart)
agent = Agent("anthropic/claude-sonnet-4-6", name="DataProcessor")

# Resume from the checkpoint
result = agent.continue_durable(execution_id, storage)
print(f"Recovered and completed: {result}")

Recovery and Continuation

Basic Recovery

from upsonic import Agent
from upsonic.durable import FileDurableStorage

storage = FileDurableStorage(path="./checkpoints")

# Resume execution
execution_id = "20251027123456-a1b2c3d4"
agent = Agent("anthropic/claude-sonnet-4-6", name="Worker")

result = agent.continue_durable(execution_id, storage)
print(f"Resumed: {result}")
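If the cause of the original failure is transient (for example a rate limit or a network error), a single resume attempt may fail again. A small retry wrapper with exponential backoff is one way to handle that. The `retry_with_backoff` helper below is a hypothetical, framework-independent sketch; it only assumes that the resume call raises an exception on failure.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def retry_with_backoff(operation: Callable[[], T], attempts: int = 3,
                       base_delay: float = 1.0,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call operation until it succeeds, doubling the wait after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error.
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


# Demo with a stand-in for the resume call that fails twice, then succeeds.
calls = {"n": 0}
recorded_delays: List[float] = []


def flaky_resume() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("storage unavailable")
    return "resumed"


outcome = retry_with_backoff(flaky_resume, attempts=3, base_delay=0.5,
                             sleep=recorded_delays.append)
```

With the recovery call shown above, usage would look like `retry_with_backoff(lambda: agent.continue_durable(execution_id, storage))`.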

Recovery with Debug Mode

# Enable debug mode to see recovery details
agent = Agent("anthropic/claude-sonnet-4-6", name="Worker", debug=True)

# This will show:
# - Checkpoint information
# - Failed step details
# - Steps to be executed
# - Recovery progress
result = agent.continue_durable(execution_id, storage, debug=True)

Handling Different Failure Scenarios

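from upsonic import Agent
from upsonic.durable import DurableExecution

# Check execution status before recovery
# (storage and execution_id are defined as in Basic Recovery)
durable = DurableExecution(storage=storage, execution_id=execution_id)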
exec_info = durable.get_execution_info()

if exec_info:
    status = exec_info.get('status')
    error = exec_info.get('error')
    
    if status == "failed":
        print(f"Execution failed at step {exec_info['step_index']}")
        print(f"Error: {error}")
        print(f"Retrying step: {exec_info['step_name']}")
        
        # Retry the failed execution
        agent = Agent("anthropic/claude-sonnet-4-6")
        result = agent.continue_durable(execution_id, storage)
        
    elif status == "paused":
        print("Execution is paused, resuming...")
        agent = Agent("anthropic/claude-sonnet-4-6")
        result = agent.continue_durable(execution_id, storage)
        
    elif status == "completed":
        print("Execution already completed!")
else:
    print(f"No execution found: {execution_id}")
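The branching above can also be factored into a small pure function, which makes the decision easy to unit-test without a storage backend. This is a sketch: `recovery_action` and its return labels are hypothetical, and treating a "running" checkpoint (for example after a process crash) as resumable is an assumption based on the "continue from the next step" rule described earlier.

```python
from typing import Optional


def recovery_action(exec_info: Optional[dict]) -> str:
    """Map checkpoint info to the action a caller should take (hypothetical labels)."""
    if not exec_info:
        return "not_found"
    status = exec_info.get("status")
    if status == "failed":
        return "retry_failed_step"
    if status in ("paused", "running"):
        # Assumption: an interrupted "running" execution resumes like a paused one.
        return "continue"
    if status == "completed":
        return "skip"
    return "unknown"


actions = [
    recovery_action(None),
    recovery_action({"status": "failed", "step_index": 3}),
    recovery_action({"status": "paused"}),
    recovery_action({"status": "completed"}),
]
```

Only the "retry_failed_step" and "continue" outcomes would lead to a continue_durable call; the others can be logged and skipped.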

Execution Management

Listing Executions

from upsonic.durable import DurableExecution

# List all executions
all_executions = DurableExecution.list_all_executions(storage)
for exec_data in all_executions:
    print(f"ID: {exec_data['execution_id']}")
    print(f"Status: {exec_data['status']}")
    print(f"Step: {exec_data['step_name']}")
    print(f"Timestamp: {exec_data['timestamp']}")
    print("---")

# List by status
failed_executions = DurableExecution.list_all_executions(
    storage, 
    status="failed"
)

completed_executions = DurableExecution.list_all_executions(
    storage,
    status="completed"
)

# List executions in progress (last step was successful)
in_progress_executions = DurableExecution.list_all_executions(
    storage,
    status="running"
)

Getting Execution Information

# Get detailed execution information
durable = DurableExecution(storage=storage, execution_id=execution_id)
exec_info = durable.get_execution_info()

if exec_info:
    print(f"Status: {exec_info['status']}")
    print(f"Step Index: {exec_info['step_index']}")
    print(f"Step Name: {exec_info['step_name']}")
    print(f"Timestamp: {exec_info['timestamp']}")
    
    if exec_info.get('error'):
        print(f"Error: {exec_info['error']}")

Storage Statistics

# Get storage statistics
stats = storage.get_stats()

print(f"Backend: {stats['backend']}")
print(f"Total executions: {stats['total_executions']}")
print(f"By status: {stats['by_status']}")

# Example output:
# Backend: file
# Total executions: 15
# By status: {'running': 2, 'failed': 3, 'completed': 10, 'paused': 0}
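To make the shape of these statistics concrete, the sketch below computes the same kind of summary from a list of execution records like those returned when listing executions. The `summarize` helper is hypothetical and only assumes each record carries a `status` key; it is not the implementation of `get_stats()`.

```python
from collections import Counter
from typing import Dict, List

STATUSES = ("running", "failed", "completed", "paused")


def summarize(executions: List[dict]) -> Dict[str, object]:
    """Count executions per status, reporting zero for statuses with no records."""
    counts = Counter(record["status"] for record in executions)
    return {
        "total_executions": len(executions),
        "by_status": {status: counts.get(status, 0) for status in STATUSES},
    }


# Demo records using the status values described in this document.
sample_records = [
    {"execution_id": "a", "status": "running"},
    {"execution_id": "b", "status": "failed"},
    {"execution_id": "c", "status": "completed"},
    {"execution_id": "d", "status": "completed"},
]
summary = summarize(sample_records)
```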

Cleanup Operations

import asyncio

# Cleanup old executions (older than 7 days)
deleted_count = asyncio.run(
    storage.cleanup_old_executions_async(older_than_days=7)
)
print(f"Deleted {deleted_count} old executions")

# Delete specific execution
success = asyncio.run(
    storage.delete_state_async(execution_id)
)
if success:
    print(f"Deleted execution: {execution_id}")
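Age-based cleanup comes down to comparing each record's timestamp with a cutoff. The sketch below shows that selection step in isolation. It is illustrative only: `select_expired` is a hypothetical helper, and it assumes timestamps are ISO 8601 strings with a UTC offset, which may differ from what a given storage backend records.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional


def select_expired(executions: List[dict], older_than_days: int,
                   now: Optional[datetime] = None) -> List[str]:
    """Return the IDs of executions whose timestamp is older than the cutoff."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=older_than_days)
    return [
        record["execution_id"]
        for record in executions
        if datetime.fromisoformat(record["timestamp"]) < cutoff
    ]


# Demo with a fixed "now" so the result is deterministic.
cleanup_records = [
    {"execution_id": "old-1", "timestamp": "2025-10-01T12:00:00+00:00"},
    {"execution_id": "new-1", "timestamp": "2025-10-26T12:00:00+00:00"},
]
fixed_now = datetime(2025, 10, 27, 12, 0, tzinfo=timezone.utc)
expired = select_expired(cleanup_records, older_than_days=7, now=fixed_now)
```

Running a selection like this first, as a dry run, can be a useful check before deleting anything from a shared backend.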

Storage

In-Memory

Fast, non-persistent storage for testing and development.
Usage:
from upsonic import Task
from upsonic.durable import DurableExecution, InMemoryDurableStorage

storage = InMemoryDurableStorage()
durable = DurableExecution(storage=storage)

# Use with task
task = Task("Quick test task", durable_execution=durable)
Parameters: No configuration parameters required.
Characteristics:
  • ⚡ Fastest performance
  • ❌ No persistence (lost on restart)
  • ✅ Perfect for testing
  • ✅ No setup required

File-Based

Simple, human-readable JSON files for single-node applications.
Usage:
from upsonic import Task
from upsonic.durable import DurableExecution, FileDurableStorage

# Create storage with custom path
storage = FileDurableStorage(path="./execution_states")

durable = DurableExecution(
    storage=storage,
    auto_cleanup=False,  # Keep files after completion
    debug=True
)

task = Task("Data processing", durable_execution=durable)
Parameters:
| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| path | str | Directory path for checkpoint files | "./durable_states" |
Characteristics:
  • 📁 Human-readable JSON format
  • ✅ Easy debugging and inspection
  • ✅ Simple backup and restore
  • ⚠️ Not suitable for distributed systems

SQLite

Queryable, efficient storage for small to medium applications.
Usage:
from upsonic import Task
from upsonic.durable import DurableExecution, SQLiteDurableStorage

# Create storage with custom database
storage = SQLiteDurableStorage(db_path="./executions.db")

durable = DurableExecution(storage=storage)
task = Task("Analytics task", durable_execution=durable)
Parameters:
| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| db_path | str | SQLite database file path | "./durable_executions.db" |
Characteristics:
  • 🗄️ Single-file database
  • ✅ ACID transactions
  • ✅ Queryable execution history
  • ✅ Efficient for thousands of executions

Redis

Distributed, high-performance storage for production systems.
Usage:
from upsonic import Task
from upsonic.durable import DurableExecution, RedisDurableStorage

# Create Redis storage
storage = RedisDurableStorage(
    host="localhost",
    port=6379,
    db=0,
    password="your_password",  # Optional
    prefix="upsonic:durable:"  # Key prefix
)

durable = DurableExecution(storage=storage)
task = Task("Distributed task", durable_execution=durable)
Parameters:
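| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| host | str | Redis server host | "localhost" |
| port | int | Redis server port | 6379 |
| db | int | Redis database number | 0 |
| password | Optional[str] | Redis password | None |
| prefix | str | Key prefix | "durable_exec:" |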
Characteristics:
  • ⚡ Very fast in-memory performance
  • ✅ Distributed architecture support
  • ✅ Built-in TTL for automatic cleanup
  • ✅ Scalable to millions of executions
Docker Setup:
# Start Redis with Docker
docker run -d --name upsonic-redis -p 6379:6379 redis:latest

Examples

Basic Example

About the Example Scenario

This example demonstrates a complete workflow with durable execution, including task creation, execution with automatic checkpointing, failure handling, and recovery.

Durable Execution Configuration

from upsonic import Task, Agent, tool
from upsonic.durable import DurableExecution, FileDurableStorage

# Setup storage
storage = FileDurableStorage(path="./example_checkpoints")

# Create durable execution
durable = DurableExecution(
    storage=storage,
    auto_cleanup=False,  # Keep records for inspection
    debug=True           # Enable debug logging
)

Full Code

import asyncio
from upsonic import Task, Agent, tool
from upsonic.durable import DurableExecution, FileDurableStorage

# Define tools for the task
@tool
def verify_tool(data_source: str) -> bool:
    """Verify data source is accessible."""
    print(f"Verifying {data_source}...")
    return True

@tool
def process_tool(record_count: int) -> dict:
    """Process specified number of records."""
    print(f"Processing {record_count} records...")
    return {"processed": record_count, "status": "success"}

@tool
def report_tool(data: dict) -> str:
    """Generate final report."""
    print(f"Generating report from {data['processed']} records...")
    return f"Report generated for {data['processed']} records"


async def main():
    print("="*60)
    print("DURABLE EXECUTION EXAMPLE")
    print("="*60)
    
    # Setup storage
    storage = FileDurableStorage(
        path="./example_checkpoints"
    )
    
    # Create durable execution
    durable = DurableExecution(
        storage=storage,
        auto_cleanup=False,
        debug=True
    )
    
    # Create task with tools
    task = Task(
        "Complete data processing workflow: "
        "1. Verify data source 'database.csv' is accessible "
        "2. Process 1000 records "
        "3. Generate final report",
        tools=[verify_tool, process_tool, report_tool],
        durable_execution=durable
    )
    
    execution_id = task.durable_execution_id
    print(f"\n🆔 Execution ID: {execution_id}")
    print("📁 Checkpoint storage: example_checkpoints/")
    
    # Create agent
    agent = Agent("anthropic/claude-sonnet-4-6", name="DataProcessor", debug=True)
    
    # Attempt execution
    print("\n" + "="*60)
    print("PHASE 1: INITIAL EXECUTION")
    print("="*60)
    
    try:
        result = agent.do(task)
        print("\n✅ Execution completed successfully!")
        print(f"Result: {result}")
        
    except Exception as e:
        print(f"\n❌ Execution failed: {e}")
        
        # Get execution details
        print("\n" + "="*60)
        print("CHECKPOINT INSPECTION")
        print("="*60)
        
        exec_info = durable.get_execution_info()
        
        if exec_info:
            print(f"Status: {exec_info['status']}")
            print(f"Failed at step: {exec_info['step_index']} ({exec_info['step_name']})")
            print(f"Error: {exec_info.get('error', 'N/A')}")
            print(f"Timestamp: {exec_info['timestamp']}")
        
        # Attempt recovery
        print("\n" + "="*60)
        print("PHASE 2: RECOVERY")
        print("="*60)
        
        print("\n🔄 Attempting recovery...")
        
        try:
            # Create new agent (simulating system restart)
            recovery_agent = Agent("anthropic/claude-sonnet-4-6", name="RecoveryAgent", debug=True)
            
            # Resume from checkpoint
            result = recovery_agent.continue_durable(execution_id, storage)
            
            print("\n✅ Recovery successful!")
            print(f"Result: {result}")
            
        except Exception as recovery_error:
            print(f"\n❌ Recovery failed: {recovery_error}")
            raise
    
    # Display analytics
    print("\n" + "="*60)
    print("EXECUTION ANALYTICS")
    print("="*60)
    
    stats = storage.get_stats()
    print(f"Backend: {stats['backend']}")
    print(f"Total executions: {stats['total_executions']}")
    print(f"By status: {stats['by_status']}")
    
    # List all executions
    all_executions = DurableExecution.list_all_executions(storage)
    print(f"\nAll executions in storage: {len(all_executions)}")
    
    for exec_data in all_executions[:5]:
        print(f"  • {exec_data['execution_id'][:30]}...")
        print(f"    Status: {exec_data['status']}")
        print(f"    Step: {exec_data['step_name']}")
    
    # Cleanup
    print("\n" + "="*60)
    print("CLEANUP")
    print("="*60)
    
    deleted = await storage.cleanup_old_executions_async(older_than_days=30)
    print(f"Cleaned up {deleted} old executions")
    
    print("\n✅ Example completed!")

if __name__ == "__main__":
    asyncio.run(main())
This example demonstrates:
  • ✅ Creating durable execution with file storage
  • ✅ Task execution with automatic checkpointing
  • ✅ Failure detection and checkpoint inspection
  • ✅ Recovery from the point of failure
  • ✅ Execution analytics and statistics
  • ✅ Cleanup operations