Overview
Human-in-the-Loop (HITL) patterns let you pause execution for human review, approval, or input. This is essential for:
- ✅ Content Moderation - Review AI-generated content before publishing
- 🎯 Decision Approval - Require human sign-off on critical actions
- ✏️ Content Editing - Let humans refine AI outputs
- 🔍 Quality Control - Inspect intermediate results
- 🤝 Collaborative Workflows - Mix human and AI decision-making
The Interrupt Primitive
The `interrupt()` function pauses execution and returns control to the caller:
Copy
from upsonic.graphv2 import interrupt
def review_node(state: MyState) -> dict:
    """Hand the current draft to a human and wait for their verdict."""
    # interrupt() suspends the graph run; the payload below is surfaced
    # to the caller so a UI can render it.
    reviewer_reply = interrupt({
        "action": "review",
        "data": state["draft"],
        "options": ["approve", "edit", "reject"],
    })
    # Control returns here once graph.invoke() is called with Command(resume=...).
    return {"approved": reviewer_reply}
How Interrupts Work
- Node calls interrupt() - Execution pauses
- Graph returns with `__interrupt__` key - Contains interrupt data
- Application shows data to human - Display UI, wait for input
- Resume with Command(resume=…) - Continue execution with human response
Basic Interrupt Example
Copy
from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver, Command, interrupt
class ApprovalState(TypedDict):
    # Draft text produced by generate_content, later shown to the reviewer.
    content: str
    # Set by review_node from the human's resume payload.
    approved: bool
    # Reviewer feedback; empty string when the draft was approved.
    feedback: str
def generate_content(state: ApprovalState) -> dict:
    """Produce a (simulated) AI draft for the requested topic."""
    simulated_draft = f"This is AI-generated content about {state['content']}"
    return {"content": simulated_draft}
def review_node(state: ApprovalState) -> dict:
    """Suspend the graph so a human can approve or reject the draft."""
    reply = interrupt({
        "action": "review_content",
        "content": state["content"],
        "instruction": "Please review and approve or provide feedback",
    })
    # `reply` is whatever value was passed via Command(resume=...).
    if reply.get("action") == "approve":
        return {"approved": True, "feedback": ""}
    # Anything else counts as a rejection, carrying optional feedback.
    return {"approved": False, "feedback": reply.get("feedback", "")}
# Build graph: generate -> review -> END, with the review node interrupting.
builder = StateGraph(ApprovalState)
builder.add_node("generate", generate_content)
builder.add_node("review", review_node)
builder.add_edge(START, "generate")
builder.add_edge("generate", "review")
builder.add_edge("review", END)

# Must use checkpointer with interrupts: the paused execution state is
# persisted so a later invoke on the same thread_id can resume it.
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Execute
config = {"configurable": {"thread_id": "approval-1"}}
result = graph.invoke(
    {"content": "Python programming", "approved": False, "feedback": ""},
    config=config
)

# Check if interrupted: the result carries the interrupt payload instead
# of the final state.
if "__interrupt__" in result:
    print("⏸️ Execution paused for review")
    interrupt_data = result["__interrupt__"][0]["value"]
    print(f"Content to review: {interrupt_data['content']}")

    # Simulate human approval
    human_decision = {"action": "approve"}

    # Resume execution: same thread_id, Command(resume=...) feeds the
    # decision back into review_node's interrupt() call.
    final_result = graph.invoke(
        Command(resume=human_decision),
        config=config
    )
    print(f"Approved: {final_result['approved']}")
Interrupts require checkpointers because execution state must be persisted between the interrupt and resume calls.
Interrupt Patterns
Pattern 1: Approve/Reject
Simple binary approval:
Copy
def approval_node(state: State) -> dict:
    """Gate on a simple yes/no decision from a human reviewer."""
    decision = interrupt({
        "action": "approve_action",
        "data": state["action_details"],
        "question": "Approve this action?",
    })
    # A truthy resume value means the human approved the action.
    return {"status": "approved" if decision else "rejected"}

# Resume with boolean
graph.invoke(Command(resume=True), config=config)  # Approve
# or
graph.invoke(Command(resume=False), config=config)  # Reject
Pattern 2: Edit Content
Allow humans to modify AI outputs:
Copy
def edit_node(state: State) -> dict:
    """Let a human rewrite the AI draft before it is finalized."""
    revised_text = interrupt({
        "action": "edit_content",
        "original": state["draft"],
        "instruction": "Edit the content as needed",
    })
    # Whatever the human sends back becomes the final content verbatim.
    return {"final_content": revised_text}

