Skip to main content
The Cancel Run feature allows you to cancel a running agent execution and later resume it from exactly where it left off. This is useful for implementing timeouts, user-initiated cancellations, or graceful shutdown handling.

Quick Start

import asyncio
import time
from upsonic import Agent, Task
from upsonic.tools import tool
from upsonic.db.database import SqliteDatabase
from upsonic.run.base import RunStatus
from upsonic.run.cancel import cancel_run

@tool
def long_running_task(seconds: int) -> str:
    """A task that takes time to complete."""
    time.sleep(seconds)
    return f"Completed after {seconds} seconds"

async def main():
    db = SqliteDatabase(db_file="my_app.db", session_id="session_1", user_id="user_1")
    agent = Agent("openai/gpt-4o-mini", db=db)
    task = Task(
        description="Call long_running_task with 10 seconds.",
        tools=[long_running_task]
    )
    
    # Cancel after 2 seconds
    async def cancel_after_delay():
        await asyncio.sleep(2)
        if agent.run_id:
            cancel_run(agent.run_id)
            print(f"Cancelled run: {agent.run_id}")
    
    asyncio.create_task(cancel_after_delay())
    
    output = await agent.do_async(task, return_output=True)
    
    if output.status == RunStatus.cancelled:
        print("Run was cancelled, resuming...")
        result = await agent.continue_run_async(run_id=output.run_id, return_output=True)
        print(f"Result: {result.output}")
        return result
    
    return output

asyncio.run(main())

Core API

cancel_run(run_id: str)

Cancels a running agent execution by its run ID. The agent will stop at the next checkpoint and save its current state.
from upsonic.run.cancel import cancel_run

# Cancel using the run_id
cancel_run(agent.run_id)

agent.run_id

During execution, access the current run’s ID via agent.run_id. This is available as soon as the run starts.

output.is_cancelled

Check if a run was cancelled by inspecting the output:
from upsonic import Agent, Task

agent = Agent("openai/gpt-4o-mini")
task = Task("Your task description")

output = await agent.do_async(task, return_output=True)

if output.is_cancelled:
    # Handle cancelled state
    pass

output.status

Access the full status enum:
from upsonic.run.base import RunStatus

if output.status == RunStatus.cancelled:
    # Cancelled
    pass

Continuation Methods

Resume with run_id (Same Agent)

The simplest approach - use the same agent instance with run_id:
import asyncio
import time
from upsonic import Agent, Task
from upsonic.tools import tool
from upsonic.db.database import SqliteDatabase
from upsonic.run.base import RunStatus
from upsonic.run.cancel import cancel_run

@tool
def long_running_task(seconds: int) -> str:
    """A task that takes time to complete."""
    time.sleep(seconds)
    return f"Completed after {seconds} seconds"

async def cancel_and_resume_same_agent():
    db = SqliteDatabase(db_file="cancel.db", session_id="session_1", user_id="user_1")
    agent = Agent("openai/gpt-4o-mini", db=db)
    task = Task(
        description="Call long_running_task with 10 seconds.",
        tools=[long_running_task]
    )
    
    async def cancel_after_delay():
        await asyncio.sleep(2)
        if agent.run_id:
            cancel_run(agent.run_id)
    
    asyncio.create_task(cancel_after_delay())
    
    output = await agent.do_async(task, return_output=True)
    
    if output.status == RunStatus.cancelled:
        result = await agent.continue_run_async(run_id=output.run_id, return_output=True)
        return result
    
    return output

asyncio.run(cancel_and_resume_same_agent())

Resume with task (Same Agent)

Use the task object for in-memory context continuation:
import asyncio
import time
from upsonic import Agent, Task
from upsonic.tools import tool
from upsonic.db.database import SqliteDatabase
from upsonic.run.base import RunStatus
from upsonic.run.cancel import cancel_run

@tool
def long_running_task(seconds: int) -> str:
    """A task that takes time to complete."""
    time.sleep(seconds)
    return f"Completed after {seconds} seconds"

async def cancel_and_resume_with_task():
    db = SqliteDatabase(db_file="cancel.db", session_id="session_1", user_id="user_1")
    agent = Agent("openai/gpt-4o-mini", db=db)
    task = Task(
        description="Call long_running_task with 10 seconds.",
        tools=[long_running_task]
    )
    
    async def cancel_after_delay():
        await asyncio.sleep(2)
        if agent.run_id:
            cancel_run(agent.run_id)
    
    asyncio.create_task(cancel_after_delay())
    
    output = await agent.do_async(task, return_output=True)
    
    if output.status == RunStatus.cancelled:
        # Use task for continuation (in-memory context)
        result = await agent.continue_run_async(task=task, return_output=True)
        return result
    
    return output

asyncio.run(cancel_and_resume_with_task())

Resume with run_id (New Agent - Cross-Process)

