About Example Scenario
This example demonstrates a complete workflow with durable execution, including task creation, execution with automatic checkpointing, failure handling, and recovery.

Durable Execution Configuration
Copy
from upsonic import Task, Agent, tool
from upsonic.durable import DurableExecution, FileDurableStorage
# Storage backend pointed at ./example_checkpoints
storage = FileDurableStorage(
    path="./example_checkpoints",
)

# Durable-execution handle wired to that storage
durable = DurableExecution(
    storage=storage,
    auto_cleanup=False,  # Keep records for inspection
    debug=True,  # Enable debug logging
)
Full Code
Copy
import asyncio
from upsonic import Task, Agent, tool
from upsonic.durable import DurableExecution, FileDurableStorage
# Define tools for the task
@tool
def verify_tool(data_source: str) -> bool:
    """Verify data source is accessible."""
    # Demo stub: performs no real check; it announces the source and
    # unconditionally reports it as accessible.
    announcement = f"Verifying {data_source}..."
    print(announcement)
    return True
@tool
def process_tool(record_count: int) -> dict:
    """Process specified number of records."""
    # Demo stub: echoes the requested count back in a fixed "success" payload.
    print(f"Processing {record_count} records...")
    outcome = dict(processed=record_count, status="success")
    return outcome
@tool
def report_tool(data: dict) -> str:
    """Generate final report."""
    # Reads data["processed"]; a dict without that key raises KeyError.
    print(f"Generating report from {data['processed']} records...")
    summary = f"Report generated for {data['processed']} records"
    return summary
def _banner(title: str, *, leading_blank: bool = True) -> None:
    """Print *title* framed by two 60-character ``=`` rules.

    Args:
        title: Section heading printed between the two rules.
        leading_blank: When true, an empty line is emitted before the top
            rule so that consecutive sections are visually separated.
    """
    rule = "=" * 60
    print(f"\n{rule}" if leading_blank else rule)
    print(title)
    print(rule)


async def main():
    """Run the durable-execution walkthrough from setup to cleanup.

    Steps, in order:

    1. Build the storage backend and the ``DurableExecution`` handle.
    2. Create a ``Task`` bound to the three demo tools and print its
       durable execution id.
    3. Phase 1 - run the task with a first agent.
    4. If that run raises: print the checkpoint details, then (phase 2)
       resume the same execution id with a newly created agent.
    5. Print storage statistics and up to five stored executions.
    6. Ask the storage to clean up executions older than 30 days.

    Raises:
        Exception: whatever the recovery attempt raises is re-raised after
            being reported; the analytics and cleanup steps are then skipped.
    """
    _banner("DURABLE EXECUTION EXAMPLE", leading_blank=False)

    # Setup storage
    storage = FileDurableStorage(path="./example_checkpoints")

    # Create durable execution
    durable = DurableExecution(
        storage=storage,
        auto_cleanup=False,
        debug=True,
    )

    # Create task with tools
    task = Task(
        "Complete data processing workflow: "
        "1. Verify data source 'database.csv' is accessible "
        "2. Process 1000 records "
        "3. Generate final report",
        tools=[verify_tool, process_tool, report_tool],
        durable_execution=durable,
    )

    # The id is captured up front because it is the handle used for recovery.
    execution_id = task.durable_execution_id
    print(f"\n🆔 Execution ID: {execution_id}")
    print("📁 Checkpoint storage: example_checkpoints/")

    # Create agent
    agent = Agent("openai/gpt-4o-mini", name="DataProcessor", debug=True)

    # Attempt execution
    _banner("PHASE 1: INITIAL EXECUTION")

    # NOTE(review): agent.do() and continue_durable() below are synchronous
    # calls made from inside a coroutine. If the library offers awaitable
    # variants, prefer them here - confirm against the Upsonic API.
    try:
        result = agent.do(task)
        print("\n✅ Execution completed successfully!")
        print(f"Result: {result}")
    except Exception as e:
        # Broad catch is deliberate: any failure of the first run should lead
        # to checkpoint inspection and a recovery attempt.
        print(f"\n❌ Execution failed: {e}")

        # Get execution details
        _banner("CHECKPOINT INSPECTION")
        exec_info = durable.get_execution_info()
        if exec_info:
            print(f"Status: {exec_info['status']}")
            print(f"Failed at step: {exec_info['step_index']} ({exec_info['step_name']})")
            print(f"Error: {exec_info.get('error', 'N/A')}")
            print(f"Timestamp: {exec_info['timestamp']}")

        # Attempt recovery
        _banner("PHASE 2: RECOVERY")
        print("\n🔄 Attempting recovery...")
        try:
            # Create new agent (simulating system restart)
            recovery_agent = Agent("openai/gpt-4o-mini", name="RecoveryAgent", debug=True)

            # Resume from checkpoint
            result = recovery_agent.continue_durable(execution_id, storage)
            print("\n✅ Recovery successful!")
            print(f"Result: {result}")
        except Exception as recovery_error:
            # Report, then propagate: nothing further can be done if recovery fails.
            print(f"\n❌ Recovery failed: {recovery_error}")
            raise

    # Display analytics
    _banner("EXECUTION ANALYTICS")
    stats = storage.get_stats()
    print(f"Backend: {stats['backend']}")
    print(f"Total executions: {stats['total_executions']}")
    print(f"By status: {stats['by_status']}")

    # List all executions (only the first five are printed)
    all_executions = DurableExecution.list_all_executions(storage)
    print(f"\nAll executions in storage: {len(all_executions)}")
    for exec_data in all_executions[:5]:
        print(f" • {exec_data['execution_id'][:30]}...")
        print(f" Status: {exec_data['status']}")
        print(f" Step: {exec_data['step_name']}")

    # Cleanup
    _banner("CLEANUP")
    deleted = await storage.cleanup_old_executions_async(older_than_days=30)
    print(f"Cleaned up {deleted} old executions")
    print("\n✅ Example completed!")
# Script entry point: run the async example to completion when this file is
# executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(main())

What this example demonstrates:

- ✅ Creating durable execution with file storage
- ✅ Task execution with automatic checkpointing
- ✅ Failure detection and checkpoint inspection
- ✅ Recovery from the point of failure
- ✅ Execution analytics and statistics
- ✅ Cleanup operations

