Introduction
This capstone section brings together everything we've learned about LangGraph to build a complete multi-agent research and analysis system. We'll combine supervisor patterns, conditional routing, human-in-the-loop review, and persistence to create a production-ready solution.
Section Overview: We'll build a multi-agent system with specialized agents (researcher, analyst, writer), a supervisor for orchestration, persistent state, and human review capabilities.
System Architecture
Our system consists of three specialized agents coordinated by a supervisor:
```python
"""
Multi-Agent Research System Architecture

                    ┌─────────────────┐
                    │   Supervisor    │
                    │ (Orchestrator)  │
                    └────────┬────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│   Researcher    │ │     Analyst     │ │     Writer      │
│  (Web Search)   │ │ (Data Analysis) │ │  (Report Gen)   │
└─────────────────┘ └─────────────────┘ └─────────────────┘
         │                   │                   │
         └───────────────────┼───────────────────┘
                             │
                             ▼
                    ┌─────────────────┐
                    │  Human Review   │
                    │   (Approval)    │
                    └─────────────────┘
"""

from typing import TypedDict, Annotated, Literal, Optional, List
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_core.tools import tool
from psycopg_pool import ConnectionPool
import operator
import json
from datetime import datetime


# Agent identifiers (type aliases documenting the routing vocabulary)
AGENTS = Literal["researcher", "analyst", "writer", "supervisor"]
ACTIONS = Literal["research", "analyze", "write", "review", "complete"]
```

System Requirements
| Component | Purpose | Technology |
|---|---|---|
| Supervisor | Task orchestration and routing | LLM with structured output |
| Researcher | Web search and data gathering | Search tools + LLM |
| Analyst | Data analysis and insights | Python execution + LLM |
| Writer | Report generation | LLM with templates |
| Persistence | State checkpointing | PostgreSQL |
| Human Review | Quality control | Interrupt patterns |
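Before wiring these components into LangGraph, it helps to see the control loop the table implies in plain Python. The sketch below is illustrative only — the stub functions and the hand-written `supervisor` conditionals stand in for the LLM-backed agents built in the rest of this section:

```python
# Plain-Python sketch of the orchestration loop the table implies.
# Stub "agents" stand in for the LLM-backed nodes built later.
def researcher(state):
    state["data"].append("finding")
    return state

def analyst(state):
    state["insights"].append("insight")
    return state

def writer(state):
    state["draft"] = "report"
    return state

def supervisor(state):
    """Route based on what has been produced so far."""
    if not state["data"]:
        return "researcher"
    if not state["insights"]:
        return "analyst"
    if state["draft"] is None:
        return "writer"
    return "complete"

state = {"data": [], "insights": [], "draft": None}
agents = {"researcher": researcher, "analyst": analyst, "writer": writer}

# Supervisor decides, worker runs, state accumulates — until done.
while (step := supervisor(state)) != "complete":
    state = agents[step](state)

print(state["draft"])  # report
```

The real system replaces the hard-coded conditionals with an LLM decision, but the loop shape — supervisor decides, worker runs, shared state accumulates — stays the same.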
Agent Definitions
State Schema
```python
class ResearchState(TypedDict):
    """Comprehensive state for the research system."""

    # Task definition
    query: str
    objectives: List[str]

    # Agent outputs (operator.add merges each agent's new items into the list)
    research_data: Annotated[List[dict], operator.add]
    analysis_results: Annotated[List[dict], operator.add]
    draft_sections: Annotated[List[dict], operator.add]

    # Coordination
    current_agent: str
    next_action: str
    iteration: int
    max_iterations: int

    # Human-in-the-loop
    requires_review: bool
    review_feedback: Optional[str]
    is_approved: bool

    # Messages for communication
    messages: Annotated[List[BaseMessage], operator.add]

    # Metadata
    created_at: str
    updated_at: str
    status: str


def create_initial_state(query: str, objectives: List[str]) -> ResearchState:
    """Create initial state for a research task."""
    now = datetime.now().isoformat()
    return {
        "query": query,
        "objectives": objectives,
        "research_data": [],
        "analysis_results": [],
        "draft_sections": [],
        "current_agent": "supervisor",
        "next_action": "research",
        "iteration": 0,
        "max_iterations": 10,
        "requires_review": False,
        "review_feedback": None,
        "is_approved": False,
        "messages": [HumanMessage(content=f"Research task: {query}")],
        "created_at": now,
        "updated_at": now,
        "status": "started"
    }
```

Researcher Agent
```python
from langchain_community.tools import DuckDuckGoSearchRun

# Initialize tools
search_tool = DuckDuckGoSearchRun()
llm = ChatOpenAI(model="gpt-4o", temperature=0)


@tool
def web_search(query: str) -> str:
    """Search the web for information."""
    return search_tool.run(query)


@tool
def extract_key_facts(text: str, topic: str) -> str:
    """Extract key facts from text about a topic."""
    messages = [
        SystemMessage(content="Extract key facts and data points from the text."),
        HumanMessage(content=f"Topic: {topic}\n\nText:\n{text}")
    ]
    response = llm.invoke(messages)
    return response.content


researcher_tools = [web_search, extract_key_facts]
researcher_llm = llm.bind_tools(researcher_tools)


def researcher_agent(state: ResearchState) -> dict:
    """Researcher agent: gathers information from web sources."""

    query = state["query"]
    objectives = state["objectives"]
    existing_data = state["research_data"]

    # Determine what still needs to be researched
    messages = [
        SystemMessage(content="""You are a research agent. Your job is to:
1. Search for relevant information about the query
2. Extract key facts and data
3. Organize findings by objective

Use the web_search tool to find information, then extract_key_facts to process it.
Be thorough but efficient - focus on the most relevant sources."""),
        HumanMessage(content=f"""
Query: {query}

Objectives to research:
{json.dumps(objectives, indent=2)}

Already gathered data:
{json.dumps(existing_data, indent=2)}

Find additional information to fulfill the objectives.
""")
    ]

    response = researcher_llm.invoke(messages)

    # Execute any tool calls the model requested
    research_findings = []
    if hasattr(response, "tool_calls") and response.tool_calls:
        for tool_call in response.tool_calls:
            tool_name = tool_call["name"]
            tool_args = tool_call["args"]

            if tool_name == "web_search":
                result = web_search.invoke(tool_args)
            elif tool_name == "extract_key_facts":
                result = extract_key_facts.invoke(tool_args)
            else:
                result = "Unknown tool"

            research_findings.append({
                "tool": tool_name,
                "args": tool_args,
                "result": result,
                "timestamp": datetime.now().isoformat()
            })

    return {
        "research_data": research_findings,
        "current_agent": "researcher",
        "messages": [AIMessage(content=f"Researcher gathered {len(research_findings)} findings")],
        "updated_at": datetime.now().isoformat()
    }
```

Analyst Agent
```python
from pydantic import BaseModel, Field


class AnalysisResult(BaseModel):
    """Structured analysis output."""
    category: str = Field(description="Category of the analysis")
    findings: List[str] = Field(description="Key findings")
    confidence: float = Field(description="Confidence score 0-1")
    data_quality: str = Field(description="Assessment of data quality")
    recommendations: List[str] = Field(description="Actionable recommendations")


analyst_llm = llm.with_structured_output(AnalysisResult)


def analyst_agent(state: ResearchState) -> dict:
    """Analyst agent: analyzes research data and generates insights."""

    query = state["query"]
    objectives = state["objectives"]
    research_data = state["research_data"]

    # Skip if no research data
    if not research_data:
        return {
            "analysis_results": [{
                "error": "No research data available",
                "timestamp": datetime.now().isoformat()
            }],
            "current_agent": "analyst",
            "messages": [AIMessage(content="Analyst: No data to analyze")],
            "updated_at": datetime.now().isoformat()
        }

    # Analyze each objective
    analysis_results = []

    for objective in objectives:
        messages = [
            SystemMessage(content="""You are a data analyst. Analyze the research data
to extract insights related to the given objective. Provide structured analysis
with findings, confidence scores, and recommendations."""),
            HumanMessage(content=f"""
Research Query: {query}

Objective: {objective}

Research Data:
{json.dumps(research_data, indent=2)}

Analyze this data for insights related to the objective.
""")
        ]

        try:
            analysis = analyst_llm.invoke(messages)
            analysis_results.append({
                "objective": objective,
                "analysis": analysis.model_dump(),
                "timestamp": datetime.now().isoformat()
            })
        except Exception as e:
            analysis_results.append({
                "objective": objective,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            })

    return {
        "analysis_results": analysis_results,
        "current_agent": "analyst",
        "messages": [AIMessage(content=f"Analyst completed {len(analysis_results)} analyses")],
        "updated_at": datetime.now().isoformat()
    }
```

Writer Agent
```python
class ReportSection(BaseModel):
    """Structured report section."""
    title: str = Field(description="Section title")
    content: str = Field(description="Section content in markdown")
    key_points: List[str] = Field(description="Key takeaways")
    sources: List[str] = Field(description="Referenced sources")


writer_llm = llm.with_structured_output(ReportSection)


def writer_agent(state: ResearchState) -> dict:
    """Writer agent: generates report sections from analysis."""

    query = state["query"]
    analysis_results = state["analysis_results"]
    existing_sections = state["draft_sections"]

    # Skip if no analysis
    if not analysis_results:
        return {
            "draft_sections": [{
                "error": "No analysis available",
                "timestamp": datetime.now().isoformat()
            }],
            "current_agent": "writer",
            "messages": [AIMessage(content="Writer: No analysis to write about")],
            "updated_at": datetime.now().isoformat()
        }

    # Titles live under the nested "section" key; skip error entries
    # that never produced a section.
    existing_titles = [
        s["section"]["title"] for s in existing_sections if "section" in s
    ]

    # Generate sections for each analysis
    new_sections = []

    for analysis in analysis_results:
        if "error" in analysis:
            continue

        messages = [
            SystemMessage(content="""You are a technical writer. Create a clear,
well-structured report section based on the analysis. Use markdown formatting.
Focus on clarity and actionable insights."""),
            HumanMessage(content=f"""
Research Query: {query}

Analysis to write about:
{json.dumps(analysis, indent=2)}

Existing sections (avoid repetition):
{json.dumps(existing_titles, indent=2)}

Create a report section for this analysis.
""")
        ]

        try:
            section = writer_llm.invoke(messages)
            new_sections.append({
                "section": section.model_dump(),
                "objective": analysis.get("objective"),
                "timestamp": datetime.now().isoformat()
            })
        except Exception as e:
            new_sections.append({
                "objective": analysis.get("objective"),
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            })

    return {
        "draft_sections": new_sections,
        "current_agent": "writer",
        "requires_review": True,  # Trigger human review after writing
        "messages": [AIMessage(content=f"Writer created {len(new_sections)} sections")],
        "updated_at": datetime.now().isoformat()
    }
```

Supervisor Implementation
```python
class SupervisorDecision(BaseModel):
    """Structured supervisor decision."""
    next_agent: Literal["researcher", "analyst", "writer", "human_review", "complete"]
    reasoning: str = Field(description="Explanation of the decision")
    instructions: str = Field(description="Specific instructions for the next agent")


supervisor_llm = llm.with_structured_output(SupervisorDecision)


def supervisor_agent(state: ResearchState) -> dict:
    """Supervisor: orchestrates the multi-agent workflow."""

    # Check iteration limit
    if state["iteration"] >= state["max_iterations"]:
        return {
            "next_action": "complete",
            "current_agent": "supervisor",
            "messages": [AIMessage(content="Supervisor: Max iterations reached, completing")],
            "status": "max_iterations_reached",
            "updated_at": datetime.now().isoformat()
        }

    # Check if approved
    if state["is_approved"]:
        return {
            "next_action": "complete",
            "current_agent": "supervisor",
            "messages": [AIMessage(content="Supervisor: Report approved, completing")],
            "status": "approved",
            "updated_at": datetime.now().isoformat()
        }

    # Analyze current state to decide next action
    messages = [
        SystemMessage(content="""You are a supervisor coordinating a research team.

Your agents:
- researcher: Gathers information from web sources
- analyst: Analyzes data and extracts insights
- writer: Creates report sections

Workflow:
1. Start with researcher to gather data
2. Move to analyst when enough data is collected
3. Move to writer when analysis is complete
4. Send to human_review when draft is ready
5. Complete when approved or objectives are met

Make strategic decisions based on current progress."""),
        HumanMessage(content=f"""
Query: {state['query']}

Objectives: {json.dumps(state['objectives'])}

Current Progress:
- Research data items: {len(state['research_data'])}
- Analysis results: {len(state['analysis_results'])}
- Draft sections: {len(state['draft_sections'])}
- Iteration: {state['iteration']} / {state['max_iterations']}
- Requires review: {state['requires_review']}
- Review feedback: {state['review_feedback']}

Recent messages:
{[m.content for m in state['messages'][-5:]]}

Decide the next step.
""")
    ]

    decision = supervisor_llm.invoke(messages)

    return {
        "next_action": decision.next_agent,
        "current_agent": "supervisor",
        "iteration": state["iteration"] + 1,
        "messages": [AIMessage(content=f"Supervisor: {decision.reasoning}")],
        "updated_at": datetime.now().isoformat()
    }


def route_to_agent(state: ResearchState) -> str:
    """Route to the appropriate agent based on supervisor decision."""
    next_action = state.get("next_action", "research")

    route_map = {
        "researcher": "researcher",
        "research": "researcher",
        "analyst": "analyst",
        "analyze": "analyst",
        "writer": "writer",
        "write": "writer",
        "human_review": "human_review",
        "review": "human_review",
        "complete": "finalize"
    }

    return route_map.get(next_action, "researcher")
```

State Management
Human Review Node
```python
def human_review_node(state: ResearchState) -> dict:
    """Human review checkpoint - workflow pauses here."""

    # Compile draft for review
    draft_content = []
    for section in state["draft_sections"]:
        if "section" in section:
            s = section["section"]
            draft_content.append(f"## {s['title']}\n\n{s['content']}")

    full_draft = "\n\n".join(draft_content)

    return {
        "messages": [AIMessage(content=f"""
=== DRAFT READY FOR REVIEW ===

{full_draft}

=== END DRAFT ===

Please review and provide feedback, or approve to complete.
""")],
        "requires_review": True,
        "status": "pending_review",
        "updated_at": datetime.now().isoformat()
    }


def check_review_status(state: ResearchState) -> Literal["approved", "revise", "wait"]:
    """Check if human has provided review feedback."""
    if state.get("is_approved"):
        return "approved"
    elif state.get("review_feedback"):
        return "revise"
    else:
        return "wait"


def finalize_node(state: ResearchState) -> dict:
    """Finalize the report and clean up."""

    # Compile final report
    final_sections = [
        section["section"]
        for section in state["draft_sections"]
        if "section" in section
    ]

    final_report = {
        "title": f"Research Report: {state['query']}",
        "created_at": state["created_at"],
        "completed_at": datetime.now().isoformat(),
        "objectives": state["objectives"],
        "sections": final_sections,
        "data_sources": len(state["research_data"]),
        "analyses_performed": len(state["analysis_results"]),
        "iterations": state["iteration"]
    }

    # Surface the compiled report in the message history so it is
    # recorded in the checkpoint, not just computed and discarded.
    return {
        "messages": [AIMessage(content=(
            f"Report finalized with {len(final_sections)} sections.\n"
            f"{json.dumps(final_report, indent=2)}"
        ))],
        "status": "completed",
        "updated_at": datetime.now().isoformat()
    }
```

Complete System
```python
def build_research_system(checkpointer=None):
    """Build the complete multi-agent research system."""

    graph = StateGraph(ResearchState)

    # Add all agent nodes
    graph.add_node("supervisor", supervisor_agent)
    graph.add_node("researcher", researcher_agent)
    graph.add_node("analyst", analyst_agent)
    graph.add_node("writer", writer_agent)
    graph.add_node("human_review", human_review_node)
    graph.add_node("finalize", finalize_node)

    # Entry point
    graph.add_edge(START, "supervisor")

    # Supervisor routes to agents
    graph.add_conditional_edges(
        "supervisor",
        route_to_agent,
        {
            "researcher": "researcher",
            "analyst": "analyst",
            "writer": "writer",
            "human_review": "human_review",
            "finalize": "finalize"
        }
    )

    # All agents return to supervisor
    graph.add_edge("researcher", "supervisor")
    graph.add_edge("analyst", "supervisor")
    graph.add_edge("writer", "supervisor")

    # Human review routing
    graph.add_conditional_edges(
        "human_review",
        check_review_status,
        {
            "approved": "finalize",
            "revise": "supervisor",  # Back to supervisor for revision
            "wait": END              # Pause for human input
        }
    )

    # Finalize ends the workflow
    graph.add_edge("finalize", END)

    # Compile with persistence; fall back to in-memory checkpoints
    if checkpointer is None:
        from langgraph.checkpoint.memory import MemorySaver
        checkpointer = MemorySaver()

    return graph.compile(
        checkpointer=checkpointer,
        interrupt_before=["human_review"]  # Pause for human review
    )


# Production setup with PostgreSQL
def create_production_system():
    """Create production-ready system with PostgreSQL persistence."""

    pool = ConnectionPool(
        conninfo="postgresql://user:pass@localhost:5432/research_db",
        min_size=5,
        max_size=20,
        # PostgresSaver expects autocommit connections so that
        # setup() can create its checkpoint tables.
        kwargs={"autocommit": True, "prepare_threshold": 0}
    )

    checkpointer = PostgresSaver(pool)
    checkpointer.setup()  # Create tables on first run

    return build_research_system(checkpointer)
```

Running the System
```python
class ResearchSystemRunner:
    """Runner class for the multi-agent research system."""

    def __init__(self, use_postgres: bool = False):
        if use_postgres:
            self.app = create_production_system()
        else:
            self.app = build_research_system()

    def start_research(
        self,
        query: str,
        objectives: List[str],
        thread_id: str
    ) -> dict:
        """Start a new research task."""

        initial_state = create_initial_state(query, objectives)
        config = {"configurable": {"thread_id": thread_id}}

        # Run until human review or completion
        result = self.app.invoke(initial_state, config)

        return {
            "thread_id": thread_id,
            "status": result.get("status"),
            "requires_review": result.get("requires_review"),
            "iterations": result.get("iteration")
        }

    def get_status(self, thread_id: str) -> dict:
        """Get current status of a research task."""

        config = {"configurable": {"thread_id": thread_id}}
        state = self.app.get_state(config)

        return {
            "thread_id": thread_id,
            "status": state.values.get("status"),
            "current_agent": state.values.get("current_agent"),
            "iteration": state.values.get("iteration"),
            "research_items": len(state.values.get("research_data", [])),
            "analysis_items": len(state.values.get("analysis_results", [])),
            "draft_sections": len(state.values.get("draft_sections", [])),
            "requires_review": state.values.get("requires_review"),
            "next_nodes": state.next
        }

    def get_draft(self, thread_id: str) -> str:
        """Get the current draft for review."""

        config = {"configurable": {"thread_id": thread_id}}
        state = self.app.get_state(config)

        sections = state.values.get("draft_sections", [])
        draft_content = []

        for section in sections:
            if "section" in section:
                s = section["section"]
                draft_content.append(f"## {s['title']}\n\n{s['content']}")

        return "\n\n".join(draft_content)

    def submit_review(
        self,
        thread_id: str,
        approved: bool,
        feedback: Optional[str] = None
    ) -> dict:
        """Submit human review and continue workflow."""

        config = {"configurable": {"thread_id": thread_id}}

        # Update state with review
        self.app.update_state(config, {
            "is_approved": approved,
            "review_feedback": feedback,
            "requires_review": False
        })

        # Resume workflow from the interrupt (None = no new input)
        result = self.app.invoke(None, config)

        return {
            "thread_id": thread_id,
            "status": result.get("status"),
            "completed": result.get("status") == "completed"
        }

    def get_final_report(self, thread_id: str) -> dict:
        """Get the final report after completion."""

        config = {"configurable": {"thread_id": thread_id}}
        state = self.app.get_state(config)

        if state.values.get("status") != "completed":
            return {"error": "Report not yet completed"}

        return {
            "query": state.values.get("query"),
            "objectives": state.values.get("objectives"),
            "sections": [
                s["section"] for s in state.values.get("draft_sections", [])
                if "section" in s
            ],
            "iterations": state.values.get("iteration"),
            "created_at": state.values.get("created_at"),
            "completed_at": state.values.get("updated_at")
        }
```

Example Usage
```python
def run_research_example():
    """Example of running the research system."""

    runner = ResearchSystemRunner(use_postgres=False)

    # Start research
    result = runner.start_research(
        query="Impact of AI on software development productivity",
        objectives=[
            "Quantify productivity gains from AI coding assistants",
            "Identify key challenges and limitations",
            "Recommend best practices for adoption"
        ],
        thread_id="research-001"
    )

    print(f"Started: {result}")

    # Check status
    status = runner.get_status("research-001")
    print(f"Status: {status}")

    # If pending review, get the draft and simulate a human approval
    if status["requires_review"]:
        draft = runner.get_draft("research-001")
        print(f"\nDraft for review:\n{draft}")

        review_result = runner.submit_review(
            thread_id="research-001",
            approved=True,
            feedback=None
        )
        print(f"\nReview submitted: {review_result}")

    # Get final report
    report = runner.get_final_report("research-001")
    print(f"\nFinal report: {json.dumps(report, indent=2)}")


if __name__ == "__main__":
    run_research_example()
```

Testing and Debugging
```python
from unittest.mock import Mock, patch


class TestResearchSystem:
    """Test suite for the multi-agent research system."""

    def test_initial_state_creation(self):
        """Test initial state is created correctly."""
        state = create_initial_state(
            query="Test query",
            objectives=["Obj 1", "Obj 2"]
        )

        assert state["query"] == "Test query"
        assert len(state["objectives"]) == 2
        assert state["iteration"] == 0
        assert state["status"] == "started"

    def test_supervisor_routing(self):
        """Test supervisor makes correct routing decisions."""
        # State with no research data
        state = create_initial_state("Test", ["Obj"])

        # Patch the module-level structured-output model so the test
        # is deterministic and makes no real API call.
        decision = SupervisorDecision(
            next_agent="researcher",
            reasoning="No research data yet",
            instructions="Gather initial data"
        )
        with patch(f"{supervisor_agent.__module__}.supervisor_llm") as mock_llm:
            mock_llm.invoke.return_value = decision
            result = supervisor_agent(state)

        assert result["next_action"] == "researcher"
        assert result["iteration"] == 1

    def test_iteration_limit(self):
        """Test that iteration limit is respected."""
        state = create_initial_state("Test", ["Obj"])
        state["iteration"] = 10
        state["max_iterations"] = 10

        result = supervisor_agent(state)

        assert result["next_action"] == "complete"
        assert result["status"] == "max_iterations_reached"

    def test_researcher_with_no_tools(self):
        """Test researcher handles no tool calls gracefully."""
        mock_response = Mock()
        mock_response.tool_calls = []

        # Patching langchain_openai.ChatOpenAI would have no effect here,
        # because researcher_llm was already bound at import time. Patch
        # the module-level model the agent actually invokes instead.
        with patch(f"{researcher_agent.__module__}.researcher_llm") as mock_llm:
            mock_llm.invoke.return_value = mock_response
            state = create_initial_state("Test", ["Obj"])
            result = researcher_agent(state)

        assert result["current_agent"] == "researcher"
        assert isinstance(result["research_data"], list)


def debug_workflow(thread_id: str, runner: ResearchSystemRunner):
    """Debug utility for inspecting workflow state."""

    config = {"configurable": {"thread_id": thread_id}}

    # Get full state
    state = runner.app.get_state(config)
    print("\n=== Current State ===")
    print(f"Status: {state.values.get('status')}")
    print(f"Current agent: {state.values.get('current_agent')}")
    print(f"Iteration: {state.values.get('iteration')}")
    print(f"Next nodes: {state.next}")

    # Get history
    history = list(runner.app.get_state_history(config))
    print(f"\n=== History ({len(history)} checkpoints) ===")
    for i, snapshot in enumerate(history[:5]):  # Most recent 5 (newest first)
        print(f"  {i}: Agent={snapshot.values.get('current_agent')}, "
              f"Status={snapshot.values.get('status')}")

    # Analyze messages
    messages = state.values.get("messages", [])
    print(f"\n=== Recent Messages ({len(messages)} total) ===")
    for msg in messages[-5:]:
        print(f"  - {type(msg).__name__}: {msg.content[:100]}...")

    return state


def visualize_graph():
    """Visualize the multi-agent graph structure."""
    from IPython.display import Image, display

    app = build_research_system()

    try:
        img = app.get_graph().draw_mermaid_png()
        display(Image(img))
    except Exception:
        # Fallback to text representation (requires the grandalf package)
        print(app.get_graph().draw_ascii())
```

Key Takeaways
- Multi-agent systems require clear state schemas that capture all information needed by different agents.
- Supervisor orchestration uses LLM reasoning to make intelligent routing decisions based on current progress.
- Specialized agents (researcher, analyst, writer) have focused responsibilities and tools.
- Human-in-the-loop is essential for quality control in production systems.
- Persistence with checkpointing enables recovery, debugging, and long-running workflows.
- Iteration limits prevent infinite loops and control costs.
- Testing and debugging tools are crucial for understanding system behavior.
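The iteration-limit takeaway is worth isolating. A minimal, dependency-free sketch of the guard used in `supervisor_agent` (`run_with_budget` is an illustrative helper, not part of LangGraph):

```python
# Minimal sketch of an iteration guard: stop cleanly once the step
# budget is spent, no matter what the individual steps do.
def run_with_budget(step_fn, state, max_iterations=10):
    for i in range(max_iterations):
        state = step_fn(state)
        if state.get("status") == "completed":
            return state, i + 1
    return {**state, "status": "max_iterations_reached"}, max_iterations

# A step that never finishes on its own is still bounded:
final, used = run_with_budget(lambda s: s, {"status": "running"}, max_iterations=3)
print(final["status"], used)  # max_iterations_reached 3

# A step that completes stops early:
_, used = run_with_budget(lambda s: {**s, "status": "completed"}, {"status": "running"})
print(used)  # 1
```

LangGraph also enforces its own per-run `recursion_limit` (passed via the invoke config), which acts as a backstop beneath any application-level counter like this one.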
Chapter Complete: You've now mastered LangGraph from fundamentals to building production-ready multi-agent systems. The patterns learned here form the foundation for building sophisticated AI applications.