## Documentation Index

Fetch the complete documentation index at: https://docs.upsonic.ai/llms.txt

Use this file to discover all available pages before exploring further.
Agents can be executed in several ways depending on your needs: synchronous, asynchronous, or streaming.
## Synchronous Execution
The simplest way to run an agent is the `do()` method, which executes synchronously and returns the result. `print_do()` behaves the same but also prints the result before returning it.
```python
from upsonic import Agent, Task

# Create agent
agent = Agent("anthropic/claude-sonnet-4-5")

# Execute with a Task object
task = Task("What is the capital of France?")
result = agent.print_do(task)
print(result)  # Output: Paris

# Or execute directly with a string
result = agent.print_do("What is the capital of France?")
print(result)  # Output: Paris
```
do(), do_async(), print_do(), and print_do_async() accept a list of strings or a list of Task objects. They run each item in sequence and return a list of string results. A single-element list returns a single string (scalar); an empty list returns an empty list. Mixed lists of str and Task are supported.
### List of strings
```python
from upsonic import Agent

agent = Agent("anthropic/claude-sonnet-4-5")

results = agent.do(["What is 2+2?", "What is 3+3?"])
# results is a list of strings, one per input
for r in results:
    print(r)
```
### List of Task objects
```python
from upsonic import Agent, Task

agent = Agent("anthropic/claude-sonnet-4-5")

tasks = [
    Task("What is the capital of France?"),
    Task("What is the capital of Germany?"),
]
results = agent.do(tasks)
# results is a list of strings, one per task
for r in results:
    print(r)
```
## Asynchronous Execution

For concurrent operations or async applications, use `do_async()`, which returns a coroutine.
```python
import asyncio

from upsonic import Agent

async def main():
    # Create agent
    agent = Agent("anthropic/claude-sonnet-4-5")

    # Execute asynchronously (accepts a Task or a string)
    result = await agent.do_async("Explain quantum computing in simple terms")
    print(result)

# Run the async function
asyncio.run(main())
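The payoff of `do_async()` is concurrency: several calls can run at once via plain `asyncio.gather`. The pattern is sketched below with stub coroutines standing in for `agent.do_async()` calls, so it runs without the library or an API key:

```python
import asyncio

async def fake_do_async(prompt: str) -> str:
    """Stand-in for agent.do_async(prompt): echoes after a short delay."""
    await asyncio.sleep(0.01)
    return f"answer to: {prompt}"

async def main() -> list[str]:
    # Fire off several requests concurrently and wait for all of them
    return await asyncio.gather(
        fake_do_async("What is 2+2?"),
        fake_do_async("What is 3+3?"),
    )

results = asyncio.run(main())
print(results)
```

With the real library the same shape would be `await asyncio.gather(agent.do_async(a), agent.do_async(b))`, assuming your provider and agent configuration allow concurrent requests.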
## Streaming Text Output

For real-time output, stream the response as it is generated: use `stream()` in synchronous code or `astream()` in async code. Both yield text chunks.
```python
import asyncio

from upsonic import Agent, Task

async def main():
    # Create agent and task
    agent = Agent("anthropic/claude-sonnet-4-5")
    task = Task("Write a short poem about coding")

    # Stream the output chunk by chunk
    async for text_chunk in agent.astream(task):
        print(text_chunk, end='', flush=True)
    print()  # New line after streaming

if __name__ == "__main__":
    asyncio.run(main())
```
## Event Streaming

For full visibility into agent execution, pass `events=True` to `stream()` or `astream()` to receive detailed events about every step of the pipeline.
```python
import asyncio

from upsonic import Agent, Task
from upsonic.run.events.events import (
    PipelineStartEvent,
    PipelineEndEvent,
    TextDeltaEvent,
    ToolCallEvent,
    ToolResultEvent,
)
from upsonic.tools import tool

@tool
def calculate(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y

async def main():
    agent = Agent("anthropic/claude-sonnet-4-5")
    task = Task("Calculate 5 + 3", tools=[calculate])

    async for event in agent.astream(task, events=True):
        if isinstance(event, PipelineStartEvent):
            print(f"🚀 Starting pipeline with {event.total_steps} steps")
        elif isinstance(event, ToolCallEvent):
            print(f"\n🔧 Calling: {event.tool_name}({event.tool_args})")
        elif isinstance(event, ToolResultEvent):
            status = "❌" if event.is_error else "✅"
            print(f"\n{status} Result: {event.result_preview}")
        elif isinstance(event, TextDeltaEvent):
            print(event.content, end='', flush=True)
        elif isinstance(event, PipelineEndEvent):
            print(f"\n✅ Completed in {event.total_duration:.2f}s")

asyncio.run(main())
```
## Event Categories

### Pipeline Events

| Event | Description | Key Attributes |
|---|---|---|
| `PipelineStartEvent` | Emitted when execution begins | `total_steps`, `is_streaming`, `task_description` |
| `PipelineEndEvent` | Emitted when execution ends | `total_steps`, `executed_steps`, `total_duration`, `status`, `error_message` |
### Step Events

| Event | Description | Key Attributes |
|---|---|---|
| `StepStartEvent` | Emitted when a step begins | `step_name`, `step_index`, `step_description`, `total_steps` |
| `StepEndEvent` | Emitted when a step ends | `step_name`, `step_index`, `status`, `execution_time`, `message` |
### Tool Events

| Event | Description | Key Attributes |
|---|---|---|
| `ToolCallEvent` | Emitted when a tool is called | `tool_name`, `tool_call_id`, `tool_args`, `tool_index` |
| `ToolResultEvent` | Emitted when a tool returns | `tool_name`, `tool_call_id`, `result`, `result_preview`, `execution_time`, `is_error`, `error_message` |
| `ExternalToolPauseEvent` | Emitted when execution pauses for an external tool | `tool_name`, `tool_call_id`, `tool_args` |
### LLM Stream Events

| Event | Description | Key Attributes |
|---|---|---|
| `TextDeltaEvent` | Text chunk during streaming | `content`, `accumulated_content`, `part_index` |
| `TextCompleteEvent` | Text streaming complete | `content`, `part_index` |
| `ThinkingDeltaEvent` | Reasoning content (for supported models) | `content`, `part_index` |
| `ToolCallDeltaEvent` | Tool call arguments streaming | `tool_name`, `tool_call_id`, `args_delta`, `part_index` |
| `FinalOutputEvent` | Final output ready | `output`, `output_type` |
### Initialization & Model Events

| Event | Description | Key Attributes |
|---|---|---|
| `AgentInitializedEvent` | Agent initialized for execution | `agent_id`, `is_streaming` |
| `StorageConnectionEvent` | Storage connection established | `storage_type`, `is_connected`, `has_memory`, `session_id` |
| `LLMPreparedEvent` | LLM manager prepared | `default_model`, `requested_model`, `model_changed` |
| `ModelSelectedEvent` | Model selected for execution | `model_name`, `provider`, `is_override` |
| `ToolsConfiguredEvent` | Tools configured for the task | `tool_count`, `tool_names`, `has_mcp_handlers` |
| `MessagesBuiltEvent` | Request messages built | `message_count`, `has_system_prompt`, `has_memory_messages`, `is_continuation` |
| `ModelRequestStartEvent` | Model request starting | `model_name`, `is_streaming`, `has_tools`, `tool_call_count`, `tool_call_limit` |
| `ModelResponseEvent` | Model response received (non-streaming) | `model_name`, `has_text`, `has_tool_calls`, `tool_call_count`, `finish_reason` |
### Cache Events

| Event | Description | Key Attributes |
|---|---|---|
| `CacheCheckEvent` | Cache checked for an existing response | `cache_enabled`, `cache_method`, `cache_hit`, `similarity`, `input_preview` |
| `CacheHitEvent` | Cache hit occurred | `cache_method`, `similarity`, `cached_response_preview` |
| `CacheMissEvent` | Cache miss occurred | `cache_method`, `reason` |
| `CacheStoredEvent` | Response stored in cache | `cache_method`, `duration_minutes` |
### Policy Events

| Event | Description | Key Attributes |
|---|---|---|
| `PolicyCheckEvent` | Policy validation performed | `policy_type`, `action`, `policies_checked`, `content_modified`, `blocked_reason` |
| `PolicyFeedbackEvent` | Policy feedback for retry | `policy_type`, `feedback_message`, `retry_count`, `max_retries`, `violated_policy` |
### Memory, Reflection & Reliability Events

| Event | Description | Key Attributes |
|---|---|---|
| `MemoryUpdateEvent` | Memory updated | `messages_added`, `memory_type` |
| `ReflectionEvent` | Reflection processing applied | `reflection_applied`, `improvement_made`, `original_preview`, `improved_preview` |
| `ReliabilityEvent` | Reliability layer processing | `reliability_applied`, `modifications_made` |
| `ExecutionCompleteEvent` | Execution complete | `output_type`, `has_output`, `output_preview`, `total_tool_calls`, `total_duration` |
### Run Lifecycle Events

| Event | Description | Key Attributes |
|---|---|---|
| `RunStartedEvent` | Run started | `agent_id`, `task_description` |
| `RunCompletedEvent` | Run completed successfully | `agent_id`, `output_preview` |
| `RunPausedEvent` | Run paused (for HITL requirements) | `reason`, `requirements`, `step_name` |
| `RunCancelledEvent` | Run cancelled | `message`, `step_name` |
### Culture Events

| Event | Description | Key Attributes |
|---|---|---|
| `CultureUpdateEvent` | Cultural knowledge updated (experimental) | `culture_enabled`, `extraction_triggered`, `knowledge_updated` |
## Common Attributes

All events inherit from `AgentEvent` and share these base attributes:

| Attribute | Type | Description |
|---|---|---|
| `event_id` | `str` | Unique identifier (8 chars) |
| `run_id` | `Optional[str]` | The agent run ID this event belongs to (matches `Agent.run_id`) |
| `timestamp` | `datetime` | When the event occurred |
| `event_type` | `str` | Class name of the event (property, not a field) |
| `event_kind` | `str` | Event category identifier |
## Synchronous Event Streaming
For synchronous code, use stream() with events=True:
```python
from upsonic import Agent, Task
from upsonic.run.events.events import TextDeltaEvent, PipelineStartEvent, PipelineEndEvent

agent = Agent("anthropic/claude-sonnet-4-5")
task = Task("Explain AI briefly")

# Stream events synchronously
for event in agent.stream(task, events=True):
    if isinstance(event, PipelineStartEvent):
        print(f"Starting pipeline with {event.total_steps} steps")
    elif isinstance(event, TextDeltaEvent):
        print(event.content, end='', flush=True)
    elif isinstance(event, PipelineEndEvent):
        print(f"\nCompleted in {event.total_duration:.2f}s")

# After streaming completes, access the final output

# Option 1: from the task (most direct)
final_output = task.response
print(f"\nFinal (from task.response): {final_output}")

# Option 2: from the agent's run output (includes additional metadata)
run_output = agent.get_run_output()
if run_output:
    print(f"Final (from agent.get_run_output().output): {run_output.output}")
    # You can also access other metadata:
    # - run_output.usage (token usage)
    # - run_output.messages (all messages)
    # - run_output.tools (tool executions)
    # - run_output.execution_stats (execution statistics)
```
Note: The `stream()` method returns an iterator that yields either:

- `str` chunks when `events=False` (the default), for text streaming
- `AgentStreamEvent` objects when `events=True`, for event streaming

For async streaming, use `astream()` with the same `events` parameter.
## Adding Tools

Tools can be added to agents during initialization or dynamically with `add_tools()`. Use `get_tool_defs()` to retrieve all registered tool definitions.
```python
from upsonic import Agent
from upsonic.tools import tool

@tool
def calculate(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y

@tool
def another_tool() -> str:
    """Another tool."""
    return "Another tool"

# Add tools during initialization
agent = Agent("anthropic/claude-sonnet-4-5", tools=[calculate])

# Or add tools after initialization
agent.add_tools([another_tool])

# Get all registered tool definitions
tool_defs = agent.get_tool_defs()
print(tool_defs)
```