Chapter 15
18 min read
Section 95 of 175

Human-in-the-Loop Patterns

LangGraph Deep Dive

Introduction

Human-in-the-loop (HITL) patterns allow workflows to pause for human input, approval, or correction. This is essential for high-stakes operations, quality control, and maintaining human oversight over AI systems.

Section Overview: We'll explore interrupt mechanisms, approval workflows, interactive conversation patterns, and how to edit state during execution.

Interrupt Patterns

Basic Interrupts

🐍python
1from typing import TypedDict, Annotated, Literal
2from langgraph.graph import StateGraph, START, END
3from langgraph.checkpoint.memory import MemorySaver
4import operator
5
6
class InterruptState(TypedDict):
    """State with interrupt capability."""
    # Description of the work the human is reviewing.
    task: str
    # Latest generated draft text.
    draft: str
    # Set True by a human (via update_state) to end the revise loop.
    approved: bool
    # Human feedback consumed by the next generate_draft call.
    feedback: str
    # Number of drafts produced so far.
    iteration: int
14
15
def generate_draft(state: InterruptState) -> dict:
    """Generate a draft for review.

    The first call (iteration 0) produces an initial draft from the task;
    later calls produce a revision that references the human feedback.

    Returns a partial state update with the new draft text and the
    incremented iteration count.
    """
    iteration = state.get("iteration", 0)
    feedback = state.get("feedback", "")

    if iteration == 0:
        # Bug fix: f-string placeholders use braces, not literal brackets —
        # the original emitted the text "[state['task']]" verbatim.
        draft = f"Initial draft for: {state['task']}"
    else:
        draft = f"Revised draft (iteration {iteration}) incorporating: {feedback}"

    return {
        "draft": draft,
        "iteration": iteration + 1
    }
30
31
def check_approval(state: InterruptState) -> Literal["approved", "revise", "interrupt"]:
    """Decide the next step for a draft under review.

    Returns "approved" once a human has signed off, "revise" when feedback
    exists without approval, and "interrupt" when neither is present so the
    graph pauses for human input.
    """
    if state.get("approved"):
        return "approved"
    if state.get("feedback"):
        return "revise"
    # No human decision yet: pause here.
    return "interrupt"
40
41
def build_interruptible_graph():
    """Compile the draft/review graph with checkpointing and an interrupt.

    Execution pauses before every "generate" step (interrupt_before); the
    MemorySaver checkpointer preserves state across that pause so a human
    can inject feedback or approval and resume.
    """
    builder = StateGraph(InterruptState)

    builder.add_node("generate", generate_draft)
    builder.add_node("finalize", lambda state: {"status": "complete"})

    builder.add_edge(START, "generate")
    builder.add_conditional_edges(
        "generate",
        check_approval,
        {
            "approved": "finalize",
            "revise": "generate",
            "interrupt": END,  # Pause here
        },
    )
    builder.add_edge("finalize", END)

    # Checkpointing is required for interrupts: state must survive the pause.
    return builder.compile(checkpointer=MemorySaver(), interrupt_before=["generate"])
67
68
# Usage with interrupts
def run_with_interrupt():
    """Drive the interruptible graph through one full human-review cycle."""
    app = build_interruptible_graph()
    config = {"configurable": {"thread_id": "1"}}

    # Kick off the run; the graph pauses at the configured interrupt point.
    for event in app.stream({"task": "Write a blog post", "approved": False}, config):
        print("Event:", event)

    # The thread is now paused; inspect the checkpointed state.
    snapshot = app.get_state(config)
    print("Current state:", snapshot.values)

    # A human supplies feedback, then streaming with input=None resumes.
    app.update_state(config, {
        "feedback": "Make it more engaging",
        "approved": False
    })
    for event in app.stream(None, config):
        print("Resumed event:", event)

    # The human approves; resume once more to run to completion.
    app.update_state(config, {"approved": True})
    for event in app.stream(None, config):
        print("Final event:", event)