For cross-process resumption where a new agent instance loads the cancelled run from storage:
import asyncio
import time
from upsonic import Agent, Task
from upsonic.tools import tool
from upsonic.db.database import SqliteDatabase
from upsonic.run.base import RunStatus
from upsonic.run.cancel import cancel_run

@tool
def long_running_task(seconds: int) -> str:
    """A task that takes time to complete."""
    time.sleep(seconds)
    return f"Completed after {seconds} seconds"

async def cancel_and_resume_new_agent():
    db = SqliteDatabase(db_file="cancel.db", session_id="session_1", user_id="user_1")
    agent = Agent("openai/gpt-4o-mini", db=db)
    task = Task(
        description="Call long_running_task with 10 seconds.",
        tools=[long_running_task]
    )
    
    async def cancel_after_delay():
        await asyncio.sleep(2)
        if agent.run_id:
            cancel_run(agent.run_id)
    
    asyncio.create_task(cancel_after_delay())
    
    output = await agent.do_async(task, return_output=True)
    run_id = output.run_id
    
    if output.status == RunStatus.cancelled:
        # Create NEW agent instance to resume
        new_db = SqliteDatabase(db_file="cancel.db", session_id="session_1", user_id="user_1")
        new_agent = Agent("openai/gpt-4o-mini", db=new_db)
        result = await new_agent.continue_run_async(run_id=run_id, return_output=True)
        return result
    
    return output

asyncio.run(cancel_and_resume_new_agent())

Resume with task (New Agent - Cross-Process)

New agent using task object for continuation:
import asyncio
import time
from upsonic import Agent, Task
from upsonic.tools import tool
from upsonic.db.database import SqliteDatabase
from upsonic.run.base import RunStatus
from upsonic.run.cancel import cancel_run

@tool
def long_running_task(seconds: int) -> str:
    """A task that takes time to complete."""
    time.sleep(seconds)
    return f"Completed after {seconds} seconds"

async def cancel_and_resume_new_agent_with_task():
    db = SqliteDatabase(db_file="cancel.db", session_id="session_1", user_id="user_1")
    agent = Agent("openai/gpt-4o-mini", db=db)
    task = Task(
        description="Call long_running_task with 10 seconds.",
        tools=[long_running_task]
    )
    
    async def cancel_after_delay():
        await asyncio.sleep(2)
        if agent.run_id:
            cancel_run(agent.run_id)
    
    asyncio.create_task(cancel_after_delay())
    
    output = await agent.do_async(task, return_output=True)
    
    if output.status == RunStatus.cancelled:
        # Create NEW agent instance with task
        new_db = SqliteDatabase(db_file="cancel.db", session_id="session_1", user_id="user_1")
        new_agent = Agent("openai/gpt-4o-mini", db=new_db)
        result = await new_agent.continue_run_async(task=task, return_output=True)
        return result
    
    return output

asyncio.run(cancel_and_resume_new_agent_with_task())

Advanced Patterns

Verify Resume from Cut-off Point

Verify that resumption continues from where execution left off, not from the beginning:
import asyncio
import time
from upsonic import Agent, Task
from upsonic.tools import tool
from upsonic.db.database import SqliteDatabase
from upsonic.run.base import RunStatus
from upsonic.run.cancel import cancel_run

@tool
def long_running_task(seconds: int) -> str:
    """A task that takes time to complete."""
    time.sleep(seconds)
    return f"Completed after {seconds} seconds"

async def verify_resume_from_cutoff():
    db = SqliteDatabase(db_file="cancel.db", session_id="session_1", user_id="user_1")
    agent = Agent("openai/gpt-4o-mini", db=db, debug=True)
    task = Task(
        description="Call long_running_task with 10 seconds.",
        tools=[long_running_task]
    )
    
    async def cancel_after_delay():
        await asyncio.sleep(2)
        if agent.run_id:
            cancel_run(agent.run_id)
    
    asyncio.create_task(cancel_after_delay())
    
    output = await agent.do_async(task, return_output=True)
    
    print(f"Steps before cancel: {len(output.step_results)}")
    for i, sr in enumerate(output.step_results):
        print(f"  [{i}] {sr.name}: {sr.status}")
    
    steps_before_resume = len(output.step_results)
    
    if output.status == RunStatus.cancelled:
        result = await agent.continue_run_async(run_id=output.run_id, return_output=True)
        
        print(f"Steps after resume: {len(result.step_results)}")
        new_steps = len(result.step_results) - steps_before_resume
        
        if new_steps > 0:
            print(f"✅ {new_steps} new steps added (continued from cut-off)")
        
        return result
    
    return output

asyncio.run(verify_resume_from_cutoff())

Important Notes

  • Direct Call Mode Only: HITL continuation only supports direct call mode. Streaming mode is not supported for continuation.
  • Persistent Storage: For cross-process resumption, always use persistent storage like SqliteDatabase.
  • Run ID Availability: The agent.run_id is available as soon as execution begins.
  • State Preservation: The agent saves its state at checkpoints, so resumption continues from the last completed step.