Skip to main content

Overview

Human-in-the-Loop (HITL) patterns let you pause execution for human review, approval, or input. This is essential for:
  • Content Moderation - Review AI-generated content before publishing
  • 🎯 Decision Approval - Require human sign-off on critical actions
  • ✏️ Content Editing - Let humans refine AI outputs
  • 🔍 Quality Control - Inspect intermediate results
  • 🤝 Collaborative Workflows - Mix human and AI decision-making

The Interrupt Primitive

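The interrupt() function pauses execution and returns control to the caller. When the graph is later resumed with Command(resume=...), the interrupt() call returns the resume value inside the node: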
from upsonic.graphv2 import interrupt

def review_node(state: MyState) -> dict:
    """Pause the graph so a human can review the current draft.

    The value supplied on resume is stored under ``approved``.
    """
    review_request = {
        "action": "review",
        "data": state["draft"],
        "options": ["approve", "edit", "reject"],
    }

    # interrupt() hands the request to the caller; the value passed to
    # Command(resume=...) on the next graph.invoke() comes back here.
    decision = interrupt(review_request)
    return {"approved": decision}

How Interrupts Work

  1. Node calls interrupt() - Execution pauses
  2. graph.invoke() returns a result with an __interrupt__ key - A list whose entries carry the interrupt data under "value"
  3. Application shows data to human - Display UI, wait for input
  4. Resume with Command(resume=…) - Continue execution with human response

Basic Interrupt Example

from typing_extensions import TypedDict
from upsonic.graphv2 import StateGraph, START, END, MemorySaver, Command, interrupt

class ApprovalState(TypedDict):
    # State shared by the nodes of the approval graph.
    content: str  # topic on input; replaced by the generated draft
    approved: bool  # set by the review node
    feedback: str  # reviewer feedback; empty string when approved

def generate_content(state: ApprovalState) -> dict:
    """Return a simulated AI draft built from the topic stored in ``content``."""
    prefix = "This is AI-generated content about "
    topic = state["content"]
    # The draft overwrites "content" in the graph state.
    return {"content": prefix + topic}

def review_node(state: ApprovalState) -> dict:
    """Ask a human to review the generated content and record the verdict."""
    review_request = {
        "action": "review_content",
        "content": state["content"],
        "instruction": "Please review and approve or provide feedback",
    }

    # Pauses here; on resume this is the value given to Command(resume=...).
    verdict = interrupt(review_request)

    if verdict.get("action") != "approve":
        # Anything other than an explicit approval counts as a rejection.
        return {"approved": False, "feedback": verdict.get("feedback", "")}
    return {"approved": True, "feedback": ""}

# Build graph: START -> generate -> review -> END
builder = StateGraph(ApprovalState)
builder.add_node("generate", generate_content)
builder.add_node("review", review_node)

builder.add_edge(START, "generate")
builder.add_edge("generate", "review")
builder.add_edge("review", END)

# Must use checkpointer with interrupts
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Execute
# The same config (and therefore thread_id) is reused for the resume call below.
config = {"configurable": {"thread_id": "approval-1"}}
result = graph.invoke(
    {"content": "Python programming", "approved": False, "feedback": ""},
    config=config
)

# Check if interrupted
if "__interrupt__" in result:
    print("⏸️  Execution paused for review")
    # The first entry's "value" holds the dict that review_node passed to interrupt().
    interrupt_data = result["__interrupt__"][0]["value"]
    print(f"Content to review: {interrupt_data['content']}")
    
    # Simulate human approval
    human_decision = {"action": "approve"}
    
    # Resume execution
    final_result = graph.invoke(
        Command(resume=human_decision),
        config=config
    )
    
    print(f"Approved: {final_result['approved']}")
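Interrupts require a checkpointer because the execution state must be persisted between the interrupt and the resume call. The resume call must also use the same config (thread_id) as the call that was interrupted.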

Interrupt Patterns

Pattern 1: Approve/Reject

Simple binary approval:
def approval_node(state: State) -> dict:
    """Ask a human to approve or reject the pending action.

    Returns ``{"status": "approved"}`` only when the resume value is the
    boolean ``True``; every other resume value yields
    ``{"status": "rejected"}``.
    """
    approved = interrupt({
        "action": "approve_action",
        "data": state["action_details"],
        "question": "Approve this action?"
    })

    # Compare against True explicitly. With a plain truthiness test a
    # malformed resume value such as the string "reject" or a non-empty
    # dict would silently count as an approval.
    if approved is True:
        return {"status": "approved"}
    return {"status": "rejected"}

# Resume with boolean
# The resume value is what interrupt() returns inside approval_node.
graph.invoke(Command(resume=True), config=config)  # Approve
# or
graph.invoke(Command(resume=False), config=config)  # Reject

Pattern 2: Edit Content

Allow humans to modify AI outputs:
def edit_node(state: State) -> dict:
    """Let a human rewrite the draft; the resume value becomes the final content."""
    edit_request = {
        "action": "edit_content",
        "original": state["draft"],
        "instruction": "Edit the content as needed",
    }
    revised = interrupt(edit_request)
    return {"final_content": revised}

# Resume with edited text
# The string passed as resume is stored by edit_node as "final_content".
graph.invoke(
    Command(resume="Human-edited content here"),
    config=config
)

Pattern 3: Multiple Options

Present choices to the user:
def routing_node(state: State) -> Command:
    """Ask a human which path to take, then route to the chosen node."""
    selected = interrupt({
        "action": "choose_path",
        "options": ["path_a", "path_b", "path_c"],
        "question": "Which path should we take?",
    })

    # NOTE(review): the resume value is used as the goto target without
    # being checked against the offered options.
    state_update = {"chosen_path": selected}
    return Command(update=state_update, goto=selected)

# Resume with choice
# routing_node uses this value both as "chosen_path" and as the goto target.
graph.invoke(Command(resume="path_a"), config=config)

Pattern 4: Iterative Refinement

Loop until human is satisfied:
from upsonic.graphv2 import Command, END

def generate_and_review(state: State) -> Command:
    """Generate a draft, pause for human feedback, then finish or loop.

    An "approve" reply stores the draft as ``final`` and routes to END.
    Any other reply records the reviewer's comments, bumps ``iteration``
    and routes back to this node.
    """
    # Generate content
    draft = generate_content(state)
    iteration = state.get("iteration", 1)

    # Ask for feedback
    feedback = interrupt({
        "action": "review_and_refine",
        "draft": draft,
        "iteration": iteration,
    })

    if feedback["action"] != "approve":
        # NOTE(review): a non-approve reply must carry "comments";
        # feedback["comments"] raises KeyError otherwise.
        refinement = {
            "feedback": feedback["comments"],
            "iteration": iteration + 1,
        }
        return Command(update=refinement, goto="generate_and_review")  # Loop back

    return Command(update={"final": draft, "approved": True}, goto=END)

# First iteration - reject
# Any action other than "approve" sends generate_and_review back to itself.
graph.invoke(initial_state, config=config)
graph.invoke(
    Command(resume={"action": "refine", "comments": "Make it shorter"}),
    config=config
)

# Second iteration - approve
# "approve" routes to END and ends the loop.
graph.invoke(
    Command(resume={"action": "approve"}),
    config=config
)

Interrupt Configuration

Configure where interrupts can happen:

Interrupt Before

Pause before executing specific nodes:
# Compile with static breakpoints placed in front of the listed nodes.
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["critical_action", "dangerous_operation"]
)

# Execution pauses before these nodes
# Resume with empty Command to continue
# (no human value is needed here, so resume is None)
graph.invoke(Command(resume=None), config=config)

Interrupt After

Pause after executing specific nodes:
# Compile with static breakpoints placed behind the listed nodes.
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_after=["data_processing", "validation"]
)

# Execution pauses after these nodes
# Inspect results before continuing
# get_state(config) reads the saved state for this config's thread.
state = graph.get_state(config)
print(f"Results so far: {state.values}")

# Continue
graph.invoke(Command(resume=None), config=config)
Use Cases:
  • interrupt_before - Approval gates before critical operations
  • interrupt_after - Quality checks after processing steps

Complete Example: Content Review Workflow

from typing import Annotated, List
from typing_extensions import TypedDict
import operator

from upsonic.graphv2 import StateGraph, START, END, MemorySaver, Command, interrupt
from upsonic.models import infer_model
from upsonic.messages import ModelRequest, UserPromptPart, SystemPromptPart

class ContentState(TypedDict):
    # State carried through the content review workflow.
    topic: str  # subject the article is written about
    draft: str  # latest generated draft
    final_content: str  # approved or human-edited text
    approval_status: str  # written by the review and finalize nodes
    # Reviewer feedback, one entry per regeneration request. operator.add is
    # the annotated reducer, so node updates are presumably appended rather
    # than overwritten - confirm against the graph implementation.
    revisions: Annotated[List[str], operator.add]

def generate_content(state: ContentState) -> dict:
    """Draft a short article on the state's topic.

    When earlier feedback exists, the most recent entry is appended to the
    prompt so the new draft can take it into account.
    """
    model = infer_model("openai/gpt-4o-mini")

    user_prompt = f"Write a short article about: {state['topic']}"
    if state.get("revisions"):
        latest_feedback = state["revisions"][-1]
        user_prompt = user_prompt + f"\nPrevious feedback: {latest_feedback}"

    system_part = SystemPromptPart(content="You are a content writer.")
    user_part = UserPromptPart(content=user_prompt)
    request = ModelRequest(parts=[system_part, user_part])

    # The model output is stored as-is under "draft".
    return {"draft": model.invoke([request])}

def review_node(state: ContentState) -> dict:
    """Hand the current draft to a human and turn the reply into state updates.

    "approve" keeps the draft, "edit" takes the reviewer's text, and any
    other action requests a regeneration with the reviewer's feedback.
    """
    reply = interrupt({
        "action": "review_content",
        "draft": state["draft"],
        "topic": state["topic"],
        "options": ["approve", "edit", "regenerate"],
        "revisions_so_far": len(state.get("revisions", [])),
    })

    action = reply.get("action")

    if action == "approve":
        return {"final_content": state["draft"], "approval_status": "approved"}

    if action == "edit":
        return {
            "final_content": reply.get("edited_text"),
            "approval_status": "edited",
        }

    # regenerate (also the fallback for any unrecognised action)
    return {
        "approval_status": "regenerate",
        "revisions": [reply.get("feedback", "Please improve")],
    }

def finalize_node(state: ContentState) -> dict:
    """Mark the workflow as finished by setting the status to "finalized"."""
    final_update = {"approval_status": "finalized"}
    return final_update

def should_regenerate(state: ContentState) -> str:
    """Pick the next node: regenerate the draft or move on to finalize."""
    wants_new_draft = state.get("approval_status") == "regenerate"
    return "generate_content" if wants_new_draft else "finalize"

# Build workflow
# START -> generate_content -> review -> (generate_content | finalize) -> END
builder = StateGraph(ContentState)
builder.add_node("generate_content", generate_content)
builder.add_node("review", review_node)
builder.add_node("finalize", finalize_node)

builder.add_edge(START, "generate_content")
builder.add_edge("generate_content", "review")
# should_regenerate returns the name of the next node after a review.
builder.add_conditional_edges(
    "review",
    should_regenerate,
    ["generate_content", "finalize"]
)
builder.add_edge("finalize", END)

# review_node calls interrupt(), so a checkpointer is needed.
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Scenario 1: Approve as-is
print("=== Scenario 1: Approve ===")
# Each scenario uses its own thread_id.
config1 = {"configurable": {"thread_id": "content-1"}}

result = graph.invoke({
    "topic": "Benefits of Python",
    "draft": "",
    "final_content": "",
    "approval_status": "",
    "revisions": []
}, config=config1)

if "__interrupt__" in result:
    print("⏸️  Paused for review")
    interrupt_data = result["__interrupt__"][0]["value"]
    # Show only the first 100 characters of the draft.
    print(f"Draft: {interrupt_data['draft'][:100]}...")
    
    # Approve
    final = graph.invoke(
        Command(resume={"action": "approve"}),
        config=config1
    )
    print(f"✓ Status: {final['approval_status']}")

# Scenario 2: Request regeneration
print("\n=== Scenario 2: Regenerate ===")
config2 = {"configurable": {"thread_id": "content-2"}}

result = graph.invoke({
    "topic": "Machine Learning",
    "draft": "",
    "final_content": "",
    "approval_status": "",
    "revisions": []
}, config=config2)

if "__interrupt__" in result:
    print("⏸️  Paused for review")
    
    # Request regeneration
    # The feedback string is what review_node adds to "revisions".
    result2 = graph.invoke(
        Command(resume={
            "action": "regenerate",
            "feedback": "Make it more technical"
        }),
        config=config2
    )
    
    # Will hit another interrupt after regeneration
    if "__interrupt__" in result2:
        print("⏸️  New draft ready")
        interrupt_data = result2["__interrupt__"][0]["value"]
        print(f"Revisions made: {interrupt_data['revisions_so_far']}")
        
        # Approve the new version
        final = graph.invoke(
            Command(resume={"action": "approve"}),
            config=config2
        )
        print(f"✓ Status: {final['approval_status']}")

# Scenario 3: Edit manually
print("\n=== Scenario 3: Edit ===")
config3 = {"configurable": {"thread_id": "content-3"}}

result = graph.invoke({
    "topic": "JavaScript Frameworks",
    "draft": "",
    "final_content": "",
    "approval_status": "",
    "revisions": []
}, config=config3)

if "__interrupt__" in result:
    print("⏸️  Paused for review")
    interrupt_data = result["__interrupt__"][0]["value"]
    
    # Manually edit
    edited = interrupt_data["draft"] + " [Human-added conclusion]"
    
    # review_node reads the edited text from the "edited_text" key.
    final = graph.invoke(
        Command(resume={"action": "edit", "edited_text": edited}),
        config=config3
    )
    print(f"✓ Status: {final['approval_status']}")
    # Show only the last 50 characters of the final content.
    print(f"✓ Final: {final['final_content'][-50:]}...")

Best Practices

1. Provide Clear Context

Give humans all the information they need:
# ✅ Good - rich context
# NOTE: this fragment uses `datetime` and `state` without defining them;
# it assumes `from datetime import datetime` and an enclosing node function.
interrupt({
    "action": "review",
    "content": state["draft"],
    "metadata": {
        "author": "AI Assistant",
        "topic": state["topic"],
        "word_count": len(state["draft"].split()),
        "created_at": datetime.now().isoformat()
    },
    "instructions": "Review for accuracy and tone"
})

# ❌ Bad - minimal context
# The reviewer gets only the raw draft: no action, metadata or instructions.
interrupt(state["draft"])

2. Handle Resume Values Safely

Always validate human input:
def review_node(state: State) -> dict:
    """Run a review interrupt and coerce the reply to a known action.

    Non-dict replies and unrecognised actions both fall back to "approve".
    """
    known_actions = ("approve", "edit", "reject")
    reply = interrupt({"action": "review"})

    # ✅ Good - validate response
    # NOTE(review): malformed input falls back to "approve"; confirm that
    # auto-approving is the intended default for this workflow.
    if isinstance(reply, dict):
        action = reply.get("action", "approve")
    else:
        action = "approve"  # Default

    if action in known_actions:
        return {"status": action}
    return {"status": "approve"}  # Safe default

3. Track Interrupt History

Store information about reviews:
class State(TypedDict):
    # State for a workflow that keeps an audit trail of human reviews.
    content: str  # text shown to the reviewer
    # One dict per review. operator.add is the annotated reducer, so entries
    # are presumably appended across node updates - confirm.
    reviews: Annotated[List[dict], operator.add]

def review_node(state: State) -> dict:
    """Pause for a review and return one history entry describing it."""
    reply = interrupt({
        "action": "review",
        "content": state["content"],
    })

    # Record the review
    review_entry = {
        "timestamp": datetime.now().isoformat(),
        "action": reply.get("action"),
        "reviewer": reply.get("reviewer_id"),
    }
    return {"reviews": [review_entry]}

4. Set Timeouts for Critical Workflows

import time

result = graph.invoke(state, config=config)

if "__interrupt__" in result:
    # Track when interrupt started.
    # The clock starts only once the graph has actually paused, so the time
    # the graph spent running does not count against the human's timeout.
    # time.monotonic() is used because it cannot jump when the system clock
    # is adjusted, which makes it the right clock for elapsed-time checks.
    interrupt_time = time.monotonic()

    # Wait for human (with timeout)
    # ... show UI and wait ...

    # After some time, auto-approve if no response
    if time.monotonic() - interrupt_time > 3600:  # 1 hour
        graph.invoke(
            Command(resume={"action": "auto_approved"}),
            config=config
        )

5. Graceful Degradation

Provide defaults when humans aren’t available:
def review_with_fallback(state: State) -> dict:
    """Review with a human unless the state enables auto-approve mode."""
    needs_human = not state.get("auto_approve_mode")

    if needs_human:
        # Normal interrupt flow
        reply = interrupt({"action": "review"})
        return {"approved": reply.get("approved", False)}

    # Skip interrupt in auto mode
    return {"approved": True}

UI Integration Examples

Web Application

from flask import Flask, request, jsonify

# Flask application whose routes start and resume the workflow.
app = Flask(__name__)

@app.route("/execute", methods=["POST"])
def execute_workflow():
    """Start a workflow run for the posted thread and report whether it paused.

    Expects a JSON body with "thread_id" and "state".
    """
    payload = request.json
    thread_id = payload["thread_id"]

    run_config = {"configurable": {"thread_id": thread_id}}
    outcome = graph.invoke(payload["state"], config=run_config)

    if "__interrupt__" not in outcome:
        return jsonify({"status": "completed", "result": outcome})

    # Return interrupt data to frontend
    pending = outcome["__interrupt__"][0]["value"]
    return jsonify({
        "status": "interrupted",
        "interrupt": pending,
        "thread_id": thread_id,
    })

@app.route("/resume", methods=["POST"])
def resume_workflow():
    """Resume a paused workflow with the human's input.

    Expects a JSON body with "thread_id" and "input". The response has
    status "interrupted" (with the new interrupt payload) when the resumed
    graph pauses again, and status "completed" with the final result
    otherwise.
    """
    data = request.json
    thread_id = data["thread_id"]
    human_input = data["input"]

    config = {"configurable": {"thread_id": thread_id}}
    result = graph.invoke(Command(resume=human_input), config=config)

    # A resumed graph can pause again (for example a review loop that
    # regenerates and asks for another review). Report that the same way
    # /execute does rather than claiming the run completed.
    if "__interrupt__" in result:
        return jsonify({
            "status": "interrupted",
            "interrupt": result["__interrupt__"][0]["value"],
            "thread_id": thread_id
        })

    return jsonify({"status": "completed", "result": result})

CLI Application

def run_with_human_review():
    """Run the workflow from the command line, prompting at every interrupt.

    Keeps resuming until the result no longer contains ``__interrupt__``.
    The reviewer may answer with either a menu number or the option text.
    """
    config = {"configurable": {"thread_id": "cli-session"}}

    result = graph.invoke(initial_state, config=config)

    while "__interrupt__" in result:
        interrupt_data = result["__interrupt__"][0]["value"]
        options = interrupt_data.get("options", [])

        # Show to user
        print("\n" + "="*50)
        print("⏸️  HUMAN REVIEW REQUIRED")
        print("="*50)
        print(f"Content: {interrupt_data['content']}")
        print("\nOptions:")
        for i, option in enumerate(options, 1):
            print(f"  {i}. {option}")

        # Get input
        choice = input("\nYour choice: ").strip()

        # The menu is numbered, so translate a valid menu number into the
        # option it refers to. Any other input is forwarded unchanged, so
        # typing the option text still works.
        if choice.isdecimal() and 1 <= int(choice) <= len(options):
            choice = options[int(choice) - 1]

        # Resume
        result = graph.invoke(
            Command(resume={"action": choice}),
            config=config
        )

    print(f"\n✓ Completed: {result}")

Next Steps