Interrupt Before vs After

🐍python
def build_graph_with_interrupts():
    """Contrast interrupt_before (approval gate) with interrupt_after (review).

    Returns a pair of compiled apps over the same three-step pipeline that
    differ only in where they pause relative to "step2".
    """
    builder = StateGraph(InterruptState)

    # Bind the step number via a default argument to avoid late binding.
    for name, number in (("step1", 1), ("step2", 2), ("step3", 3)):
        builder.add_node(name, lambda s, n=number: {"step": n})

    builder.add_edge(START, "step1")
    builder.add_edge("step1", "step2")
    builder.add_edge("step2", "step3")
    builder.add_edge("step3", END)

    memory = MemorySaver()

    # interrupt_before: pause BEFORE the node runs — approve an action first.
    app_before = builder.compile(checkpointer=memory, interrupt_before=["step2"])

    # interrupt_after: pause AFTER the node runs — review its result first.
    app_after = builder.compile(checkpointer=memory, interrupt_after=["step2"])

    return app_before, app_after

Approval Workflows

🐍python
1from enum import Enum
2from typing import Optional
3from datetime import datetime
4
5
class ApprovalStatus(str, Enum):
    """Lifecycle states for a document in the approval workflow.

    Subclassing ``str`` lets members compare equal to their plain string
    values, so they can be stored directly in the TypedDict state.
    """
    PENDING = "pending"                # submitted, awaiting a human decision
    APPROVED = "approved"              # human signed off; routed to publish
    REJECTED = "rejected"              # human declined; routed to archive
    NEEDS_REVISION = "needs_revision"  # human requested changes; routed to revise
11
12
class ApprovalState(TypedDict):
    """State for approval workflow."""
    # Document text; the "revise" node rewrites it from revision_notes.
    document: str
    # One of the ApprovalStatus values (stored via the str mixin).
    approval_status: str
    # Identity of the human approver supplied via update_state.
    approver: Optional[str]
    # ISO-8601 timestamp of the approval decision.
    approval_time: Optional[str]
    # Presumably the reason given on rejection — never written in this
    # chunk; confirm against callers.
    rejection_reason: Optional[str]
    # Human guidance consumed by the "revise" node.
    revision_notes: Optional[str]
    # Append-only audit log; operator.add merges each node's entries.
    history: Annotated[list[dict], operator.add]
22
23
def submit_for_approval(state: ApprovalState) -> dict:
    """Mark the document as pending and record the submission in history."""
    audit_entry = {
        "action": "submitted",
        "time": datetime.now().isoformat()
    }
    return {"approval_status": ApprovalStatus.PENDING, "history": [audit_entry]}
33
34
def route_by_approval(state: ApprovalState) -> str:
    """Map the current approval status onto the next node name."""
    status = state["approval_status"]

    if status == ApprovalStatus.APPROVED:
        return "publish"
    if status == ApprovalStatus.REJECTED:
        return "archive"
    if status == ApprovalStatus.NEEDS_REVISION:
        return "revise"
    # Anything else (including the empty initial status) waits for a human.
    return "wait_for_approval"
47
48
def build_approval_workflow():
    """Build an approval workflow graph.

    The "submit" node marks the document pending; routing then publishes,
    archives, revises (and re-submits), or ends at a wait point where a
    human supplies the decision via update_state and the caller resumes.
    """
    graph = StateGraph(ApprovalState)

    graph.add_node("submit", submit_for_approval)
    # Bug fix: interpolate the human's notes with {} f-string placeholders —
    # the original's [s['revision_notes']] emitted the brackets literally.
    graph.add_node("revise", lambda s: {"document": f"Revised: {s['revision_notes']}"})
    graph.add_node("publish", lambda s: {"history": [{"action": "published"}]})
    graph.add_node("archive", lambda s: {"history": [{"action": "archived"}]})

    graph.add_edge(START, "submit")

    graph.add_conditional_edges(
        "submit",
        route_by_approval,
        {
            "publish": "publish",
            "archive": "archive",
            "revise": "revise",
            "wait_for_approval": END  # Interrupt for human approval
        }
    )

    graph.add_edge("revise", "submit")  # Re-submit after revision
    graph.add_edge("publish", END)
    graph.add_edge("archive", END)

    # Checkpointing preserves state across the human-decision pause.
    memory = MemorySaver()
    return graph.compile(checkpointer=memory)
