Agents can be executed in several ways depending on your needs: synchronously, asynchronously, or with streaming.

Synchronous Execution

The simplest way to run an agent is using the do() method, which executes synchronously and returns the result.
from upsonic import Agent, Task

# Create agent
agent = Agent("openai/gpt-4o")

# Execute with Task object
task = Task("What is the capital of France?")
result = agent.do(task)
print(result)  # Output: Paris

# Or execute directly with a string
result = agent.do("What is the capital of France?")
print(result)  # Output: Paris

Asynchronous Execution

For concurrent operations or async applications, use do_async(), which returns a coroutine.
from upsonic import Agent, Task
import asyncio

async def main():
    # Create agent
    agent = Agent("openai/gpt-4o")
    
    # Execute asynchronously (accepts Task or string)
    result = await agent.do_async("Explain quantum computing in simple terms")
    print(result)

# Run async function
asyncio.run(main())
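The main benefit of do_async() is concurrency: several independent prompts can be in flight at once with asyncio.gather(). The sketch below substitutes a stand-in coroutine for Agent.do_async so it runs without an API key; with a real agent you would gather agent.do_async(p) calls instead.

```python
import asyncio

# Stand-in for Agent.do_async so the sketch runs offline; with a real
# agent you would await agent.do_async(prompt) instead.
async def fake_do_async(prompt: str) -> str:
    await asyncio.sleep(0.01)  # simulate model latency
    return f"answer to: {prompt}"

async def main() -> list[str]:
    prompts = [
        "What is the capital of France?",
        "Explain quantum computing in simple terms",
        "Write a haiku about Python",
    ]
    # All three prompts run concurrently instead of one after another.
    return await asyncio.gather(*(fake_do_async(p) for p in prompts))

results = asyncio.run(main())
print(results)
```

asyncio.gather preserves the order of its arguments, so results line up with prompts even though the calls complete in any order.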

Streaming Text Output

For real-time output, use astream() (or its synchronous counterpart stream()) to receive text chunks as they are generated.
import asyncio
from upsonic import Agent, Task


async def main():
    # Create agent and task
    agent = Agent("openai/gpt-4o")
    task = Task("Write a short poem about coding")
    
    # Stream the output
    async for text_chunk in agent.astream(task):
        print(text_chunk, end='', flush=True)
    print()  # New line after streaming


if __name__ == "__main__":
    asyncio.run(main())

Event Streaming

For full visibility into agent execution, pass events=True to astream() to receive detailed events about every step of the pipeline.
import asyncio
from upsonic import Agent, Task
from upsonic.run.events.events import (
    PipelineStartEvent,
    PipelineEndEvent,
    TextDeltaEvent,
    ToolCallEvent,
    ToolResultEvent,
)
from upsonic.tools import tool

@tool
def calculate(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y

async def main():
    agent = Agent("openai/gpt-4o")
    task = Task("Calculate 5 + 3", tools=[calculate])
    
    async for event in agent.astream(task, events=True):
        if isinstance(event, PipelineStartEvent):
            print(f"🚀 Starting pipeline with {event.total_steps} steps")
        
        elif isinstance(event, ToolCallEvent):
            print(f"\n🔧 Calling: {event.tool_name}({event.tool_args})")
        
        elif isinstance(event, ToolResultEvent):
            status = "❌" if event.is_error else "✅"
            print(f"\n{status} Result: {event.result_preview}")
        
        elif isinstance(event, TextDeltaEvent):
            print(event.content, end='', flush=True)
        
        elif isinstance(event, PipelineEndEvent):
            print(f"\n✅ Completed in {event.total_duration:.2f}s")

asyncio.run(main())
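As the number of handled event types grows, the isinstance chain above can be replaced with a registry keyed by event class. This is a generic Python dispatch pattern, sketched here with minimal stand-in event classes rather than the real types from upsonic.run.events.events:

```python
# Minimal stand-in event classes for illustration; in real code you would
# key the registry on the classes imported from upsonic.run.events.events.
class ToolCallEvent:
    def __init__(self, tool_name: str):
        self.tool_name = tool_name

class TextDeltaEvent:
    def __init__(self, content: str):
        self.content = content

# Registry mapping event class -> handler function.
handlers = {
    ToolCallEvent: lambda e: f"calling {e.tool_name}",
    TextDeltaEvent: lambda e: e.content,
}

def dispatch(event):
    """Look up a handler for the event's exact class; ignore unknown events."""
    handler = handlers.get(type(event))
    return handler(event) if handler else None

print(dispatch(ToolCallEvent("calculate")))  # calling calculate
```

Note that the exact-type lookup does not follow inheritance; if you need handlers to fire for subclasses, keep the isinstance chain instead.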

Event Categories

Pipeline Events

- PipelineStartEvent: Emitted when execution begins. Key attributes: total_steps, is_streaming, task_description
- PipelineEndEvent: Emitted when execution ends. Key attributes: total_steps, executed_steps, total_duration, status, error_message

Step Events

- StepStartEvent: Emitted when a step begins. Key attributes: step_name, step_index, step_description, total_steps
- StepEndEvent: Emitted when a step ends. Key attributes: step_name, step_index, status, execution_time, message

Tool Events

- ToolCallEvent: Emitted when a tool is called. Key attributes: tool_name, tool_call_id, tool_args, tool_index
- ToolResultEvent: Emitted when a tool returns. Key attributes: tool_name, tool_call_id, result, result_preview, execution_time, is_error, error_message
- ExternalToolPauseEvent: Emitted when execution pauses for an external tool. Key attributes: tool_name, tool_call_id, tool_args

LLM Stream Events

- TextDeltaEvent: Text chunk during streaming. Key attributes: content, accumulated_content, part_index
- TextCompleteEvent: Text streaming complete. Key attributes: content, part_index
- ThinkingDeltaEvent: Reasoning content (for supported models). Key attributes: content, part_index
- ToolCallDeltaEvent: Tool call arguments streaming. Key attributes: tool_name, tool_call_id, args_delta, part_index
- FinalOutputEvent: Final output ready. Key attributes: output, output_type

Initialization & Model Events

- AgentInitializedEvent: Agent initialized for execution. Key attributes: agent_id, is_streaming
- StorageConnectionEvent: Storage connection established. Key attributes: storage_type, is_connected, has_memory, session_id
- LLMPreparedEvent: LLM manager prepared. Key attributes: default_model, requested_model, model_changed
- ModelSelectedEvent: Model selected for execution. Key attributes: model_name, provider, is_override
- ToolsConfiguredEvent: Tools configured for the task. Key attributes: tool_count, tool_names, has_mcp_handlers
- MessagesBuiltEvent: Request messages built. Key attributes: message_count, has_system_prompt, has_memory_messages, is_continuation
- ModelRequestStartEvent: Model request starting. Key attributes: model_name, is_streaming, has_tools, tool_call_count, tool_call_limit
- ModelResponseEvent: Model response received (non-streaming). Key attributes: model_name, has_text, has_tool_calls, tool_call_count, finish_reason

Cache Events

- CacheCheckEvent: Cache checked for an existing response. Key attributes: cache_enabled, cache_method, cache_hit, similarity, input_preview
- CacheHitEvent: Cache hit occurred. Key attributes: cache_method, similarity, cached_response_preview
- CacheMissEvent: Cache miss occurred. Key attributes: cache_method, reason
- CacheStoredEvent: Response stored in cache. Key attributes: cache_method, duration_minutes

Policy Events

- PolicyCheckEvent: Policy validation performed. Key attributes: policy_type, action, policies_checked, content_modified, blocked_reason
- PolicyFeedbackEvent: Policy feedback for retry. Key attributes: policy_type, feedback_message, retry_count, max_retries, violated_policy

Memory, Reflection & Reliability Events

- MemoryUpdateEvent: Memory updated. Key attributes: messages_added, memory_type
- ReflectionEvent: Reflection processing applied. Key attributes: reflection_applied, improvement_made, original_preview, improved_preview
- ReliabilityEvent: Reliability layer processing. Key attributes: reliability_applied, modifications_made
- ExecutionCompleteEvent: Execution complete. Key attributes: output_type, has_output, output_preview, total_tool_calls, total_duration

Run Lifecycle Events

- RunStartedEvent: Run started. Key attributes: agent_id, task_description
- RunCompletedEvent: Run completed successfully. Key attributes: agent_id, output_preview
- RunPausedEvent: Run paused (for HITL requirements). Key attributes: reason, requirements, step_name
- RunCancelledEvent: Run cancelled. Key attributes: message, step_name

Culture Events

- CultureUpdateEvent: Cultural knowledge updated (experimental). Key attributes: culture_enabled, extraction_triggered, knowledge_updated

Common Attributes

All events inherit from AgentEvent and share these base attributes:
- event_id (str): Unique identifier (8 chars)
- run_id (Optional[str]): The agent run ID this event belongs to (matches Agent.run_id)
- timestamp (datetime): When the event occurred
- event_type (str): Class name of the event (a property, not a field)
- event_kind (str): Event category identifier

Synchronous Event Streaming

For synchronous code, use stream() with events=True:
from upsonic import Agent, Task
from upsonic.run.events.events import TextDeltaEvent, PipelineStartEvent, PipelineEndEvent

agent = Agent("openai/gpt-4o")
task = Task("Explain AI briefly")

# Stream events synchronously
for event in agent.stream(task, events=True):
    if isinstance(event, PipelineStartEvent):
        print(f"Starting pipeline with {event.total_steps} steps")
    elif isinstance(event, TextDeltaEvent):
        print(event.content, end='', flush=True)
    elif isinstance(event, PipelineEndEvent):
        print(f"\nCompleted in {event.total_duration:.2f}s")

# After streaming completes, access the final output
# Option 1: From the task (most direct)
final_output = task.response
print(f"\nFinal (from task.response): {final_output}")

# Option 2: From the agent's run output (includes additional metadata)
run_output = agent.get_run_output()
if run_output:
    print(f"Final (from agent.get_run_output().output): {run_output.output}")
    # You can also access other metadata:
    # - run_output.usage (token usage)
    # - run_output.messages (all messages)
    # - run_output.tools (tool executions)
    # - run_output.execution_stats (execution statistics)
Note: The stream() method returns an iterator that yields either:
  • str chunks when events=False (default) - for text streaming
  • AgentStreamEvent objects when events=True - for event streaming
For async streaming, use astream() with the same events parameter.
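In text-streaming mode (events=False) the chunks are plain strings, so the full response can also be rebuilt by concatenating them as they arrive. The helper below works with any iterable of string chunks; fake_chunks stands in for agent.stream(task) so the sketch runs offline.

```python
def consume_text_stream(chunks) -> str:
    """Print chunks as they arrive and return the accumulated text."""
    parts = []
    for chunk in chunks:
        print(chunk, end="", flush=True)  # real-time display
        parts.append(chunk)
    print()  # newline after streaming finishes
    return "".join(parts)

# Stand-in for agent.stream(task); a real call yields str chunks the same way.
fake_chunks = ["Streaming ", "works ", "chunk by chunk."]
full_text = consume_text_stream(fake_chunks)
```

This gives you the displayed text without waiting for the run to finish, independent of task.response or agent.get_run_output().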

Tool Management

Tools can be added to agents during initialization or dynamically using add_tools(). Use get_tool_defs() to retrieve all registered tool definitions.
from upsonic import Agent
from upsonic.tools import tool

@tool
def calculate(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y

@tool
def another_tool() -> str:
    """Another tool."""
    return "Another tool"

# Add tools during initialization
agent = Agent("openai/gpt-4o", tools=[calculate])

# Or add tools after initialization
agent.add_tools([another_tool])

# Get all registered tool definitions
tool_defs = agent.get_tool_defs()

print(tool_defs)