Skip to main content

Available Backends

  • InMemoryStorage: Ephemeral in-memory storage (default)
  • SqliteStorage: SQLite database file
  • PostgresStorage: PostgreSQL database
  • MongoStorage: MongoDB database
  • RedisStorage: Redis with key expiry (TTL)
  • JSONStorage: File-based JSON storage

SQLite Storage

import asyncio
from upsonic import Agent, Chat
from upsonic.storage import SqliteStorage


async def main():
    """Run one chat turn on a SQLite-backed session and print the reply.

    Builds a ``SqliteStorage`` on ``chat.db`` and an ``Agent`` for
    ``openai/gpt-4o``, binds both to a ``Chat`` for session ``session1`` /
    user ``user1``, then sends ``"Hello"``.
    """
    backend = SqliteStorage(db_file="chat.db")
    assistant = Agent("openai/gpt-4o")

    session = Chat(
        session_id="session1",
        user_id="user1",
        agent=assistant,
        storage=backend,
    )

    # The awaited result of invoke() is printed directly.
    print(await session.invoke("Hello"))


# Start the event loop only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())

PostgreSQL Storage

import asyncio
from upsonic import Agent, Chat
from upsonic.storage import PostgresStorage


async def main():
    """Run one chat turn on a PostgreSQL-backed session and print the reply.

    Builds a ``PostgresStorage`` from a connection URL and an ``Agent`` for
    ``openai/gpt-4o``, binds both to a ``Chat`` for session ``session1`` /
    user ``user1``, then sends ``"Hello"``.
    """
    # NOTE(review): "user:pass" and "dbname" look like placeholder values;
    # substitute real connection details before running.
    backend = PostgresStorage(db_url="postgresql://user:pass@localhost:5432/dbname")
    assistant = Agent("openai/gpt-4o")

    session = Chat(
        session_id="session1",
        user_id="user1",
        agent=assistant,
        storage=backend,
    )

    # The awaited result of invoke() is printed directly.
    print(await session.invoke("Hello"))


# Start the event loop only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())

MongoDB Storage

import asyncio
from upsonic import Agent, Chat
from upsonic.storage import MongoStorage


async def main():
    """Run one chat turn on a MongoDB-backed session and print the reply.

    Builds a ``MongoStorage`` for database ``chat_db`` at
    ``mongodb://localhost:27017`` and an ``Agent`` for ``openai/gpt-4o``,
    binds both to a ``Chat`` for session ``session1`` / user ``user1``,
    then sends ``"Hello"``.
    """
    backend = MongoStorage(db_url="mongodb://localhost:27017", db_name="chat_db")
    assistant = Agent("openai/gpt-4o")

    session = Chat(
        session_id="session1",
        user_id="user1",
        agent=assistant,
        storage=backend,
    )

    # The awaited result of invoke() is printed directly.
    print(await session.invoke("Hello"))


# Start the event loop only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())

Redis Storage

import asyncio
from upsonic import Agent, Chat
from upsonic.storage import RedisStorage


async def main():
    """Run one chat turn on a Redis-backed session and print the reply.

    Builds a ``RedisStorage`` at ``redis://localhost:6379/0`` with prefix
    ``chat`` and an ``Agent`` for ``openai/gpt-4o``, binds both to a ``Chat``
    for session ``session1`` / user ``user1``, then sends ``"Hello"``.
    """
    backend = RedisStorage(
        db_url="redis://localhost:6379/0",
        db_prefix="chat",
        # NOTE(review): presumably the TTL in seconds (one hour) - confirm
        # against the RedisStorage documentation.
        expire=3600,
    )
    assistant = Agent("openai/gpt-4o")

    session = Chat(
        session_id="session1",
        user_id="user1",
        agent=assistant,
        storage=backend,
    )

    # The awaited result of invoke() is printed directly.
    print(await session.invoke("Hello"))


# Start the event loop only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())

JSON Storage

import asyncio
from upsonic import Agent, Chat
from upsonic.storage import JSONStorage


async def main():
    """Run one chat turn on a JSON-file-backed session and print the reply.

    Builds a ``JSONStorage`` on ``./chat_data`` and an ``Agent`` for
    ``openai/gpt-4o``, binds both to a ``Chat`` for session ``session1`` /
    user ``user1``, then sends ``"Hello"``.
    """
    backend = JSONStorage(db_path="./chat_data")
    assistant = Agent("openai/gpt-4o")

    session = Chat(
        session_id="session1",
        user_id="user1",
        agent=assistant,
        storage=backend,
    )

    # The awaited result of invoke() is printed directly.
    print(await session.invoke("Hello"))


# Start the event loop only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())