# Resume with edited text
graph.invoke(
    Command(resume="Human-edited content here"),
    config=config
)
Pattern 3: Multiple Options
Present choices to the user:
Copy
def routing_node(state: State) -> Command:
    """Ask the human which branch the graph should follow next."""
    selected = interrupt({
        "action": "choose_path",
        "options": ["path_a", "path_b", "path_c"],
        "question": "Which path should we take?",
    })
    # The chosen node name is both recorded in state and used as the
    # routing target.
    return Command(update={"chosen_path": selected}, goto=selected)

# Resume with choice
graph.invoke(Command(resume="path_a"), config=config)
Pattern 4: Iterative Refinement
Loop until the human is satisfied:
Copy
from upsonic.graphv2 import Command, END

def generate_and_review(state: State) -> Command:
    """Draft content, then loop through this node until approved."""
    draft = generate_content(state)

    # Surface the draft plus the current iteration count to the reviewer.
    verdict = interrupt({
        "action": "review_and_refine",
        "draft": draft,
        "iteration": state.get("iteration", 1)
    })

    if verdict["action"] == "approve":
        return Command(
            update={"final": draft, "approved": True},
            goto=END
        )

    # Otherwise record the feedback, bump the counter, and run another
    # pass through this same node.
    return Command(
        update={
            "feedback": verdict["comments"],
            "iteration": state.get("iteration", 1) + 1,
        },
        goto="generate_and_review"  # Loop back
    )

# First iteration - reject
graph.invoke(initial_state, config=config)
graph.invoke(
    Command(resume={"action": "refine", "comments": "Make it shorter"}),
    config=config
)

# Second iteration - approve
graph.invoke(
    Command(resume={"action": "approve"}),
    config=config
)
Interrupt Configuration
Configure where interrupts can happen:
Interrupt Before
Pause before executing specific nodes:
Copy
graph = builder.compile(
    checkpointer=checkpointer,
    # Static breakpoints: pause before these nodes run at all.
    interrupt_before=["critical_action", "dangerous_operation"]
)

# Execution pauses before these nodes
# Resume with empty Command to continue
graph.invoke(Command(resume=None), config=config)
Interrupt After
Pause after executing specific nodes:
Copy
graph = builder.compile(
    checkpointer=checkpointer,
    # Static breakpoints: pause once these nodes have finished.
    interrupt_after=["data_processing", "validation"]
)

# Execution pauses after these nodes
# Inspect results before continuing
state = graph.get_state(config)
print(f"Results so far: {state.values}")

# Continue
graph.invoke(Command(resume=None), config=config)
Use Cases:
- `interrupt_before` - Approval gates before critical operations
- `interrupt_after` - Quality checks after processing steps
Complete Example: Content Review Workflow
Copy
from typing import Annotated, List
from typing_extensions import TypedDict
import operator
from upsonic.graphv2 import StateGraph, START, END, MemorySaver, Command, interrupt
from upsonic.models import infer_model
from upsonic.messages import ModelRequest, UserPromptPart, SystemPromptPart
class ContentState(TypedDict):
    # Subject the article should cover.
    topic: str
    # Latest AI-generated draft awaiting review.
    draft: str
    # Text that survived review (approved as-is or human-edited).
    final_content: str
    # One of: "", "approved", "edited", "regenerate", "finalized".
    approval_status: str
    # Reducer field: each regeneration request appends its feedback.
    revisions: Annotated[List[str], operator.add]
def generate_content(state: ContentState) -> dict:
    """Generate initial content draft.

    On regeneration passes, the most recent reviewer feedback is appended
    to the prompt so the model can act on it.
    """
    model = infer_model("openai/gpt-4o-mini")
    prompt = f"Write a short article about: {state['topic']}"
    if state.get("revisions"):
        prompt += f"\nPrevious feedback: {state['revisions'][-1]}"
    request = ModelRequest(parts=[
        SystemPromptPart(content="You are a content writer."),
        UserPromptPart(content=prompt)
    ])
    # NOTE(review): assumes model.invoke returns the draft text directly —
    # confirm against the upsonic model API.
    draft = model.invoke([request])
    return {"draft": draft}
