Introduction
Human-in-the-loop (HITL) patterns allow workflows to pause for human input, approval, or correction. This is essential for high-stakes operations, quality control, and maintaining human oversight over AI systems.
Section Overview: We'll explore interrupt mechanisms, approval workflows, interactive conversation patterns, and how to edit state during execution.
Interrupt Patterns
Basic Interrupts
🐍python
1from typing import TypedDict, Annotated, Literal
2from langgraph.graph import StateGraph, START, END
3from langgraph.checkpoint.memory import MemorySaver
4import operator
5
6
class InterruptState(TypedDict, total=False):
    """Shared state for the draft/review loop.

    Declared with ``total=False`` because the nodes read their inputs with
    ``state.get(...)`` and the example run starts from a partial state
    (only ``task`` and ``approved``).
    """

    task: str  # what the draft should be about
    draft: str  # latest text returned by generate_draft
    approved: bool  # reviewer decision, set through update_state
    feedback: str  # reviewer notes consumed by the next revision
    iteration: int  # number of drafts generated so far
    status: str  # written by the "finalize" node when the run completes
14
15
def generate_draft(state: InterruptState) -> dict:
    """Generate a draft for review.

    Args:
        state: Current graph state. ``task`` is required on the first pass;
            ``iteration`` and ``feedback`` are optional and default to
            ``0`` and ``""``.

    Returns:
        A partial state update with the new ``draft`` text and the
        incremented ``iteration`` counter.
    """
    iteration = state.get("iteration", 0)
    feedback = state.get("feedback", "")

    if iteration == 0:
        # First pass: nothing to revise yet, so start from the task itself.
        draft = f"Initial draft for: {state['task']}"
    else:
        draft = f"Revised draft (iteration {iteration}) incorporating: {feedback}"

    return {
        "draft": draft,
        "iteration": iteration + 1,
    }
30
31
def check_approval(state: InterruptState) -> Literal["approved", "revise", "interrupt"]:
    """Check if the draft is approved or needs revision.

    Args:
        state: Current graph state; ``approved`` and ``feedback`` are
            read with ``.get`` and may be absent.

    Returns:
        ``"approved"`` when the reviewer has approved, ``"revise"`` when
        feedback is present, otherwise ``"interrupt"``.
    """
    # Approval wins over feedback: an approved draft is never revised again.
    if state.get("approved"):
        return "approved"
    if state.get("feedback"):
        return "revise"
    # Neither approved nor commented on yet: pause for human input.
    return "interrupt"
40
41
def build_interruptible_graph():
    """Build a graph that can be interrupted for human input.

    The graph has a "generate" node that loops on itself while feedback is
    pending and a "finalize" node that runs once the draft is approved.

    Returns:
        The compiled graph, backed by an in-memory checkpointer and
        configured to pause before each run of "generate".
    """
    graph = StateGraph(InterruptState)

    graph.add_node("generate", generate_draft)
    # NOTE(review): "status" has to be a key of InterruptState for this
    # update to be kept in the state; confirm against the state schema.
    graph.add_node("finalize", lambda s: {"status": "complete"})

    graph.add_edge(START, "generate")

    graph.add_conditional_edges(
        "generate",
        check_approval,
        {
            "approved": "finalize",
            "revise": "generate",
            "interrupt": END,  # Pause here
        },
    )

    graph.add_edge("finalize", END)

    # Compile with checkpointing for state persistence
    memory = MemorySaver()
    return graph.compile(checkpointer=memory, interrupt_before=["generate"])
