import asyncio
from upsonic import Agent, Task
from upsonic.tools import tool
from upsonic.tools.user_input import UserControlFlowTools
from upsonic.db.database import SqliteDatabase
@tool
def send_email(subject: str, body: str, to_address: str) -> str:
"""Send an email to the given address."""
return f"Email sent to {to_address} with subject '{subject}' and body '{body}'"
def fill_user_input(requirement) -> None:
if not requirement.user_input_schema:
return
for field_dict in requirement.user_input_schema:
if isinstance(field_dict, dict) and field_dict.get("value") is None:
name = field_dict["name"]
field_dict["value"] = "dynamic@example.com"
requirement.tool_execution.answered = True
async def dynamic_input_cross_process_run_id():
db = SqliteDatabase(db_file="dynamic_input.db", session_id="session_1", user_id="user_1")
agent = Agent("anthropic/claude-sonnet-4-6", name="dynamic_input_agent", db=db)
task = Task(
description="Send an email with the body 'What is the weather in Tokyo?'",
tools=[send_email, UserControlFlowTools()]
)
output = await agent.do_async(task, return_output=True)
run_id = output.run_id
if output.is_paused and output.active_requirements:
for req in output.active_requirements:
if req.user_input_schema:
for field_dict in req.user_input_schema:
print(f" Field: {field_dict.get('name')} (type={field_dict.get('field_type', 'str')})")
for req in output.active_requirements:
if req.needs_user_input:
fill_user_input(req)
new_db = SqliteDatabase(db_file="dynamic_input.db", session_id="session_1", user_id="user_1")
new_agent = Agent("anthropic/claude-sonnet-4-6", name="dynamic_input_agent", db=new_db)
result = await new_agent.continue_run_async(
run_id=run_id,
requirements=output.requirements,
return_output=True
)
else:
result = output
return result
asyncio.run(dynamic_input_cross_process_run_id())