from typing_extensions import TypedDict
from upsonic.graphv2 import (
StateGraph, START, END,
RetryPolicy, CachePolicy,
MemorySaver, InMemoryCache,
Command
)
# Define state schema
class MyState(TypedDict):
input: str
output: str
count: int
# Define input/output schemas
class InputState(TypedDict):
input: str
class OutputState(TypedDict):
output: str
# Define node function
def process_node(state: MyState) -> dict:
"""Process the input and return output."""
return {
"output": f"Processed: {state['input']}",
"count": state["count"] + 1
}
# Create graph builder with schemas
builder = StateGraph(
MyState,
input_schema=InputState,
output_schema=OutputState
)
# Add node with retry and cache policies
builder.add_node(
"process",
process_node,
retry_policy=RetryPolicy(
max_attempts=3,
initial_interval=1.0,
backoff_factor=2.0,
jitter=True
),
cache_policy=CachePolicy(ttl=300) # 5 minutes
)
# Add edges
builder.add_edge(START, "process")
builder.add_edge("process", END)
# Compile with checkpointer and cache
checkpointer = MemorySaver()
cache = InMemoryCache()
graph = builder.compile(
checkpointer=checkpointer,
cache=cache,
durability="async", # or "sync" or "exit"
interrupt_before=None, # Optional: pause before these nodes
interrupt_after=None # Optional: pause after these nodes
)
# Execute the graph
config = {
"configurable": {
"thread_id": "session-1"
},
"recursion_limit": 50 # Optional: limit execution steps
}
result = graph.invoke(
{
"input": "test",
"output": "",
"count": 0
},
config=config
)
print(result["output"]) # Output: Processed: test
# Get current state
state = graph.get_state(config={"configurable": {"thread_id": "session-1"}})
print(f"Current state: {state.values}")
# Get execution history
history = graph.get_state_history(
config={"configurable": {"thread_id": "session-1"}},
limit=10
)
print(f"History: {len(history)} checkpoints")