Complex Chat Example
Advanced example with persistent storage, memory, and metrics tracking.Complete Example
Copy
import asyncio
from upsonic import Agent, Task, Chat
from upsonic.storage.providers import SqliteStorage
from pydantic import BaseModel
class UserProfile(BaseModel):
name: str
preferences: dict
async def main():
# Setup persistent storage
storage = SqliteStorage(
db_file="chat.db",
sessions_table_name="sessions",
profiles_table_name="profiles"
)
# Create agent
agent = Agent("openai/gpt-4o")
# Create chat with advanced configuration
chat = Chat(
session_id="complex_session",
user_id="user123",
agent=agent,
storage=storage,
full_session_memory=True,
summary_memory=True,
user_analysis_memory=True,
user_profile_schema=UserProfile,
num_last_messages=50,
retry_attempts=3,
retry_delay=1.0
)
# Have a conversation
await chat.invoke("My name is Bob and I love Python")
await chat.invoke("What's my name and what do I love?")
# Access metrics
metrics = chat.get_session_metrics()
print(f"Session duration: {metrics.duration:.1f}s")
print(f"Messages: {metrics.message_count}")
print(f"Total cost: ${chat.total_cost:.4f}")
print(f"Avg response time: {metrics.average_response_time:.2f}s")
# Get cost history
cost_history = chat.get_cost_history()
for entry in cost_history:
print(f"Cost: ${entry['estimated_cost']:.4f}, "
f"Tokens: {entry['input_tokens']} in, {entry['output_tokens']} out")
# Clean up
await chat.close()
if __name__ == "__main__":
asyncio.run(main())
With Context Manager
Copy
import asyncio
from upsonic import Agent, Task, Chat
from upsonic.storage.providers import SqliteStorage
async def main():
storage = SqliteStorage(
db_file="chat.db",
sessions_table_name="sessions",
profiles_table_name="profiles"
)
agent = Agent("openai/gpt-4o")
async with Chat(
session_id="session1",
user_id="user1",
agent=agent,
storage=storage,
full_session_memory=True,
summary_memory=True
) as chat:
# Stream responses
async for chunk in chat.stream("Tell me a story about AI"):
print(chunk, end="", flush=True)
print()
# Send follow-up
response = await chat.invoke("Continue the story")
print(f"\nAssistant: {response}")
# Access session summary
summary = chat.get_session_summary()
print(f"\n{summary}")
if __name__ == "__main__":
asyncio.run(main())
Multi-Session Example
Copy
import asyncio
from upsonic import Agent, Chat
from upsonic.storage.providers import SqliteStorage
async def main():
storage = SqliteStorage(
db_file="chat.db",
sessions_table_name="sessions",
profiles_table_name="profiles"
)
agent = Agent("openai/gpt-4o")
# First session - enable user_analysis_memory to create user profile
async with Chat(
session_id="session1",
user_id="user1",
agent=agent,
storage=storage,
full_session_memory=True,
user_analysis_memory=True # Enable to create user profile from conversation
) as chat1:
await chat1.invoke("I love AI")
# Second session (same user, different session)
async with Chat(
session_id="session2", # Session is different from previous session
user_id="user1", # User is the same as in previous user session. Agent will remember the user profile from previous user session.
agent=agent,
storage=storage,
full_session_memory=True,
user_analysis_memory=True, # Enable to load user profile from previous user session.
debug=True # Enable debug to trace profile loading
) as chat2:
response = await chat2.invoke("What's my favorite topic?")
print(response) # Agent remembers from previous session
if __name__ == "__main__":
asyncio.run(main())

