Agents can be executed synchronously, asynchronously, or with streaming, depending on your needs.

Synchronous Execution

The simplest way to run an agent is using the do() method, which executes synchronously and returns the result.
from upsonic import Agent, Task

# Create agent
agent = Agent("openai/gpt-4o")

# Execute with Task object
task = Task("What is the capital of France?")
result = agent.do(task)
print(result)  # Example output: Paris

# Or execute directly with a string
result = agent.do("What is the capital of France?")
print(result)  # Example output: Paris

Asynchronous Execution

For concurrent operations or async applications, use do_async(), which returns a coroutine.
from upsonic import Agent, Task
import asyncio

async def main():
    # Create agent
    agent = Agent("openai/gpt-4o")
    
    # Execute asynchronously (accepts Task or string)
    result = await agent.do_async("Explain quantum computing in simple terms")
    print(result)

# Run async function
asyncio.run(main())
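
Because do_async() returns a coroutine, several calls can be awaited together with asyncio.gather. The sketch below illustrates that pattern only. FakeAgent is a hypothetical stand-in that mimics the do_async() call shape so the example runs without an API key; using a real Agent in its place is assumed to work the same way but is not verified here.

```python
import asyncio


class FakeAgent:
    """Hypothetical stand-in for upsonic.Agent; only mimics the do_async() call shape."""

    async def do_async(self, prompt: str) -> str:
        await asyncio.sleep(0.01)  # simulate network latency
        return f"answer to: {prompt}"


async def run_concurrently(agent, prompts):
    # gather() schedules all coroutines at once and returns results in input order
    return await asyncio.gather(*(agent.do_async(p) for p in prompts))


results = asyncio.run(
    run_concurrently(FakeAgent(), ["first question", "second question"])
)
for line in results:
    print(line)
```

The calls overlap while each one waits on I/O, so total time is closer to the slowest call than to the sum of all calls.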

Streaming Text Output

For real-time output, use stream() to get responses as they’re generated.
import asyncio
from upsonic import Agent, Task


async def main():
    # Create agent and task
    agent = Agent("openai/gpt-4o")
    task = Task("Write a short poem about coding")
    
    # Stream the output
    async with agent.stream(task) as result:
        async for text_chunk in result.stream_output():
            print(text_chunk, end='', flush=True)
    print()  # New line after streaming


if __name__ == "__main__":
    asyncio.run(main())
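
If the full text is also needed after streaming, the chunks can be accumulated while they are printed. This is a minimal sketch: fake_chunks() is a hypothetical stand-in for result.stream_output(), and it assumes each chunk is a new piece of text rather than the accumulated text so far.

```python
import asyncio


async def fake_chunks():
    """Hypothetical stand-in for result.stream_output(); yields text pieces."""
    for piece in ["Code ", "flows ", "like ", "water"]:
        await asyncio.sleep(0)  # yield control, as a real stream would
        yield piece


async def collect(chunks) -> str:
    parts = []
    async for chunk in chunks:
        print(chunk, end="", flush=True)  # show progress immediately
        parts.append(chunk)  # keep the chunk for later use
    print()  # new line after streaming
    return "".join(parts)


full_text = asyncio.run(collect(fake_chunks()))
```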

Event Streaming

For full visibility into agent execution, use stream_events() to receive detailed events about every step of the pipeline.
import asyncio
from upsonic import Agent, Task
from upsonic.agent.events import (
    PipelineStartEvent,
    PipelineEndEvent,
    TextDeltaEvent,
    ToolCallEvent,
    ToolResultEvent,
)
from upsonic.tools import tool

@tool
def calculate(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y

async def main():
    agent = Agent("openai/gpt-4o")
    task = Task("Calculate 5 + 3", tools=[calculate])
    
    async with agent.stream(task) as result:
        async for event in result.stream_events():
            if isinstance(event, PipelineStartEvent):
                print(f"🚀 Starting pipeline with {event.total_steps} steps")
            
            elif isinstance(event, ToolCallEvent):
                print(f"\n🔧 Calling: {event.tool_name}({event.tool_args})")
            
            elif isinstance(event, ToolResultEvent):
                status = "❌" if event.is_error else "✅"
                print(f"\n{status} Result: {event.result_preview}")
            
            elif isinstance(event, TextDeltaEvent):
                print("\nText Delta Event: ", event.content, end='', flush=True)
            
            elif isinstance(event, PipelineEndEvent):
                print(f"\n✅ Completed in {event.total_duration:.2f}s")

asyncio.run(main())
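
As the number of handled event types grows, the isinstance chain above can be replaced by a lookup table that maps each event class to a handler. The sketch below shows that pattern with hypothetical stand-in classes (FakeToolCallEvent, FakeTextDeltaEvent) that mirror only the attributes used above; with the real library, the keys would be the imported event classes.

```python
from dataclasses import dataclass


# Hypothetical stand-ins that only mirror attributes used in the example above.
@dataclass
class FakeToolCallEvent:
    tool_name: str
    tool_args: dict


@dataclass
class FakeTextDeltaEvent:
    content: str


def on_tool_call(event) -> str:
    return f"calling {event.tool_name}({event.tool_args})"


def on_text(event) -> str:
    return event.content


# Map each event class to its handler; unlisted event types are skipped.
HANDLERS = {
    FakeToolCallEvent: on_tool_call,
    FakeTextDeltaEvent: on_text,
}


def handle(event):
    handler = HANDLERS.get(type(event))
    return handler(event) if handler else None


events = [
    FakeToolCallEvent("calculate", {"x": 5, "y": 3}),
    FakeTextDeltaEvent("8"),
    object(),  # an event type with no registered handler
]
handled = [handle(e) for e in events]
print(handled)
```

Note that the lookup uses the exact class, so it does not match subclasses the way isinstance does.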

Event Categories

Pipeline Events

| Event | Description | Key Attributes |
| --- | --- | --- |
| PipelineStartEvent | Emitted when execution begins | total_steps, is_streaming, task_description |
| PipelineEndEvent | Emitted when execution ends | total_steps, executed_steps, total_duration, status, error_message |

Step Events

| Event | Description | Key Attributes |
| --- | --- | --- |
| StepStartEvent | Emitted when a step begins | step_name, step_index, step_description, total_steps |
| StepEndEvent | Emitted when a step ends | step_name, step_index, status, execution_time, message |

Tool Events

| Event | Description | Key Attributes |
| --- | --- | --- |
| ToolCallEvent | Emitted when a tool is called | tool_name, tool_call_id, tool_args, tool_index, is_parallel |
| ToolResultEvent | Emitted when a tool returns | tool_name, tool_call_id, result, result_preview, execution_time, is_error, error_message |
| ExternalToolPauseEvent | Emitted when execution pauses for an external tool | tool_name, tool_call_id, tool_args |

LLM Stream Events

| Event | Description | Key Attributes |
| --- | --- | --- |
| TextDeltaEvent | Text chunk during streaming | content, accumulated_content, part_index |
| TextCompleteEvent | Text streaming complete | content, part_index |
| ThinkingDeltaEvent | Reasoning content (for supported models) | content, part_index |
| ToolCallDeltaEvent | Tool call arguments streaming | tool_name, tool_call_id, args_delta, part_index |
| FinalOutputEvent | Final output ready | output, output_type |

Initialization & Model Events

| Event | Description | Key Attributes |
| --- | --- | --- |
| AgentInitializedEvent | Agent initialized for execution | agent_id, is_streaming |
| ModelSelectedEvent | Model selected for execution | model_name, provider, is_override |
| ToolsConfiguredEvent | Tools configured for the task | tool_count, tool_names, has_mcp_handlers |
| MessagesBuiltEvent | Request messages built | message_count, has_system_prompt, has_memory_messages, is_continuation |
| ModelRequestStartEvent | Model request starting | model_name, is_streaming, has_tools, tool_call_count, tool_call_limit |
| ModelResponseEvent | Model response received (non-streaming) | model_name, has_text, has_tool_calls, tool_call_count, finish_reason |

Cache Events

| Event | Description | Key Attributes |
| --- | --- | --- |
| CacheCheckEvent | Cache checked for existing response | cache_enabled, cache_method, cache_hit, similarity, input_preview |
| CacheHitEvent | Cache hit occurred | cache_method, similarity, cached_response_preview |
| CacheMissEvent | Cache miss occurred | cache_method, reason |
| CacheStoredEvent | Response stored in cache | cache_method, duration_minutes |

Policy Events

| Event | Description | Key Attributes |
| --- | --- | --- |
| PolicyCheckEvent | Policy validation performed | policy_type, action, policies_checked, content_modified, blocked_reason |
| PolicyFeedbackEvent | Policy feedback for retry | policy_type, feedback_message, retry_count, max_retries, violated_policy |

Memory, Reflection & Reliability Events

| Event | Description | Key Attributes |
| --- | --- | --- |
| MemoryUpdateEvent | Memory updated | messages_added, memory_type |
| ReflectionEvent | Reflection processing applied | reflection_applied, improvement_made, original_preview, improved_preview |
| ReliabilityEvent | Reliability layer processing | reliability_applied, modifications_made |
| ExecutionCompleteEvent | Execution complete | output_type, has_output, output_preview, total_tool_calls, total_duration |

Common Attributes

All events inherit from AgentEvent and share these base attributes:

| Attribute | Type | Description |
| --- | --- | --- |
| event_id | str | Unique identifier (8 chars) |
| timestamp | datetime | When the event occurred |
| event_type | str | Class name of the event |
| event_kind | str | Event category identifier |
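
Because every event carries these base attributes, a run can be summarized without knowing the concrete event classes. The sketch below counts events by event_type and measures the time between the first and last timestamp. FakeEvent is a hypothetical stand-in that carries only two of the base attributes, and the sample timestamps are made up for illustration.

```python
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class FakeEvent:
    """Hypothetical stand-in carrying two of the base attributes listed above."""

    event_type: str
    timestamp: datetime


def summarize(events):
    # Count how many events of each class were seen
    counts = Counter(e.event_type for e in events)
    # Wall-clock span between the first and last event, in seconds
    elapsed = (
        (events[-1].timestamp - events[0].timestamp).total_seconds()
        if events
        else 0.0
    )
    return counts, elapsed


t0 = datetime(2024, 1, 1, 12, 0, 0)  # made-up start time
events = [
    FakeEvent("PipelineStartEvent", t0),
    FakeEvent("TextDeltaEvent", t0 + timedelta(seconds=1)),
    FakeEvent("TextDeltaEvent", t0 + timedelta(seconds=2)),
    FakeEvent("PipelineEndEvent", t0 + timedelta(seconds=3)),
]
counts, elapsed = summarize(events)
print(dict(counts), elapsed)
```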

Synchronous Event Streaming

For synchronous code, use stream_events_sync():
from upsonic import Agent, Task
from upsonic.agent.events import TextDeltaEvent

agent = Agent("openai/gpt-4o")
result = agent.stream(Task("Explain AI briefly"))

for event in result.stream_events_sync():
    if isinstance(event, TextDeltaEvent):
        print(event.content, end='', flush=True)

print(f"\nFinal: {result.output}")

StreamRunResult Methods

| Method | Description |
| --- | --- |
| stream_events() | Async iterator for all events |
| stream_events_sync() | Sync iterator for all events |
| stream_output() | Async iterator for text only |
| stream_output_sync() | Sync iterator for text only |
| output | Get final output after streaming |
| is_complete() | Check if streaming finished |
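
The members above can be combined into a small helper that prints text as it arrives and reads output only once streaming has finished. This is a sketch under assumptions: FakeStreamResult is a hypothetical stand-in that imitates stream_output_sync(), output, and is_complete() as described in the table, and the behavior of the real StreamRunResult may differ.

```python
class FakeStreamResult:
    """Hypothetical stand-in exposing the members listed in the table above."""

    def __init__(self, chunks):
        self._chunks = chunks
        self._done = False
        self.output = None

    def stream_output_sync(self):
        parts = []
        for chunk in self._chunks:
            parts.append(chunk)
            yield chunk
        # Reached only after the consumer has exhausted the iterator
        self.output = "".join(parts)
        self._done = True

    def is_complete(self):
        return self._done


def consume(result):
    for chunk in result.stream_output_sync():
        print(chunk, end="", flush=True)
    print()
    # Only read the final output once streaming has finished
    return result.output if result.is_complete() else None


final = consume(FakeStreamResult(["AI ", "is ", "pattern matching."]))
```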

Tool Management

Tools can be added to agents during initialization or dynamically using add_tools(). Use get_tool_defs() to retrieve all registered tool definitions.
from upsonic import Agent
from upsonic.tools import tool

@tool
def calculate(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y

@tool
def another_tool() -> str:
    """Another tool."""
    return "Another tool"

# Add tools during initialization
agent = Agent("openai/gpt-4o", tools=[calculate])

# Or add tools after initialization
agent.add_tools([another_tool])

# Get all registered tool definitions
tool_defs = agent.get_tool_defs()

print(tool_defs)