67
68
69# Usage with interrupts
70def run_with_interrupt():
71 app = build_interruptible_graph()
72 config = {"configurable": {"thread_id": "1"}}
73
74 # Start execution
75 initial_state = {"task": "Write a blog post", "approved": False}
76
77 for event in app.stream(initial_state, config):
78 print("Event:", event)
79
80 # Graph is now paused at interrupt point
81 # Get current state
82 current_state = app.get_state(config)
83 print("Current state:", current_state.values)
84
85 # Human provides feedback
86 app.update_state(config, {
87 "feedback": "Make it more engaging",
88 "approved": False
89 })
90
91 # Resume execution
92 for event in app.stream(None, config):
93 print("Resumed event:", event)
94
95 # Human approves
96 app.update_state(config, {"approved": True})
97
98 # Resume to completion
99 for event in app.stream(None, config):
100 print("Final event:", event)Interrupt Before vs After
🐍python
1def build_graph_with_interrupts():
2 """Demonstrate interrupt_before and interrupt_after."""
3
4 graph = StateGraph(InterruptState)
5
6 graph.add_node("step1", lambda s: {"step": 1})
7 graph.add_node("step2", lambda s: {"step": 2})
8 graph.add_node("step3", lambda s: {"step": 3})
9
10 graph.add_edge(START, "step1")
11 graph.add_edge("step1", "step2")
12 graph.add_edge("step2", "step3")
13 graph.add_edge("step3", END)
14
15 memory = MemorySaver()
16
17 # interrupt_before: Pause BEFORE executing the node
18 # Useful for approval before action
19 app_before = graph.compile(
20 checkpointer=memory,
21 interrupt_before=["step2"]
22 )
23
24 # interrupt_after: Pause AFTER executing the node
25 # Useful for reviewing results before continuing
26 app_after = graph.compile(
27 checkpointer=memory,
28 interrupt_after=["step2"]
29 )
30
31 return app_before, app_afterApproval Workflows
🐍python
1from enum import Enum
2from typing import Optional
3from datetime import datetime
4
5
class ApprovalStatus(str, Enum):
    """Review states of a document.

    Members are also ``str`` instances, so they compare equal to their
    plain string values (e.g. ``ApprovalStatus.APPROVED == "approved"``).
    """

    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    NEEDS_REVISION = "needs_revision"
11
12
class ApprovalState(TypedDict):
    """State for approval workflow."""

    document: str  # text under review
    approval_status: str  # an ApprovalStatus value, or "" before submission
    approver: Optional[str]  # who made the decision
    approval_time: Optional[str]  # ISO timestamp of the approval
    rejection_reason: Optional[str]
    revision_notes: Optional[str]  # reviewer notes used by the "revise" node
    # Annotated with operator.add so node updates are concatenated onto
    # the existing list instead of replacing it.
    history: Annotated[list[dict], operator.add]
22
23
def submit_for_approval(state: ApprovalState) -> dict:
    """Submit document for approval.

    Args:
        state: Current workflow state (not read by this node).

    Returns:
        A partial update that marks the document as pending and adds a
        "submitted" entry, stamped with the current local time, to
        ``history``.
    """
    submission = {
        "action": "submitted",
        "time": datetime.now().isoformat(),
    }
    return {
        "approval_status": ApprovalStatus.PENDING,
        "history": [submission],
    }
33
34
def route_by_approval(state: ApprovalState) -> str:
    """Route based on approval status.

    Args:
        state: Current workflow state; ``approval_status`` must be present.

    Returns:
        ``"publish"``, ``"archive"`` or ``"revise"`` for the matching
        decision, and ``"wait_for_approval"`` for anything else
        (including pending or empty status).
    """
    status = state["approval_status"]

    # Kept as equality checks rather than a dict lookup: the status may
    # arrive as a plain string or as an ApprovalStatus member, and ``==``
    # treats both the same.
    if status == ApprovalStatus.APPROVED:
        return "publish"
    if status == ApprovalStatus.REJECTED:
        return "archive"
    if status == ApprovalStatus.NEEDS_REVISION:
        return "revise"
    return "wait_for_approval"
