Overview
What is Durable Execution
Durable Execution provides fault-tolerant execution for AI agents by automatically saving execution state at each pipeline step. This enables agents to recover from failures, resume from interruptions, and maintain consistency across restarts.
The DurableExecution system manages:
- Automatic checkpoint creation after each pipeline step
- State persistence across multiple storage backends
- Execution recovery from the point of failure
- Error tracking and debugging information
- Execution history and analytics
- Cleanup of old execution data
How Durable Execution Works
When an agent executes a task with durable execution enabled:
- Checkpoint Creation: After each successful pipeline step, the system saves a checkpoint containing:
  - Task state (description, configuration, response format)
  - Execution context (messages, agent state, step information)
  - Current step index and name
  - Execution status (running, paused, failed, completed)
- Failure Handling: When a step fails:
  - The system saves a checkpoint at the failed step with status="failed"
  - Error details are preserved in the checkpoint
  - Execution metadata is updated
- Recovery: To resume execution:
  - Load the checkpoint from storage
  - Reconstruct the task and execution context
  - Retry the failed step (if status="failed") or continue from the next step
  - Complete remaining pipeline steps
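The checkpoint, failure, and recovery flow described above can be sketched in plain Python. This is a minimal illustration, not Upsonic's implementation: `run_pipeline`, the `checkpoints` dict, and the step functions are hypothetical names, and an in-memory dict stands in for a real storage backend.

```python
from typing import Callable, Dict, List

# Hypothetical stand-in for a storage backend: one record per execution_id.
checkpoints: Dict[str, dict] = {}


def run_pipeline(execution_id: str, steps: List[Callable[[dict], None]]) -> dict:
    """Run steps in order, checkpointing after each; resume if a checkpoint exists."""
    saved = checkpoints.get(execution_id)
    if saved is None:
        context, start = {}, 0
    else:
        context = saved["context"]
        # Retry the failed step itself; otherwise continue from the next step.
        if saved["status"] == "failed":
            start = saved["step_index"]
        else:
            start = saved["step_index"] + 1

    for index in range(start, len(steps)):
        try:
            steps[index](context)
        except Exception as exc:
            # Save a checkpoint at the failed step, keeping the error details.
            checkpoints[execution_id] = {
                "step_index": index, "status": "failed",
                "error": str(exc), "context": context,
            }
            raise
        checkpoints[execution_id] = {
            "step_index": index, "status": "running",
            "error": None, "context": context,
        }

    checkpoints[execution_id] = {
        "step_index": len(steps) - 1, "status": "completed",
        "error": None, "context": context,
    }
    return context


calls: List[str] = []
fail_once = {"pending": True}


def load(ctx: dict) -> None:
    calls.append("load")
    ctx["rows"] = 3


def transform(ctx: dict) -> None:
    calls.append("transform")
    if fail_once["pending"]:
        fail_once["pending"] = False
        raise RuntimeError("transient error")
    ctx["rows"] *= 2


def report(ctx: dict) -> None:
    calls.append("report")
    ctx["report"] = f"{ctx['rows']} rows"


pipeline = [load, transform, report]

try:
    run_pipeline("exec-1", pipeline)
except RuntimeError:
    status_after_failure = checkpoints["exec-1"]["status"]

# A second call picks up the checkpoint and retries only the failed step.
result = run_pipeline("exec-1", pipeline)
```

In this sketch the `load` step runs once even though the pipeline is invoked twice, because the second call starts at the step recorded in the checkpoint.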
Checkpoint Strategy
The system uses an overwrite strategy: each execution has ONE checkpoint, which is overwritten after every step:
Step 0 ✅ → Checkpoint: {step: 0, status: "running"} OVERWRITE
Step 1 ✅ → Checkpoint: {step: 1, status: "running"} OVERWRITE
Step 2 ✅ → Checkpoint: {step: 2, status: "running"} OVERWRITE
Step 3 ❌ → Checkpoint: {step: 3, status: "failed"} OVERWRITE
Status Values:
- running: Individual step in progress or completed successfully (intermediate state)
- failed: Step failed with error
- paused: Execution paused (e.g., human-in-the-loop)
- completed: All steps finished successfully (final state)
Result: ONE file/record per execution_id with the latest state.
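As an illustration of the one-file-per-execution result, the sketch below overwrites a single JSON file on every save. The file layout (`<execution_id>.json`), the `save_checkpoint` helper, and the write-then-rename approach are assumptions made for this example and are not taken from Upsonic's storage code.

```python
import json
import os
import tempfile
from pathlib import Path


def save_checkpoint(directory: Path, execution_id: str, checkpoint: dict) -> Path:
    """Overwrite the single checkpoint file for execution_id (hypothetical layout)."""
    directory.mkdir(parents=True, exist_ok=True)
    target = directory / f"{execution_id}.json"
    # Write to a temporary file first, then swap it in, so a crash mid-write
    # does not leave a half-written checkpoint at the target path.
    fd, tmp_name = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump(checkpoint, handle)
    os.replace(tmp_name, target)
    return target


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    history = [(0, "running"), (1, "running"), (2, "running"), (3, "failed")]
    for step, status in history:
        path = save_checkpoint(root, "exec-1", {"step": step, "status": status})
    files_on_disk = sorted(p.name for p in root.iterdir())
    latest = json.loads(path.read_text(encoding="utf-8"))
```

After four saves the directory still holds a single file, and it contains only the most recent state.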
Attributes
| Attribute | Type | Description |
|---|---|---|
| storage | DurableExecutionStorage | Storage backend for checkpoint persistence |
| execution_id | str | Unique identifier for this execution (auto-generated) |
| auto_cleanup | bool | Automatically clean up checkpoint data on completion (default: True) |
| debug | bool | Enable debug logging (default: False) |
Creating Durable Executions
Basic Usage
Durable executions are created by attaching a DurableExecution instance to a Task. The system automatically handles checkpoint management throughout the execution lifecycle.
from upsonic import Task, Agent
from upsonic.durable import DurableExecution, FileDurableStorage
# Create storage backend
storage = FileDurableStorage(path="./checkpoints")
# Create durable execution
durable = DurableExecution(storage=storage)
# Create task with durable execution
task = Task(
    "Process customer data and generate report",
    durable_execution=durable
)
# Get the unique execution ID
print(f"Execution ID: {task.durable_execution_id}")
# Output: Execution ID: 20251027123456-a1b2c3d4
# Execute the task
agent = Agent("anthropic/claude-sonnet-4-6", name="DataProcessor")
try:
    result = agent.do(task)
    print(f"Success: {result}")
except Exception as e:
    print(f"Failed: {e}")
    print(f"Checkpoint saved at: {task.durable_execution_id}")
Recovery from Failure
# Resume from a failed execution
execution_id = "20251027123456-a1b2c3d4"
# Create a new agent instance (simulating restart)
agent = Agent("anthropic/claude-sonnet-4-6", name="DataProcessor")
# Resume from the checkpoint
result = agent.continue_durable(execution_id, storage)
print(f"Recovered and completed: {result}")
Recovery and Continuation
Basic Recovery
from upsonic import Agent
from upsonic.durable import FileDurableStorage
storage = FileDurableStorage(path="./checkpoints")
# Resume execution
execution_id = "20251027123456-a1b2c3d4"
agent = Agent("anthropic/claude-sonnet-4-6", name="Worker")
result = agent.continue_durable(execution_id, storage)
print(f"Resumed: {result}")
Recovery with Debug Mode
# Enable debug mode to see recovery details
agent = Agent("anthropic/claude-sonnet-4-6", name="Worker", debug=True)
# This will show:
# - Checkpoint information
# - Failed step details
# - Steps to be executed
# - Recovery progress
result = agent.continue_durable(execution_id, storage, debug=True)
Handling Different Failure Scenarios
from upsonic.durable import DurableExecution
# Check execution status before recovery
durable = DurableExecution(storage=storage, execution_id=execution_id)
exec_info = durable.get_execution_info()
if exec_info:
    status = exec_info.get('status')
    error = exec_info.get('error')
    if status == "failed":
        print(f"Execution failed at step {exec_info['step_index']}")
        print(f"Error: {error}")
        print(f"Retrying step: {exec_info['step_name']}")
        # Retry the failed execution
        agent = Agent("anthropic/claude-sonnet-4-6")
        result = agent.continue_durable(execution_id, storage)
    elif status == "paused":
        print("Execution is paused, resuming...")
        agent = Agent("anthropic/claude-sonnet-4-6")
        result = agent.continue_durable(execution_id, storage)
    elif status == "completed":
        print("Execution already completed!")
else:
    print(f"No execution found: {execution_id}")
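The branches above do not cover a checkpoint whose status is still "running", which could be left behind if a process stops before it can record a failure. One way to make the decision explicit is a small pure function over the dictionary returned by get_execution_info(). This is a sketch: `recovery_action` and its return labels are hypothetical, and treating "running" as a case for manual inspection is an assumption rather than documented Upsonic behavior.

```python
from typing import Optional


def recovery_action(exec_info: Optional[dict]) -> str:
    """Map checkpoint info to one of: 'not_found', 'resume', 'skip', 'inspect'."""
    if not exec_info:
        return "not_found"
    status = exec_info.get("status")
    if status in ("failed", "paused"):
        # Both cases are resumed the same way in the example above.
        return "resume"
    if status == "completed":
        return "skip"
    # "running" (or anything unexpected): another worker may still be active,
    # or the process may have died, so leave the choice to the operator.
    return "inspect"


decisions = {
    "missing": recovery_action(None),
    "failed": recovery_action({"status": "failed", "step_index": 3}),
    "paused": recovery_action({"status": "paused"}),
    "completed": recovery_action({"status": "completed"}),
    "running": recovery_action({"status": "running"}),
}
```

Keeping the decision separate from the call to continue_durable makes it easy to unit test the policy without touching storage.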
Execution Management
Listing Executions
from upsonic.durable import DurableExecution
# List all executions
all_executions = DurableExecution.list_all_executions(storage)
for exec_data in all_executions:
    print(f"ID: {exec_data['execution_id']}")
    print(f"Status: {exec_data['status']}")
    print(f"Step: {exec_data['step_name']}")
    print(f"Timestamp: {exec_data['timestamp']}")
    print("---")
# List by status
failed_executions = DurableExecution.list_all_executions(
    storage,
    status="failed"
)
completed_executions = DurableExecution.list_all_executions(
    storage,
    status="completed"
)
# List executions in progress (last step was successful)
in_progress_executions = DurableExecution.list_all_executions(
    storage,
    status="running"
)
# Get detailed execution information
durable = DurableExecution(storage=storage, execution_id=execution_id)
exec_info = durable.get_execution_info()
if exec_info:
    print(f"Status: {exec_info['status']}")
    print(f"Step Index: {exec_info['step_index']}")
    print(f"Step Name: {exec_info['step_name']}")
    print(f"Timestamp: {exec_info['timestamp']}")
    # Use .get() so a record without an 'error' key does not raise KeyError
    if exec_info.get('error'):
        print(f"Error: {exec_info['error']}")
Storage Statistics
# Get storage statistics
stats = storage.get_stats()
print(f"Backend: {stats['backend']}")
print(f"Total executions: {stats['total_executions']}")
print(f"By status: {stats['by_status']}")
# Example output:
# Backend: file
# Total executions: 15
# By status: {'running': 2, 'failed': 3, 'completed': 10, 'paused': 0}
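A similar breakdown can be computed client-side from a list of execution records, for example when only a subset of executions is of interest. The sketch below assumes each record is a dict with a `status` key, as in the listing examples; `summarize` and `KNOWN_STATUSES` are hypothetical names, and this is not a description of how `get_stats()` is implemented.

```python
from collections import Counter
from typing import Dict, Iterable, List

# The four status values described in this document.
KNOWN_STATUSES = ("running", "failed", "completed", "paused")


def summarize(executions: Iterable[dict]) -> Dict[str, object]:
    """Count records per status, reporting zero for statuses that do not occur."""
    records: List[dict] = list(executions)
    counts = Counter(record["status"] for record in records)
    return {
        "total_executions": len(records),
        "by_status": {status: counts.get(status, 0) for status in KNOWN_STATUSES},
    }


sample = [
    {"execution_id": "exec-1", "status": "completed"},
    {"execution_id": "exec-2", "status": "failed"},
    {"execution_id": "exec-3", "status": "completed"},
]
summary = summarize(sample)
```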
Cleanup Operations
import asyncio
# Cleanup old executions (older than 7 days)
deleted_count = asyncio.run(
    storage.cleanup_old_executions_async(older_than_days=7)
)
print(f"Deleted {deleted_count} old executions")
# Delete specific execution
success = asyncio.run(
    storage.delete_state_async(execution_id)
)
if success:
    print(f"Deleted execution: {execution_id}")
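Before deleting in bulk, it can help to preview which executions fall outside the retention window. The sketch below assumes each record carries an ISO 8601 `timestamp` string with a UTC offset; the timestamp format Upsonic actually stores is not specified here, and `select_expired` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional


def select_expired(executions: Iterable[dict], older_than_days: int,
                   now: Optional[datetime] = None) -> List[str]:
    """Return the IDs of records whose timestamp is older than the cutoff."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=older_than_days)
    return [
        record["execution_id"]
        for record in executions
        if datetime.fromisoformat(record["timestamp"]) < cutoff
    ]


records = [
    {"execution_id": "exec-old", "timestamp": "2025-10-01T09:00:00+00:00"},
    {"execution_id": "exec-new", "timestamp": "2025-10-26T09:00:00+00:00"},
]
# A fixed "now" keeps the example deterministic.
fixed_now = datetime(2025, 10, 27, 12, 0, tzinfo=timezone.utc)
expired = select_expired(records, older_than_days=7, now=fixed_now)
```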
Storage
In-Memory
Fast, non-persistent storage for testing and development.
Usage:
from upsonic import Task
from upsonic.durable import DurableExecution, InMemoryDurableStorage
storage = InMemoryDurableStorage()
durable = DurableExecution(storage=storage)
# Use with task
task = Task("Quick test task", durable_execution=durable)
Parameters:
No configuration parameters required.
Characteristics:
- ⚡ Fastest performance
- ❌ No persistence (lost on restart)
- ✅ Perfect for testing
- ✅ No setup required
File-Based
Simple, human-readable JSON files for single-node applications.
Usage:
from upsonic import Task
from upsonic.durable import DurableExecution, FileDurableStorage
# Create storage with custom path
storage = FileDurableStorage(path="./execution_states")
durable = DurableExecution(
    storage=storage,
    auto_cleanup=False,  # Keep files after completion
    debug=True
)
task = Task("Data processing", durable_execution=durable)
Parameters:
| Parameter | Type | Description | Default |
|---|---|---|---|
| path | str | Directory path for checkpoint files | "./durable_states" |
Characteristics:
- 📁 Human-readable JSON format
- ✅ Easy debugging and inspection
- ✅ Simple backup and restore
- ⚠️ Not suitable for distributed systems
SQLite
Queryable, efficient storage for small to medium applications.
Usage:
from upsonic import Task
from upsonic.durable import DurableExecution, SQLiteDurableStorage
# Create storage with custom database
storage = SQLiteDurableStorage(db_path="./executions.db")
durable = DurableExecution(storage=storage)
task = Task("Analytics task", durable_execution=durable)
Parameters:
| Parameter | Type | Description | Default |
|---|---|---|---|
| db_path | str | SQLite database file path | "./durable_executions.db" |
Characteristics:
- 🗄️ Single-file database
- ✅ ACID transactions
- ✅ Queryable execution history
- ✅ Efficient for thousands of executions
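To illustrate how a single-file database can hold one queryable record per execution, here is a sketch using Python's built-in `sqlite3` module. The table name, columns, and the `save_checkpoint` and `ids_with_status` helpers are assumptions for illustration and do not reflect the schema that SQLiteDurableStorage actually uses.

```python
import json
import sqlite3
from typing import List

# An in-memory database keeps the example self-contained.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE checkpoints (
        execution_id TEXT PRIMARY KEY,
        status       TEXT NOT NULL,
        step_index   INTEGER NOT NULL,
        state_json   TEXT NOT NULL
    )
    """
)


def save_checkpoint(execution_id: str, status: str, step_index: int, state: dict) -> None:
    # The PRIMARY KEY on execution_id makes INSERT OR REPLACE overwrite the old row.
    with conn:  # commits on success, rolls back if the statement raises
        conn.execute(
            "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?, ?)",
            (execution_id, status, step_index, json.dumps(state)),
        )


def ids_with_status(status: str) -> List[str]:
    rows = conn.execute(
        "SELECT execution_id FROM checkpoints WHERE status = ? ORDER BY execution_id",
        (status,),
    )
    return [row[0] for row in rows]


save_checkpoint("exec-1", "running", 0, {"messages": []})
save_checkpoint("exec-1", "failed", 1, {"messages": ["step 1 error"]})
save_checkpoint("exec-2", "completed", 2, {"messages": []})

row_count = conn.execute("SELECT COUNT(*) FROM checkpoints").fetchone()[0]
failed_ids = ids_with_status("failed")
```

Two saves for `exec-1` leave a single row, and filtering by status is an ordinary SQL query.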
Redis
Distributed, high-performance storage for production systems.
Usage:
from upsonic import Task
from upsonic.durable import DurableExecution, RedisDurableStorage
# Create Redis storage
storage = RedisDurableStorage(
    host="localhost",
    port=6379,
    db=0,
    password="your_password",  # Optional
    prefix="upsonic:durable:"  # Key prefix
)
durable = DurableExecution(storage=storage)
task = Task("Distributed task", durable_execution=durable)
Parameters:
| Parameter | Type | Description | Default |
|---|---|---|---|
| host | str | Redis server host | "localhost" |
| port | int | Redis server port | 6379 |
| db | int | Redis database number | 0 |
| password | Optional[str] | Redis password | None |
| prefix | str | Key prefix | "durable_exec:" |
Characteristics:
- ⚡ Very fast in-memory performance
- ✅ Distributed architecture support
- ✅ Built-in TTL for automatic cleanup
- ✅ Scalable to millions of executions
Docker Setup:
# Start Redis with Docker
docker run -d --name upsonic-redis -p 6379:6379 redis:latest
Examples
Basic Example
About Example Scenario
This example demonstrates a complete workflow with durable execution, including task creation, execution with automatic checkpointing, failure handling, and recovery.
Durable Execution Configuration
from upsonic import Task, Agent, tool
from upsonic.durable import DurableExecution, FileDurableStorage
# Setup storage
storage = FileDurableStorage(path="./example_checkpoints")
# Create durable execution
durable = DurableExecution(
    storage=storage,
    auto_cleanup=False,  # Keep records for inspection
    debug=True  # Enable debug logging
)
Full Code
import asyncio
from upsonic import Task, Agent, tool
from upsonic.durable import DurableExecution, FileDurableStorage
# Define tools for the task
@tool
def verify_tool(data_source: str) -> bool:
    """Verify data source is accessible."""
    print(f"Verifying {data_source}...")
    return True
@tool
def process_tool(record_count: int) -> dict:
    """Process specified number of records."""
    print(f"Processing {record_count} records...")
    return {"processed": record_count, "status": "success"}
@tool
def report_tool(data: dict) -> str:
    """Generate final report."""
    print(f"Generating report from {data['processed']} records...")
    return f"Report generated for {data['processed']} records"
async def main():
    print("="*60)
    print("DURABLE EXECUTION EXAMPLE")
    print("="*60)
    # Setup storage
    storage = FileDurableStorage(
        path="./example_checkpoints"
    )
    # Create durable execution
    durable = DurableExecution(
        storage=storage,
        auto_cleanup=False,
        debug=True
    )
    # Create task with tools
    task = Task(
        "Complete data processing workflow: "
        "1. Verify data source 'database.csv' is accessible "
        "2. Process 1000 records "
        "3. Generate final report",
        tools=[verify_tool, process_tool, report_tool],
        durable_execution=durable
    )
    execution_id = task.durable_execution_id
    print(f"\n🆔 Execution ID: {execution_id}")
    print(f"📁 Checkpoint storage: example_checkpoints/")
    # Create agent
    agent = Agent("anthropic/claude-sonnet-4-6", name="DataProcessor", debug=True)
    # Attempt execution
    print("\n" + "="*60)
    print("PHASE 1: INITIAL EXECUTION")
    print("="*60)
    try:
        result = agent.do(task)
        print(f"\n✅ Execution completed successfully!")
        print(f"Result: {result}")
    except Exception as e:
        print(f"\n❌ Execution failed: {e}")
        # Get execution details
        print("\n" + "="*60)
        print("CHECKPOINT INSPECTION")
        print("="*60)
        exec_info = durable.get_execution_info()
        if exec_info:
            print(f"Status: {exec_info['status']}")
            print(f"Failed at step: {exec_info['step_index']} ({exec_info['step_name']})")
            print(f"Error: {exec_info.get('error', 'N/A')}")
            print(f"Timestamp: {exec_info['timestamp']}")
        # Attempt recovery
        print("\n" + "="*60)
        print("PHASE 2: RECOVERY")
        print("="*60)
        print("\n🔄 Attempting recovery...")
        try:
            # Create new agent (simulating system restart)
            recovery_agent = Agent("anthropic/claude-sonnet-4-6", name="RecoveryAgent", debug=True)
            # Resume from checkpoint
            result = recovery_agent.continue_durable(execution_id, storage)
            print(f"\n✅ Recovery successful!")
            print(f"Result: {result}")
        except Exception as recovery_error:
            print(f"\n❌ Recovery failed: {recovery_error}")
            raise
    # Display analytics
    print("\n" + "="*60)
    print("EXECUTION ANALYTICS")
    print("="*60)
    stats = storage.get_stats()
    print(f"Backend: {stats['backend']}")
    print(f"Total executions: {stats['total_executions']}")
    print(f"By status: {stats['by_status']}")
    # List all executions
    all_executions = DurableExecution.list_all_executions(storage)
    print(f"\nAll executions in storage: {len(all_executions)}")
    for exec_data in all_executions[:5]:
        print(f" • {exec_data['execution_id'][:30]}...")
        print(f"   Status: {exec_data['status']}")
        print(f"   Step: {exec_data['step_name']}")
    # Cleanup
    print("\n" + "="*60)
    print("CLEANUP")
    print("="*60)
    deleted = await storage.cleanup_old_executions_async(older_than_days=30)
    print(f"Cleaned up {deleted} old executions")
    print("\n✅ Example completed!")
if __name__ == "__main__":
    asyncio.run(main())
This example demonstrates:
- ✅ Creating durable execution with file storage
- ✅ Task execution with automatic checkpointing
- ✅ Failure detection and checkpoint inspection
- ✅ Recovery from the point of failure
- ✅ Execution analytics and statistics
- ✅ Cleanup operations