78
79
# Approval interaction
def approval_interaction():
    """Walk the approval workflow through a revision and a final approval."""
    app = build_approval_workflow()
    config = {"configurable": {"thread_id": "approval-1"}}

    # Start the workflow with a fresh document.
    initial = {
        "document": "Important document content",
        "approval_status": "",
        "history": []
    }
    result = app.invoke(initial, config)
    print("Submitted, waiting for approval...")

    # A human asks for changes; update_state injects the decision.
    decision = {
        "approval_status": ApprovalStatus.NEEDS_REVISION,
        "revision_notes": "Please add more details",
        "approver": "manager@example.com"
    }
    app.update_state(config, decision)

    # Resume: the revise node runs and the document is re-submitted.
    result = app.invoke(None, config)

    # The human approves the revised document.
    app.update_state(config, {
        "approval_status": ApprovalStatus.APPROVED,
        "approval_time": datetime.now().isoformat()
    })

    # Final resume runs the publish branch.
    result = app.invoke(None, config)
    print("Final result:", result)

Interactive Agents

Build agents that can ask clarifying questions and interact with users:

🐍python
1from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
2from langchain_openai import ChatOpenAI
3
4
class ConversationState(TypedDict):
    """State for interactive conversation."""
    # Full chat history; operator.add appends each node's new messages.
    messages: Annotated[list, operator.add]
    # True when the model asked a clarifying question and awaits the user.
    needs_clarification: bool
    # Question text extracted from the model's "CLARIFY:" reply.
    clarification_question: Optional[str]
    # True once the model has produced a final answer.
    task_complete: bool
11
12
# Shared chat-model client; temperature=0 for (near-)deterministic replies.
llm = ChatOpenAI(model="gpt-4o", temperature=0)
14
15
def process_message(state: ConversationState) -> dict:
    """Ask the LLM to either answer or request ONE clarification.

    The system prompt forces a "CLARIFY: ..." or "RESPONSE: ..." prefix;
    parsing that prefix decides whether the graph pauses for user input
    (needs_clarification) or finishes (task_complete).
    """
    system = SystemMessage(content="""You are a helpful assistant.
If the user's request is unclear, ask ONE clarifying question.
Format: "CLARIFY: [your question]"
Otherwise, provide a helpful response.
Format: "RESPONSE: [your answer]" """)

    content = llm.invoke([system] + state["messages"]).content

    if content.startswith("CLARIFY:"):
        # Strip the 8-character "CLARIFY:" prefix and surrounding whitespace.
        question = content[8:].strip()
        return {
            "needs_clarification": True,
            "clarification_question": question,
            "messages": [AIMessage(content=question)]
        }

    # Strip the 9-character "RESPONSE:" prefix when present; otherwise pass through.
    answer = content[9:].strip() if content.startswith("RESPONSE:") else content
    return {
        "needs_clarification": False,
        "task_complete": True,
        "messages": [AIMessage(content=answer)]
    }
42
43
def route_conversation(state: ConversationState) -> str:
    """Pick the next step from the conversation flags."""
    if state.get("task_complete"):
        return "complete"
    # A pending question means the graph should pause for the user.
    return "wait_for_user" if state.get("needs_clarification") else "process"
52
53
def build_interactive_agent():
    """Compile the single-node clarification agent with checkpointing."""
    builder = StateGraph(ConversationState)

    builder.add_node("process", process_message)
    builder.add_edge(START, "process")
    builder.add_conditional_edges(
        "process",
        route_conversation,
        {
            "complete": END,
            "wait_for_user": END,  # Interrupt for user input
            "process": "process",
        },
    )

    # The checkpointer lets the thread resume after the user replies.
    return builder.compile(checkpointer=MemorySaver())
