Overview
Human-in-the-Loop (HITL) patterns let you pause execution for human review, approval, or input. This is essential for:- ✅ Content Moderation - Review AI-generated content before publishing
- 🎯 Decision Approval - Require human sign-off on critical actions
- ✏️ Content Editing - Let humans refine AI outputs
- 🔍 Quality Control - Inspect intermediate results
The Interrupt Primitive
Theinterrupt() function pauses execution and returns control to the caller:
Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver, Command, interrupt
class ReviewState(TypedDict):
draft: str
approved: bool
def review_node(state: ReviewState) -> dict:
"""Pause and show data to human."""
human_response = interrupt({
"action": "review",
"data": state["draft"],
"options": ["approve", "edit", "reject"]
})
# Execution resumes here when graph.invoke() is called with Command(resume=...)
if human_response.get("action") == "approve":
return {"approved": True}
else:
return {"approved": False}
# Build the graph
builder = StateGraph(ReviewState)
builder.add_node("review", review_node)
builder.add_edge(START, "review")
builder.add_edge("review", END)
# Compile with checkpointer (required for interrupts)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# Execute the graph
config = {"configurable": {"thread_id": "review-1"}}
result = graph.invoke(
{"draft": "This is my draft content to review.", "approved": False},
config=config
)
# Check if interrupted
if "__interrupt__" in result:
print("⏸️ Execution paused for review")
interrupt_data = result["__interrupt__"][0]["value"]
print(f"Draft to review: {interrupt_data['data']}")
print(f"Options: {interrupt_data['options']}")
# Simulate human decision
human_decision = {"action": "approve"}
# Resume execution with the human's response
final_result = graph.invoke(
Command(resume=human_decision),
config=config
)
print(f"Approved: {final_result['approved']}") # Output: Approved: True
How Interrupts Work
- Node calls interrupt() - Execution pauses
- Graph returns with
__interrupt__key - Contains interrupt data - Application shows data to human - Display UI, wait for input
- Resume with Command(resume=…) - Continue execution with human response
Basic Interrupt Example
Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver, Command, interrupt
class ApprovalState(TypedDict):
content: str
approved: bool
feedback: str
def generate_content(state: ApprovalState) -> dict:
"""Generate AI content based on the topic."""
draft = "This is AI-generated content about " + state["content"]
return {"content": draft}
def review_node(state: ApprovalState) -> dict:
"""Pause for human review."""
response = interrupt({
"action": "review_content",
"content": state["content"],
"instruction": "Please review and approve or provide feedback"
})
if response.get("action") == "approve":
return {"approved": True, "feedback": ""}
else:
return {"approved": False, "feedback": response.get("feedback", "")}
# Build the graph
builder = StateGraph(ApprovalState)
builder.add_node("generate", generate_content)
builder.add_node("review", review_node)
builder.add_edge(START, "generate")
builder.add_edge("generate", "review")
builder.add_edge("review", END)
# Compile with checkpointer (required for interrupts)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# Execute the graph
config = {"configurable": {"thread_id": "approval-1"}}
result = graph.invoke(
{"content": "Python programming", "approved": False, "feedback": ""},
config=config
)
# Check if interrupted
if "__interrupt__" in result:
print("⏸️ Execution paused for review")
interrupt_data = result["__interrupt__"][0]["value"]
print(f"Content to review: {interrupt_data['content']}")
# Simulate human approval
human_decision = {"action": "approve"}
# Resume execution
final_result = graph.invoke(
Command(resume=human_decision),
config=config
)
print(f"Approved: {final_result['approved']}")
Interrupts require checkpointers because execution state must be persisted between the interrupt and resume calls.
Interrupt Patterns
Pattern 1: Approve/Reject
Simple binary approval workflow:Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver, Command, interrupt
class ActionState(TypedDict):
action_details: str
status: str
def approval_node(state: ActionState) -> dict:
"""Request approval for an action."""
approved = interrupt({
"action": "approve_action",
"data": state["action_details"]
})
return {"status": "approved" if approved else "rejected"}
# Build the graph
builder = StateGraph(ActionState)
builder.add_node("approve", approval_node)
builder.add_edge(START, "approve")
builder.add_edge("approve", END)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# Start execution
config = {"configurable": {"thread_id": "action-1"}}
result = graph.invoke(
{"action_details": "Delete all records", "status": "pending"},
config=config
)
# Resume with approval
if "__interrupt__" in result:
final = graph.invoke(Command(resume=True), config=config) # Approve
print(f"Status: {final['status']}") # Output: Status: approved
Pattern 2: Edit Content
Allow humans to modify AI-generated content:Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver, Command, interrupt
class EditState(TypedDict):
draft: str
final_content: str
def generate_draft(state: EditState) -> dict:
"""Generate initial draft."""
return {"draft": "AI-generated summary of the topic..."}
def edit_node(state: EditState) -> dict:
"""Pause for human editing."""
edited_text = interrupt({
"action": "edit_content",
"original": state["draft"]
})
return {"final_content": edited_text}
# Build the graph
builder = StateGraph(EditState)
builder.add_node("generate", generate_draft)
builder.add_node("edit", edit_node)
builder.add_edge(START, "generate")
builder.add_edge("generate", "edit")
builder.add_edge("edit", END)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# Start execution
config = {"configurable": {"thread_id": "edit-1"}}
result = graph.invoke(
{"draft": "", "final_content": ""},
config=config
)
# Resume with edited text
if "__interrupt__" in result:
final = graph.invoke(
Command(resume="Human-edited and improved content here"),
config=config
)
print(f"Final: {final['final_content']}")
Pattern 3: Iterative Refinement
Loop until human approves:Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver, Command, interrupt
class RefinementState(TypedDict):
topic: str
draft: str
feedback: str
approved: bool
iteration: int
def generate_content(state: RefinementState) -> dict:
"""Generate or refine content based on feedback."""
if state["feedback"]:
# Incorporate feedback into new draft
draft = f"Revised draft (iteration {state['iteration'] + 1}) incorporating: {state['feedback']}"
else:
draft = f"Initial draft about {state['topic']}"
return {"draft": draft, "iteration": state["iteration"] + 1}
def review_and_refine(state: RefinementState) -> Command:
"""Review draft and decide whether to approve or request changes."""
feedback = interrupt({
"action": "review_and_refine",
"draft": state["draft"],
"iteration": state["iteration"]
})
if feedback["action"] == "approve":
return Command(
update={"approved": True},
goto=END
)
else:
return Command(
update={"feedback": feedback["comments"], "approved": False},
goto="generate" # Loop back to generate
)
# Build the graph
builder = StateGraph(RefinementState)
builder.add_node("generate", generate_content)
builder.add_node("review", review_and_refine)
builder.add_edge(START, "generate")
builder.add_edge("generate", "review")
# Note: review uses Command to route, so no edge needed
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# Start execution
config = {"configurable": {"thread_id": "refine-1"}}
result = graph.invoke(
{"topic": "AI Safety", "draft": "", "feedback": "", "approved": False, "iteration": 0},
config=config
)
# First review - request changes
if "__interrupt__" in result:
result = graph.invoke(
Command(resume={"action": "revise", "comments": "Add more examples"}),
config=config
)
# Second review - approve
if "__interrupt__" in result:
final = graph.invoke(
Command(resume={"action": "approve"}),
config=config
)
print(f"Approved after {final['iteration']} iterations")
Interrupt Configuration
Configure where interrupts can happen using compile options:Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver, Command
class WorkflowState(TypedDict):
data: str
processed: str
result: str
def process_data(state: WorkflowState) -> dict:
"""Process the data."""
return {"processed": f"Processed: {state['data']}"}
def critical_action(state: WorkflowState) -> dict:
"""Perform a critical action that needs approval."""
return {"result": f"Action completed on: {state['processed']}"}
# Build the graph
builder = StateGraph(WorkflowState)
builder.add_node("process", process_data)
builder.add_node("critical_action", critical_action)
builder.add_edge(START, "process")
builder.add_edge("process", "critical_action")
builder.add_edge("critical_action", END)
checkpointer = MemorySaver()
# Interrupt before specific nodes
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["critical_action"]
)
# Or interrupt after specific nodes
graph_after = builder.compile(
checkpointer=checkpointer,
interrupt_after=["process"]
)
# Execute with interrupt_before
config = {"configurable": {"thread_id": "workflow-1"}}
result = graph.invoke(
{"data": "input", "processed": "", "result": ""},
config=config
)
# Graph pauses before critical_action
if "__interrupt__" in result:
print("Paused before critical action")
# Resume to continue
final = graph.invoke(Command(resume=None), config=config)
Best Practices
1. Provide Clear Context
Give users all the information they need to make decisions:Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import interrupt
class DocumentState(TypedDict):
draft: str
author: str
version: int
def review_with_context(state: DocumentState) -> dict:
"""Review with rich context."""
response = interrupt({
"action": "review",
"content": state["draft"],
"metadata": {
"author": state["author"],
"word_count": len(state["draft"].split()),
"version": state["version"]
},
"instructions": "Review for accuracy and tone"
})
return {"approved": response.get("approved", False)}
2. Handle Resume Values Safely
Always validate and provide defaults for resume values:Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import interrupt
class SafeState(TypedDict):
status: str
def safe_review_node(state: SafeState) -> dict:
"""Safely handle resume values."""
response = interrupt({"action": "review"})
# Validate response type
if not isinstance(response, dict):
response = {"action": "approve"} # Default to approve
# Get action with default
action = response.get("action", "approve")
return {"status": action}
3. Keep Interrupts Deterministic
Avoid conditional interrupts - keep the order consistent:Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import interrupt
class DeterministicState(TypedDict):
requires_review: bool
content: str
approved: bool
# ❌ Bad - conditional interrupt
def bad_review(state: DeterministicState) -> dict:
if state["requires_review"]:
interrupt({"content": state["content"]}) # Inconsistent!
return {"approved": True}
# ✅ Good - always interrupt, handle decision in resume
def good_review(state: DeterministicState) -> dict:
response = interrupt({
"content": state["content"],
"requires_review": state["requires_review"]
})
# Let the caller decide whether to skip review
if response.get("skip"):
return {"approved": True}
return {"approved": response.get("approved", False)}
Next Steps
- Building Agents - Create AI agents with tools
- Advanced Features - Explore Send API and parallel execution
- Reliability - Add retry and cache policies

