import asyncio
from upsonic import Agent, Task
from upsonic.tools import tool
from upsonic.db.database import SqliteDatabase
@tool(requires_confirmation=True)
def sensitive_operation(data: str) -> str:
    """Perform a sensitive operation that requires user confirmation."""
    # NOTE(review): the docstring above is kept verbatim on purpose -- tool
    # decorators commonly surface it as the tool description; confirm before
    # rewording it.
    outcome = f"Sensitive operation completed on: {data}"
    return outcome
async def confirmation_cross_process_run_id():
    """Run a confirmation-gated task, confirm it, and resume it by ``run_id``.

    A first agent executes a task whose only tool is ``sensitive_operation``.
    Any pending tool calls are printed, every requirement that reports
    ``needs_confirmation`` is confirmed, and then a second, independently
    constructed database handle and agent (same db file, session and user)
    continues the run identified by the captured ``run_id`` -- presumably
    mimicking what a separate process would do; everything here runs in one
    process.

    Returns:
        The value produced by ``continue_run_async`` when called with
        ``return_output=True``.
    """

    def _fresh_agent():
        # Each call builds its own storage handle so the two agents share
        # nothing in memory except what is passed explicitly below.
        storage = SqliteDatabase(
            db_file="confirmation.db",
            session_id="session_1",
            user_id="user_1",
        )
        return Agent(
            "anthropic/claude-sonnet-4-6",
            name="confirmation_agent",
            db=storage,
        )

    first_agent = _fresh_agent()
    sensitive_task = Task(
        description="Perform a sensitive operation on the data 'audit_logs'.",
        tools=[sensitive_operation],
    )

    paused = await first_agent.do_async(sensitive_task, return_output=True)
    # Captured up front: this identifier is what the second agent resumes from.
    run_id = paused.run_id

    # Informational only: list the tool calls that are waiting.
    if paused.is_paused and paused.active_requirements:
        for requirement in paused.active_requirements:
            pending_call = requirement.tool_execution
            if not pending_call:
                continue
            print(f" - Tool: {pending_call.tool_name}")
            print(f" Args: {pending_call.tool_args}")

    # Approve everything that asks for confirmation.
    for requirement in paused.active_requirements:
        if requirement.needs_confirmation:
            requirement.confirm()

    resuming_agent = _fresh_agent()
    return await resuming_agent.continue_run_async(
        run_id=run_id,
        requirements=paused.requirements,
        return_output=True,
    )
# Script entry point: run the confirmation/resume coroutine to completion.
# NOTE(review): this call sits at module top level, so it also executes when
# the file is imported -- consider an `if __name__ == "__main__":` guard if
# this module is ever imported rather than run directly.
asyncio.run(confirmation_cross_process_run_id())