def review_node(state: ContentState) -> dict:
    """Pause for human review of the current draft."""
    reply = interrupt({
        "action": "review_content",
        "draft": state["draft"],
        "topic": state["topic"],
        "options": ["approve", "edit", "regenerate"],
        "revisions_so_far": len(state.get("revisions", []))
    })

    chosen = reply.get("action")
    if chosen == "approve":
        # Keep the AI draft verbatim.
        return {
            "final_content": state["draft"],
            "approval_status": "approved"
        }
    if chosen == "edit":
        # The human supplied replacement text of their own.
        return {
            "final_content": reply.get("edited_text"),
            "approval_status": "edited"
        }
    # Any other action (i.e. "regenerate") queues another generation pass,
    # appending the feedback via the revisions reducer.
    return {
        "approval_status": "regenerate",
        "revisions": [reply.get("feedback", "Please improve")]
    }
def finalize_node(state: ContentState) -> dict:
    """Finalize approved content by stamping the terminal status."""
    return {"approval_status": "finalized"}
def should_regenerate(state: ContentState) -> str:
    """Route back to generation when the reviewer asked for changes."""
    status = state.get("approval_status")
    return "generate_content" if status == "regenerate" else "finalize"
# Build workflow: generate -> review -> (regenerate loop | finalize) -> END
builder = StateGraph(ContentState)
builder.add_node("generate_content", generate_content)
builder.add_node("review", review_node)
builder.add_node("finalize", finalize_node)
builder.add_edge(START, "generate_content")
builder.add_edge("generate_content", "review")
# After review, either loop back for regeneration or proceed to finalize.
builder.add_conditional_edges(
    "review",
    should_regenerate,
    ["generate_content", "finalize"]
)
builder.add_edge("finalize", END)

# Interrupts require a checkpointer so paused runs can be resumed later.
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# Scenario 1: Approve as-is — the reviewer accepts the first draft.
print("=== Scenario 1: Approve ===")
config1 = {"configurable": {"thread_id": "content-1"}}
result = graph.invoke({
    "topic": "Benefits of Python",
    "draft": "",
    "final_content": "",
    "approval_status": "",
    "revisions": []
}, config=config1)

if "__interrupt__" in result:
    print("⏸️ Paused for review")
    interrupt_data = result["__interrupt__"][0]["value"]
    print(f"Draft: {interrupt_data['draft'][:100]}...")

    # Approve
    final = graph.invoke(
        Command(resume={"action": "approve"}),
        config=config1
    )
    print(f"✓ Status: {final['approval_status']}")

# Scenario 2: Request regeneration — feedback loops back through
# generate_content, which triggers a second interrupt.
print("\n=== Scenario 2: Regenerate ===")
config2 = {"configurable": {"thread_id": "content-2"}}
result = graph.invoke({
    "topic": "Machine Learning",
    "draft": "",
    "final_content": "",
    "approval_status": "",
    "revisions": []
}, config=config2)

if "__interrupt__" in result:
    print("⏸️ Paused for review")

    # Request regeneration
    result2 = graph.invoke(
        Command(resume={
            "action": "regenerate",
            "feedback": "Make it more technical"
        }),
        config=config2
    )

    # Will hit another interrupt after regeneration
    if "__interrupt__" in result2:
        print("⏸️ New draft ready")
        interrupt_data = result2["__interrupt__"][0]["value"]
        print(f"Revisions made: {interrupt_data['revisions_so_far']}")

        # Approve the new version
        final = graph.invoke(
            Command(resume={"action": "approve"}),
            config=config2
        )
        print(f"✓ Status: {final['approval_status']}")

# Scenario 3: Edit manually — the human supplies replacement text.
print("\n=== Scenario 3: Edit ===")
config3 = {"configurable": {"thread_id": "content-3"}}
result = graph.invoke({
    "topic": "JavaScript Frameworks",
    "draft": "",
    "final_content": "",
    "approval_status": "",
    "revisions": []
}, config=config3)

if "__interrupt__" in result:
    print("⏸️ Paused for review")
    interrupt_data = result["__interrupt__"][0]["value"]

    # Manually edit
    edited = interrupt_data["draft"] + " [Human-added conclusion]"
    final = graph.invoke(
        Command(resume={"action": "edit", "edited_text": edited}),
        config=config3
    )
    print(f"✓ Status: {final['approval_status']}")
    print(f"✓ Final: {final['final_content'][-50:]}...")
