About the Example Scenario

This example demonstrates a complete workflow with durable execution, including task creation, execution with automatic checkpointing, failure handling, and recovery.

Durable Execution Configuration

from upsonic import Task, Agent, tool
from upsonic.durable import DurableExecution, FileDurableStorage

# Setup storage
storage = FileDurableStorage(path="./example_checkpoints")

# Create durable execution
durable = DurableExecution(
    storage=storage,
    auto_cleanup=False,  # Keep records for inspection
    debug=True           # Enable debug logging
)
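
Once configured, the durable execution object is attached to a task through the durable_execution parameter; this also assigns the task a durable_execution_id that is used later to resume from checkpoints. A minimal sketch, mirroring the full example below:

# Attach the configured durable execution to a task
task = Task(
    "Complete data processing workflow",
    tools=[verify_tool, process_tool, report_tool],
    durable_execution=durable
)
print(task.durable_execution_id)  # ID needed to resume this task after a failure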

Full Code

import asyncio
from upsonic import Task, Agent, tool
from upsonic.durable import DurableExecution, FileDurableStorage

# Define tools for the task
@tool
def verify_tool(data_source: str) -> bool:
    """Verify data source is accessible."""
    print(f"Verifying {data_source}...")
    return True

@tool
def process_tool(record_count: int) -> dict:
    """Process specified number of records."""
    print(f"Processing {record_count} records...")
    return {"processed": record_count, "status": "success"}

@tool
def report_tool(data: dict) -> str:
    """Generate final report."""
    print(f"Generating report from {data['processed']} records...")
    return f"Report generated for {data['processed']} records"


async def main():
    print("="*60)
    print("DURABLE EXECUTION EXAMPLE")
    print("="*60)

    # Setup storage
    storage = FileDurableStorage(
        path="./example_checkpoints"
    )

    # Create durable execution
    durable = DurableExecution(
        storage=storage,
        auto_cleanup=False,
        debug=True
    )

    # Create task with tools
    task = Task(
        "Complete data processing workflow: "
        "1. Verify data source 'database.csv' is accessible "
        "2. Process 1000 records "
        "3. Generate final report",
        tools=[verify_tool, process_tool, report_tool],
        durable_execution=durable
    )

    execution_id = task.durable_execution_id
    print(f"\n🆔 Execution ID: {execution_id}")
    print(f"📁 Checkpoint storage: example_checkpoints/")

    # Create agent
    agent = Agent("openai/gpt-4o-mini", name="DataProcessor", debug=True)

    # Attempt execution
    print("\n" + "="*60)
    print("PHASE 1: INITIAL EXECUTION")
    print("="*60)

    try:
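        # Checkpoints are written to storage automatically as the agent works through the task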
        result = agent.do(task)
        print(f"\n✅ Execution completed successfully!")
        print(f"Result: {result}")

    except Exception as e:
        print(f"\n❌ Execution failed: {e}")

        # Get execution details
        print("\n" + "="*60)
        print("CHECKPOINT INSPECTION")
        print("="*60)

        exec_info = durable.get_execution_info()

        if exec_info:
            print(f"Status: {exec_info['status']}")
            print(f"Failed at step: {exec_info['step_index']} ({exec_info['step_name']})")
            print(f"Error: {exec_info.get('error', 'N/A')}")
            print(f"Timestamp: {exec_info['timestamp']}")

        # Attempt recovery
        print("\n" + "="*60)
        print("PHASE 2: RECOVERY")
        print("="*60)

        print("\n🔄 Attempting recovery...")

        try:
            # Create new agent (simulating system restart)
            recovery_agent = Agent("openai/gpt-4o-mini", name="RecoveryAgent", debug=True)

            # Resume from checkpoint
            result = recovery_agent.continue_durable(execution_id, storage)

            print(f"\n✅ Recovery successful!")
            print(f"Result: {result}")

        except Exception as recovery_error:
            print(f"\n❌ Recovery failed: {recovery_error}")
            raise

    # Display analytics
    print("\n" + "="*60)
    print("EXECUTION ANALYTICS")
    print("="*60)

    stats = storage.get_stats()
    print(f"Backend: {stats['backend']}")
    print(f"Total executions: {stats['total_executions']}")
    print(f"By status: {stats['by_status']}")

    # List all executions
    all_executions = DurableExecution.list_all_executions(storage)
    print(f"\nAll executions in storage: {len(all_executions)}")

    for exec_data in all_executions[:5]:
        print(f"  • {exec_data['execution_id'][:30]}...")
        print(f"    Status: {exec_data['status']}")
        print(f"    Step: {exec_data['step_name']}")

    # Cleanup
    print("\n" + "="*60)
    print("CLEANUP")
    print("="*60)

    deleted = await storage.cleanup_old_executions_async(older_than_days=30)
    print(f"Cleaned up {deleted} old executions")

    print("\n✅ Example completed!")

if __name__ == "__main__":
    asyncio.run(main())

This example demonstrates:
  • ✅ Creating durable execution with file storage
  • ✅ Task execution with automatic checkpointing
  • ✅ Failure detection and checkpoint inspection
  • ✅ Recovery from the point of failure (see the recovery-only sketch after this list)
  • ✅ Execution analytics and statistics
  • ✅ Cleanup operations
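
As a follow-up, here is a minimal recovery-only sketch. It assumes checkpoint files from a previous failed run are still present in ./example_checkpoints, and it uses only the calls shown in the full example above:

from upsonic import Agent
from upsonic.durable import DurableExecution, FileDurableStorage

storage = FileDurableStorage(path="./example_checkpoints")

# List executions left in storage by earlier runs
executions = DurableExecution.list_all_executions(storage)
for exec_data in executions:
    print(exec_data["execution_id"], exec_data["status"], exec_data["step_name"])

# Resume one execution from its last checkpoint
# (pick the execution_id printed by the failed run)
agent = Agent("openai/gpt-4o-mini", name="RecoveryAgent")
result = agent.continue_durable(executions[0]["execution_id"], storage)
print(result)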