Complex Chat Example
Advanced example with persistent storage, memory, and metrics tracking.Complete Example
Copy
import asyncio
from upsonic import Agent, Task, Chat
from upsonic.storage.providers import SqliteStorage
from pydantic import BaseModel
class UserProfile(BaseModel):
name: str
preferences: dict
async def main():
# Setup persistent storage
storage = SqliteStorage("sessions", "profiles", "chat.db")
# Create agent
agent = Agent("openai/gpt-4o")
# Create chat with advanced configuration
chat = Chat(
session_id="complex_session",
user_id="user123",
agent=agent,
storage=storage,
full_session_memory=True,
summary_memory=True,
user_analysis_memory=True,
user_profile_schema=UserProfile,
num_last_messages=50,
retry_attempts=3,
retry_delay=1.0
)
# Have a conversation
await chat.invoke("My name is Bob and I love Python")
await chat.invoke("What's my name and what do I love?")
# Access metrics
metrics = chat.get_session_metrics()
print(f"Session duration: {metrics.duration:.1f}s")
print(f"Messages: {metrics.message_count}")
print(f"Total cost: ${chat.total_cost:.4f}")
print(f"Avg response time: {metrics.average_response_time:.2f}s")
# Get cost history
cost_history = chat.get_cost_history()
for entry in cost_history:
print(f"Cost: ${entry['estimated_cost']:.4f}, "
f"Tokens: {entry['input_tokens']} in, {entry['output_tokens']} out")
# Clean up
await chat.close()
if __name__ == "__main__":
asyncio.run(main())
With Context Manager
Copy
import asyncio
from upsonic import Agent, Task, Chat
from upsonic.storage.providers import SqliteStorage
async def main():
storage = SqliteStorage("sessions", "profiles", "chat.db")
agent = Agent("openai/gpt-4o")
async with Chat(
session_id="session1",
user_id="user1",
agent=agent,
storage=storage,
full_session_memory=True,
summary_memory=True
) as chat:
# Stream responses
async for chunk in chat.stream("Tell me a story about AI"):
print(chunk, end="", flush=True)
print()
# Send follow-up
response = await chat.invoke("Continue the story")
print(f"\nAssistant: {response}")
# Access session summary
summary = chat.get_session_summary()
print(f"\n{summary}")
if __name__ == "__main__":
asyncio.run(main())
Multi-Session Example
Copy
import asyncio
from upsonic import Agent, Chat
from upsonic.storage.providers import SqliteStorage
async def main():
storage = SqliteStorage("sessions", "profiles", "chat.db")
agent = Agent("openai/gpt-4o")
# First session
async with Chat(
session_id="session1",
user_id="user1",
agent=agent,
storage=storage,
full_session_memory=True
) as chat1:
await chat1.invoke("Remember: I prefer dark mode")
# Second session (same user, different session)
async with Chat(
session_id="session2",
user_id="user1",
agent=agent,
storage=storage,
full_session_memory=True
) as chat2:
response = await chat2.invoke("What's my preference?")
print(response) # Agent remembers from previous session
if __name__ == "__main__":
asyncio.run(main())

