Skip to main content

Complex Chat Example

Advanced example with persistent storage, memory, and metrics tracking.

Complete Example

import asyncio
from upsonic import Agent, Task, Chat
from upsonic.storage.providers import SqliteStorage
from pydantic import BaseModel

# Profile schema handed to Chat via ``user_profile_schema`` in main() below.
# Both fields are required: neither declares a default.
class UserProfile(BaseModel):
    name: str
    # Untyped dict: no key or value types are declared for preferences.
    preferences: dict

async def main():
    """Run a short two-turn chat, then print session metrics and cost history.

    The conversation and the reporting run inside ``try``/``finally`` so
    that ``chat.close()`` is always awaited, even when a model call or a
    metrics lookup raises.
    """
    # Setup persistent storage
    storage = SqliteStorage("sessions", "profiles", "chat.db")

    # Create agent
    agent = Agent("openai/gpt-4o")

    # Create chat with advanced configuration
    chat = Chat(
        session_id="complex_session",
        user_id="user123",
        agent=agent,
        storage=storage,
        full_session_memory=True,
        summary_memory=True,
        user_analysis_memory=True,
        user_profile_schema=UserProfile,
        num_last_messages=50,
        retry_attempts=3,
        retry_delay=1.0
    )

    try:
        # Have a conversation
        await chat.invoke("My name is Bob and I love Python")
        await chat.invoke("What's my name and what do I love?")

        # Access metrics
        metrics = chat.get_session_metrics()
        print(f"Session duration: {metrics.duration:.1f}s")
        print(f"Messages: {metrics.message_count}")
        print(f"Total cost: ${chat.total_cost:.4f}")
        print(f"Avg response time: {metrics.average_response_time:.2f}s")

        # Get cost history
        cost_history = chat.get_cost_history()
        for entry in cost_history:
            print(f"Cost: ${entry['estimated_cost']:.4f}, "
                  f"Tokens: {entry['input_tokens']} in, {entry['output_tokens']} out")
    finally:
        # Clean up: runs on both the success path and the error path
        await chat.close()

# Run the example only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())

With Context Manager

import asyncio
from upsonic import Agent, Task, Chat
from upsonic.storage.providers import SqliteStorage

async def main():
    """Stream one reply, send a follow-up message, and print the session summary.

    The chat is opened with ``async with`` rather than being closed by hand.
    """
    db = SqliteStorage("sessions", "profiles", "chat.db")
    assistant = Agent("openai/gpt-4o")

    # Collect the chat configuration up front so the ``async with`` line stays short.
    chat_options = {
        "session_id": "session1",
        "user_id": "user1",
        "agent": assistant,
        "storage": db,
        "full_session_memory": True,
        "summary_memory": True,
    }

    async with Chat(**chat_options) as chat:
        # Print the streamed reply piece by piece as it arrives
        async for piece in chat.stream("Tell me a story about AI"):
            print(piece, end="", flush=True)
        print()

        # Ask for a continuation in the same session
        reply = await chat.invoke("Continue the story")
        print(f"\nAssistant: {reply}")

        # Show the summary kept for this session
        recap = chat.get_session_summary()
        print(f"\n{recap}")

# Run the example only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())

Multi-Session Example

import asyncio
from upsonic import Agent, Chat
from upsonic.storage.providers import SqliteStorage

async def main():
    """Show a user preference carrying over between two separate sessions.

    Both chats share ``user_id="user1"`` and the same storage but use
    different ``session_id`` values. ``user_analysis_memory`` is enabled on
    both so that what is learned about the user in the first session can be
    stored against the user rather than only in that session's history.

    NOTE(review): this assumes ``full_session_memory`` is scoped to a single
    ``session_id`` and ``user_analysis_memory`` to the ``user_id`` -- confirm
    against the Upsonic memory documentation.
    """
    storage = SqliteStorage("sessions", "profiles", "chat.db")
    agent = Agent("openai/gpt-4o")

    # First session: state the preference
    async with Chat(
        session_id="session1",
        user_id="user1",
        agent=agent,
        storage=storage,
        full_session_memory=True,
        user_analysis_memory=True
    ) as chat1:
        await chat1.invoke("Remember: I prefer dark mode")

    # Second session (same user, different session)
    async with Chat(
        session_id="session2",
        user_id="user1",
        agent=agent,
        storage=storage,
        full_session_memory=True,
        user_analysis_memory=True
    ) as chat2:
        response = await chat2.invoke("What's my preference?")
        print(response)  # Expected to reflect the preference stated in session1

# Run the example only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())