Skip to main content
Durable Execution enables your agent runs to survive failures and recover automatically. When an error occurs, the agent’s state is preserved, allowing you to resume from where execution left off rather than starting from scratch.

Quick Start

import asyncio
from upsonic import Agent, Task
from upsonic.db.database import SqliteDatabase

async def main():
    """Run a small task and, if it raises, resume the same run by its run_id.

    Returns the output of ``do_async`` on success, or the output of
    ``continue_run_async`` when the first attempt failed and the agent
    exposes a run output to recover from. If no run output is available
    the original exception is re-raised.
    """
    storage = SqliteDatabase(
        db_file="durable.db",
        session_id="session_1",
        user_id="user_1",
    )
    agent = Agent("openai/gpt-4o-mini", db=storage, retry=1)
    task = Task("What is 7 + 7? Reply with just the number.")

    try:
        return await agent.do_async(task, return_output=True)
    except Exception as exc:
        print(f"Error caught: {exc}")
        # The run_id needed for recovery lives on the agent's internal run output.
        failed_run = getattr(agent, "_agent_run_output", None)
        if not failed_run:
            # Nothing to resume from: propagate the original failure.
            raise
        resume_id = failed_run.run_id
        return await agent.continue_run_async(
            run_id=resume_id,
            return_output=True,
        )

asyncio.run(main())

Core Concepts

Error Recovery Flow

  1. Agent starts execution and state is persisted to storage
  2. Error occurs during execution (network issue, API error, etc.)
  3. Agent’s state is saved with error status
  4. continue_run_async() loads the saved state and resumes execution
  5. Execution continues from the last successful checkpoint

Continuation Methods

Recovery with run_id (Same Agent)

Recover from errors using the run_id and the same agent instance:
import asyncio
from upsonic import Agent, Task
from upsonic.db.database import SqliteDatabase

async def durable_recovery_same_agent():
    """Recover a failed run on the same agent instance using its run_id.

    Returns the output of ``do_async`` on success. On failure, the run_id is
    read from the agent's ``_agent_run_output`` and passed to
    ``continue_run_async``; if that attribute is missing or falsy, the
    original exception is re-raised.
    """
    storage = SqliteDatabase(
        db_file="durable.db",
        session_id="session_1",
        user_id="user_1",
    )
    agent = Agent("openai/gpt-4o-mini", db=storage, retry=1)
    task = Task("What is 7 + 7? Reply with just the number.")

    try:
        return await agent.do_async(task, return_output=True)
    except Exception as exc:
        print(f"Error caught: {exc}")
        failed_run = getattr(agent, "_agent_run_output", None)
        if not failed_run:
            # No saved run output to recover from: surface the original error.
            raise
        resume_id = failed_run.run_id
        print(f"Recovering with run_id: {resume_id}")
        return await agent.continue_run_async(
            run_id=resume_id,
            return_output=True,
        )

asyncio.run(durable_recovery_same_agent())

Recovery with task (Same Agent)

Recover from errors using the task object:
import asyncio
from upsonic import Agent, Task
from upsonic.db.database import SqliteDatabase

async def durable_recovery_with_task():
    """Recover a failed run on the same agent by passing the task object.

    Returns the output of ``do_async`` on success; on any exception, the
    same ``Task`` is handed to ``continue_run_async`` and its output is
    returned instead.
    """
    storage = SqliteDatabase(
        db_file="durable.db",
        session_id="session_1",
        user_id="user_1",
    )
    agent = Agent("openai/gpt-4o-mini", db=storage, retry=1)
    task = Task("What is 7 + 7? Reply with just the number.")

    try:
        return await agent.do_async(task, return_output=True)
    except Exception as exc:
        print(f"Error caught: {exc}")
        print("Recovering with task...")
        # The task itself identifies the run here; no run_id lookup is needed.
        return await agent.continue_run_async(task=task, return_output=True)

asyncio.run(durable_recovery_with_task())

Recovery with run_id (New Agent - Cross-Process)

Recover from errors with a new agent instance, simulating cross-process resumption:
import asyncio
from upsonic import Agent, Task
from upsonic.db.database import SqliteDatabase

async def durable_recovery_new_agent():
    """Recover a failed run from a freshly built agent, using the run_id.

    The first agent attempts the task. On failure, the run_id is read from
    that agent's ``_agent_run_output``; a second database handle and agent
    are then created against the same ``durable.db`` file and asked to
    continue the run. If no run_id can be obtained, the original exception
    is re-raised.
    """
    storage = SqliteDatabase(
        db_file="durable.db",
        session_id="session_1",
        user_id="user_1",
    )
    agent = Agent("openai/gpt-4o-mini", db=storage, retry=1)
    task = Task("What is 7 + 7? Reply with just the number.")

    try:
        return await agent.do_async(task, return_output=True)
    except Exception as exc:
        print(f"Error caught: {exc}")
        failed_run = getattr(agent, "_agent_run_output", None)
        resume_id = failed_run.run_id if failed_run else None
        if not resume_id:
            # Without a run_id there is nothing for a new agent to resume.
            raise

        print(f"Creating new agent to recover with run_id: {resume_id}")
        # A separate handle and agent stand in for a different process
        # picking up the run.
        fresh_storage = SqliteDatabase(
            db_file="durable.db",
            session_id="session_1",
            user_id="user_1",
        )
        fresh_agent = Agent("openai/gpt-4o-mini", db=fresh_storage, retry=1)
        return await fresh_agent.continue_run_async(
            run_id=resume_id,
            return_output=True,
        )

asyncio.run(durable_recovery_new_agent())

Recovery with task (New Agent - Cross-Process)

Recover with a new agent instance using the task object:
import asyncio
from upsonic import Agent, Task
from upsonic.db.database import SqliteDatabase

async def durable_recovery_new_agent_with_task():
    """Recover a failed run from a freshly built agent, using the task object.

    Returns the output of ``do_async`` on success. On any exception, a new
    database handle and agent are created against the same ``durable.db``
    file and the original ``Task`` is passed to ``continue_run_async``.
    """
    storage = SqliteDatabase(
        db_file="durable.db",
        session_id="session_1",
        user_id="user_1",
    )
    agent = Agent("openai/gpt-4o-mini", db=storage, retry=1)
    task = Task("What is 7 + 7? Reply with just the number.")

    try:
        return await agent.do_async(task, return_output=True)
    except Exception as exc:
        print(f"Error caught: {exc}")
        print("Creating new agent to recover with task...")
        # A separate handle and agent stand in for a different process
        # picking up the run.
        fresh_storage = SqliteDatabase(
            db_file="durable.db",
            session_id="session_1",
            user_id="user_1",
        )
        fresh_agent = Agent("openai/gpt-4o-mini", db=fresh_storage, retry=1)
        return await fresh_agent.continue_run_async(task=task, return_output=True)

asyncio.run(durable_recovery_new_agent_with_task())

Important Notes

  • Retry Configuration: Set retry=1 to disable internal retries when implementing your own retry logic.
  • Direct Call Mode Only: HITL continuation only supports direct call mode. Streaming is not supported.
  • Persistent Storage: Always use persistent storage like SqliteDatabase for cross-process durability.
  • Error Access: Access agent._agent_run_output after an exception to get the run state.
  • Checkpoint-Based: Recovery continues from the last successful checkpoint.