75
76
def interactive_session():
    """Run an interactive session with one clarification round."""
    app = build_interactive_agent()
    config = {"configurable": {"thread_id": "chat-1"}}

    # User starts conversation
    result = app.invoke({
        "messages": [HumanMessage(content="Help me with a project")],
        "needs_clarification": False,
        "task_complete": False
    }, config)

    if result.get("needs_clarification"):
        # Bug fix: f-string placeholders use braces, not literal brackets —
        # the original printed "[result['clarification_question']]" verbatim.
        print(f"Agent asks: {result['clarification_question']}")

        # User provides clarification
        app.update_state(config, {
            "messages": [HumanMessage(content="A Python web application")],
            "needs_clarification": False
        })

        # Resume the paused thread with the new input.
        result = app.invoke(None, config)

    print("Final response:", result["messages"][-1].content)

State Editing

LangGraph allows direct manipulation of state during execution:

🐍python
def demonstrate_state_editing():
    """Show various state editing operations.

    Demonstrates reading a state snapshot, patching fields as a specific
    node, walking checkpoint history, and reverting to an old checkpoint.
    """
    app = build_approval_workflow()
    config = {"configurable": {"thread_id": "edit-demo"}}

    # Start the workflow so there is state to inspect.
    app.invoke({"document": "Test", "approval_status": "", "history": []}, config)

    # A snapshot exposes both the values and the next node(s) to run.
    state_snapshot = app.get_state(config)
    print("Current values:", state_snapshot.values)
    print("Next nodes:", state_snapshot.next)

    # Update specific fields; as_node attributes the update to a graph node.
    app.update_state(
        config,
        {"approval_status": ApprovalStatus.APPROVED},
        as_node="submit"  # Specify which node made the update
    )

    # Every checkpoint is retained and can be listed.
    history = list(app.get_state_history(config))
    for h in history:
        # Bug fix: interpolate with {} placeholders — the original printed
        # the literal text "[h.created_at]: [h.values]".
        print(f"State at {h.created_at}: {h.values}")

    # Revert to previous state
    if len(history) > 1:
        previous_config = history[1].config
        app.update_state(previous_config, {})  # Revert to this checkpoint
31
32
# Advanced: Modifying messages in state
def edit_messages_example():
    """Edit message history in state.

    Corrects a message already stored in a thread's checkpointed state and
    resumes with the amended history. (A local MessageState TypedDict that
    was never used has been removed.)
    """
    app = build_interactive_agent()
    config = {"configurable": {"thread_id": "msg-edit"}}

    # Run an initial turn to populate the thread's message history.
    app.invoke({
        "messages": [HumanMessage(content="Hello")],
        "needs_clarification": False,
        "task_complete": False
    }, config)

    # Read the messages currently stored for this thread.
    state = app.get_state(config)
    current_messages = state.values["messages"]

    # Edit a message (e.g., correct user input); copy first so the
    # checkpointed list is not mutated in place.
    if current_messages:
        corrected = current_messages.copy()
        corrected[0] = HumanMessage(content="Hi, I need help with Python")
        app.update_state(config, {"messages": corrected})

    # Resume with corrected history.
    app.invoke(None, config)

Key Takeaways

  • Interrupts enable human oversight - Use interrupt_before for approval gates and interrupt_after for review points.
  • Checkpointing is essential - MemorySaver or database-backed checkpointers preserve state across interrupts.
  • State updates resume execution - Use update_state to provide human input and continue the workflow.
  • Approval workflows combine routing with interrupts for structured review processes.
  • Interactive agents can ask clarifying questions by routing to interrupt points.
Next Section Preview: We'll explore persistence and checkpointing in depth for building durable, recoverable workflows.