Skip to main content

Attributes

The StateGraph class __init__ method accepts the following parameters:
AttributeTypeDefaultDescription
state_schemaType[StateT]RequiredThe state schema that flows through the graph (TypedDict class)
input_schemaType | NoneNoneOptional input schema for validation
output_schemaType | NoneNoneOptional output schema for filtering
context_schemaType | NoneNoneOptional schema for runtime context

Configuration Example

from typing_extensions import TypedDict
from upsonic.graphv2 import (
    StateGraph, START, END,
    RetryPolicy, CachePolicy,
    MemorySaver, InMemoryCache,
    Command
)

# Define state schema
class MyState(TypedDict):
    input: str
    output: str
    count: int

# Define input/output schemas
class InputState(TypedDict):
    input: str

class OutputState(TypedDict):
    output: str

# Define node function
def process_node(state: MyState) -> dict:
    """Process the input and return output."""
    return {
        "output": f"Processed: {state['input']}",
        "count": state["count"] + 1
    }

# Create graph builder with schemas
builder = StateGraph(
    MyState,
    input_schema=InputState,
    output_schema=OutputState
)

# Add node with retry and cache policies
builder.add_node(
    "process",
    process_node,
    retry_policy=RetryPolicy(
        max_attempts=3,
        initial_interval=1.0,
        backoff_factor=2.0,
        jitter=True
    ),
    cache_policy=CachePolicy(ttl=300)  # 5 minutes
)

# Add edges
builder.add_edge(START, "process")
builder.add_edge("process", END)

# Compile with checkpointer and cache
checkpointer = MemorySaver()
cache = InMemoryCache()

graph = builder.compile(
    checkpointer=checkpointer,
    cache=cache,
    durability="async",  # or "sync" or "exit"
    interrupt_before=None,  # Optional: pause before these nodes
    interrupt_after=None  # Optional: pause after these nodes
)

# Execute the graph
config = {
    "configurable": {
        "thread_id": "session-1"
    },
    "recursion_limit": 50  # Optional: limit execution steps
}

result = graph.invoke(
    {
        "input": "test",
        "output": "",
        "count": 0
    },
    config=config
)

print(result["output"])  # Output: Processed: test

# Get current state
state = graph.get_state(config={"configurable": {"thread_id": "session-1"}})
print(f"Current state: {state.values}")

# Get execution history
history = graph.get_state_history(
    config={"configurable": {"thread_id": "session-1"}},
    limit=10
)
print(f"History: {len(history)} checkpoints")