47
48
def build_approval_workflow():
    """Build an approval workflow graph.

    "submit" marks the document pending and routes on the current status:
    approved documents are published, rejected ones archived, and ones
    needing revision go through "revise" and back to "submit".

    Returns:
        The compiled graph, backed by an in-memory checkpointer.
    """
    graph = StateGraph(ApprovalState)

    graph.add_node("submit", submit_for_approval)
    graph.add_node("revise", lambda s: {"document": f"Revised: {s['revision_notes']}"})
    graph.add_node("publish", lambda s: {"history": [{"action": "published"}]})
    graph.add_node("archive", lambda s: {"history": [{"action": "archived"}]})

    graph.add_edge(START, "submit")

    graph.add_conditional_edges(
        "submit",
        route_by_approval,
        {
            "publish": "publish",
            "archive": "archive",
            "revise": "revise",
            "wait_for_approval": END,  # Interrupt for human approval
        },
    )

    graph.add_edge("revise", "submit")  # Re-submit after revision
    graph.add_edge("publish", END)
    graph.add_edge("archive", END)

    memory = MemorySaver()
    return graph.compile(checkpointer=memory)
78
79
80# Approval interaction
81def approval_interaction():
82 app = build_approval_workflow()
83 config = {"configurable": {"thread_id": "approval-1"}}
84
85 # Start workflow
86 result = app.invoke({
87 "document": "Important document content",
88 "approval_status": "",
89 "history": []
90 }, config)
91
92 print("Submitted, waiting for approval...")
93
94 # Human provides approval decision
95 app.update_state(config, {
96 "approval_status": ApprovalStatus.NEEDS_REVISION,
97 "revision_notes": "Please add more details",
98 "approver": "manager@example.com"
99 })
100
101 # Resume
102 result = app.invoke(None, config)
103
104 # Human approves revision
105 app.update_state(config, {
106 "approval_status": ApprovalStatus.APPROVED,
107 "approval_time": datetime.now().isoformat()
108 })
109
110 # Final resume
111 result = app.invoke(None, config)
112 print("Final result:", result)Interactive Agents
Build agents that can ask clarifying questions and interact with users:
🐍python
1from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
2from langchain_openai import ChatOpenAI
3
4
class ConversationState(TypedDict):
    """State for interactive conversation."""

    # Annotated with operator.add so each update is concatenated onto the
    # existing message list instead of replacing it.
    messages: Annotated[list, operator.add]
    needs_clarification: bool  # True while the agent waits for an answer
    clarification_question: Optional[str]  # the question the agent asked
    task_complete: bool  # True once the agent has given a final answer
11
12
# Module-level chat model used by process_message; temperature=0 is set
# here for the whole module.
llm = ChatOpenAI(model="gpt-4o", temperature=0)
14
15
def process_message(state: ConversationState) -> dict:
    """Process user message and determine if clarification is needed.

    Sends the conversation, prefixed with a system prompt, to the
    module-level ``llm`` and interprets the reply by its leading marker.

    Args:
        state: Current conversation state; ``messages`` must be present.

    Returns:
        A partial state update. For a ``CLARIFY:`` reply it sets
        ``needs_clarification`` and ``clarification_question`` and appends
        the question as an AI message. Otherwise it marks the task
        complete and appends the answer as an AI message.
    """
    messages = state["messages"]

    system = SystemMessage(content="""You are a helpful assistant.
If the user's request is unclear, ask ONE clarifying question.
Format: "CLARIFY: [your question]"
Otherwise, provide a helpful response.
Format: "RESPONSE: [your answer]" """)

    response = llm.invoke([system] + messages)
    # Tolerate whitespace or newlines in front of the marker.
    content = response.content.strip()

    if content.startswith("CLARIFY:"):
        question = content.removeprefix("CLARIFY:").strip()
        return {
            "needs_clarification": True,
            "clarification_question": question,
            "messages": [AIMessage(content=question)],
        }

    # A reply without any marker is passed through as the answer.
    answer = content.removeprefix("RESPONSE:").strip()
    return {
        "needs_clarification": False,
        "task_complete": True,
        "messages": [AIMessage(content=answer)],
    }
42
43
def route_conversation(state: ConversationState) -> str:
    """Route based on conversation state.

    Args:
        state: Current conversation state; both flags are read with
            ``.get`` and may be absent.

    Returns:
        ``"complete"`` when the task is done, ``"wait_for_user"`` when a
        clarification is pending, otherwise ``"process"``.
    """
    # Completion is checked first, so it wins if both flags are set.
    if state.get("task_complete"):
        return "complete"
    if state.get("needs_clarification"):
        return "wait_for_user"
    return "process"