Best Practices
1. Provide Clear Context
Give humans all the information they need:
Copy
# ✅ Good - rich context
interrupt({
"action": "review",
"content": state["draft"],
"metadata": {
"author": "AI Assistant",
"topic": state["topic"],
"word_count": len(state["draft"].split()),
"created_at": datetime.now().isoformat()
},
"instructions": "Review for accuracy and tone"
})
# ❌ Bad - minimal context
interrupt(state["draft"])
2. Handle Resume Values Safely
Always validate human input:
Copy
def review_node(state: State) -> dict:
    """Review gate that tolerates malformed resume payloads."""
    reply = interrupt({"action": "review"})

    # ✅ Good - validate response: humans (and UIs) can send unexpected shapes.
    if not isinstance(reply, dict):
        reply = {"action": "approve"}  # Default
    chosen = reply.get("action", "approve")
    if chosen not in ("approve", "edit", "reject"):
        chosen = "approve"  # Safe default
    return {"status": chosen}
3. Track Interrupt History
Store information about reviews:
Copy
class State(TypedDict):
    content: str
    # Append-only audit trail (operator.add is the reducer).
    reviews: Annotated[List[dict], operator.add]

def review_node(state: State) -> dict:
    """Record an audit entry for each human review."""
    reply = interrupt({
        "action": "review",
        "content": state["content"]
    })

    # Record the review: who acted, what they chose, and when.
    audit_entry = {
        "timestamp": datetime.now().isoformat(),
        "action": reply.get("action"),
        "reviewer": reply.get("reviewer_id"),
    }
    return {"reviews": [audit_entry]}
4. Set Timeouts for Critical Workflows
Copy
import time

# Track when interrupt started so we can enforce an SLA on the review.
interrupt_time = time.time()
result = graph.invoke(state, config=config)

if "__interrupt__" in result:
    # Wait for human (with timeout)
    # ... show UI and wait ...

    # After some time, auto-approve if no response
    if time.time() - interrupt_time > 3600:  # 1 hour
        graph.invoke(
            Command(resume={"action": "auto_approved"}),
            config=config
        )
5. Graceful Degradation
Provide defaults when humans aren't available:
Copy
def review_with_fallback(state: State) -> dict:
    """Interrupt for review unless auto-approve mode is enabled."""
    # Skip the human entirely when running unattended.
    if state.get("auto_approve_mode"):
        return {"approved": True}

    # Normal interrupt flow: a missing/omitted flag means "not approved".
    reply = interrupt({"action": "review"})
    return {"approved": reply.get("approved", False)}
UI Integration Examples
Web Application
Copy
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/execute", methods=["POST"])
def execute_workflow():
    """Start a workflow run; report any interrupt back to the frontend."""
    payload = request.json
    thread_id = payload["thread_id"]
    config = {"configurable": {"thread_id": thread_id}}

    result = graph.invoke(payload["state"], config=config)

    if "__interrupt__" in result:
        # Return interrupt data to frontend so it can render a review UI.
        return jsonify({
            "status": "interrupted",
            "interrupt": result["__interrupt__"][0]["value"],
            "thread_id": thread_id
        })
    return jsonify({"status": "completed", "result": result})

@app.route("/resume", methods=["POST"])
def resume_workflow():
    """Resume a paused workflow with the human's input."""
    payload = request.json
    config = {"configurable": {"thread_id": payload["thread_id"]}}
    result = graph.invoke(Command(resume=payload["input"]), config=config)
    return jsonify({"status": "completed", "result": result})
CLI Application
Copy
def run_with_human_review():
    """Drive the graph from a terminal, prompting on every interrupt."""
    config = {"configurable": {"thread_id": "cli-session"}}
    result = graph.invoke(initial_state, config=config)

    # Keep resuming until the graph finishes without pausing again.
    while "__interrupt__" in result:
        interrupt_data = result["__interrupt__"][0]["value"]

        # Show to user
        banner = "=" * 50
        print("\n" + banner)
        print("⏸️ HUMAN REVIEW REQUIRED")
        print(banner)
        print(f"Content: {interrupt_data['content']}")
        print("\nOptions:")
        for i, option in enumerate(interrupt_data.get("options", []), 1):
            print(f" {i}. {option}")

        # Get input
        choice = input("\nYour choice: ").strip()

        # Resume
        result = graph.invoke(
            Command(resume={"action": choice}),
            config=config
        )

    print(f"\n✓ Completed: {result}")