52
53
def build_interactive_agent():
    """Build an interactive agent with clarification.

    The graph has a single "process" node. After each run the router
    either ends the run (task complete, or waiting for the user) or
    loops back into "process".

    Returns:
        The compiled graph, backed by an in-memory checkpointer.
    """
    graph = StateGraph(ConversationState)

    graph.add_node("process", process_message)

    graph.add_edge(START, "process")

    graph.add_conditional_edges(
        "process",
        route_conversation,
        {
            "complete": END,
            "wait_for_user": END,  # Interrupt for user input
            "process": "process",
        },
    )

    memory = MemorySaver()
    return graph.compile(checkpointer=memory)
75
76
77def interactive_session():
78 """Run an interactive session."""
79 app = build_interactive_agent()
80 config = {"configurable": {"thread_id": "chat-1"}}
81
82 # User starts conversation
83 result = app.invoke({
84 "messages": [HumanMessage(content="Help me with a project")],
85 "needs_clarification": False,
86 "task_complete": False
87 }, config)
88
89 if result.get("needs_clarification"):
90 print(f"Agent asks: [result['clarification_question']]")
91
92 # User provides clarification
93 app.update_state(config, {
94 "messages": [HumanMessage(content="A Python web application")],
95 "needs_clarification": False
96 })
97
98 # Continue conversation
99 result = app.invoke(None, config)
100
101 print("Final response:", result["messages"][-1].content)State Editing
LangGraph allows direct manipulation of state during execution:
🐍python
def demonstrate_state_editing():
    """Show various state editing operations.

    Runs the approval workflow once, then inspects the checkpointed
    state, updates a field, lists the state history and writes against
    an earlier checkpoint. Prints as it goes; returns nothing.
    """
    app = build_approval_workflow()
    config = {"configurable": {"thread_id": "edit-demo"}}

    # Start workflow
    app.invoke({"document": "Test", "approval_status": "", "history": []}, config)

    # Get current state
    state_snapshot = app.get_state(config)
    print("Current values:", state_snapshot.values)
    print("Next nodes:", state_snapshot.next)

    # Update specific fields
    app.update_state(
        config,
        {"approval_status": ApprovalStatus.APPROVED},
        as_node="submit",  # Specify which node made the update
    )

    # Get state history
    history = list(app.get_state_history(config))
    for h in history:
        print(f"State at {h.created_at}: {h.values}")

    # Revert to previous state
    # NOTE(review): this writes an empty update against an earlier
    # checkpoint's config and assumes history[1] is the previous
    # checkpoint; confirm both against the LangGraph persistence docs.
    if len(history) > 1:
        previous_config = history[1].config
        app.update_state(previous_config, {})  # Revert to this checkpoint
31
32
33# Advanced: Modifying messages in state
34def edit_messages_example():
35 """Edit message history in state."""
36
37 class MessageState(TypedDict):
38 messages: list
39
40 app = build_interactive_agent()
41 config = {"configurable": {"thread_id": "msg-edit"}}
42
43 # Run initial conversation
44 app.invoke({
45 "messages": [HumanMessage(content="Hello")],
46 "needs_clarification": False,
47 "task_complete": False
48 }, config)
49
50 # Get current messages
51 state = app.get_state(config)
52 current_messages = state.values["messages"]
53
54 # Edit a message (e.g., correct user input)
55 if current_messages:
56 corrected = current_messages.copy()
57 corrected[0] = HumanMessage(content="Hi, I need help with Python")
58
59 app.update_state(config, {"messages": corrected})
60
61 # Resume with corrected history
62 result = app.invoke(None, config)Key Takeaways
- Interrupts enable human oversight - Use interrupt_before for approval gates and interrupt_after for review points.
- Checkpointing is essential - MemorySaver or database-backed checkpointers preserve state across interrupts.
- State updates resume execution - Use update_state to provide human input and continue the workflow.
- Approval workflows combine routing with interrupts for structured review processes.
- Interactive agents can ask clarifying questions by routing to interrupt points.
Next Section Preview: We'll explore persistence and checkpointing in depth for building durable, recoverable workflows.