Introduction
Multi-agent systems can be organized using various architectural patterns, each with distinct characteristics suited to different use cases. Understanding these patterns is essential for designing effective multi-agent systems. This section provides an overview of the three major patterns we'll explore in detail throughout this chapter.
Section Overview: We'll examine supervisor, peer-to-peer, and hierarchical patterns, understanding their core concepts before diving deep into implementation details in subsequent sections.
Pattern Categories
Multi-agent architecture patterns can be categorized by how agents are organized and how they communicate:
Organization Dimension
1from enum import Enum
2from dataclasses import dataclass
3from typing import Protocol
4
5
class OrganizationType(Enum):
    """How agents are organized."""

    CENTRALIZED = "centralized"      # Single point of control
    DECENTRALIZED = "decentralized"  # No single controller
    HIERARCHICAL = "hierarchical"    # Layered control structure
11
12
class CommunicationType(Enum):
    """How agents communicate."""

    DIRECT = "direct"        # Agent-to-agent
    BROADCAST = "broadcast"  # One-to-many
    MEDIATED = "mediated"    # Through intermediary
18
19
@dataclass
class ArchitecturePattern:
    """Describes a multi-agent architecture pattern.

    Attributes:
        name: Name of the pattern.
        organization: How the agents are organized.
        communication: How the agents communicate.
        description: Short summary of the pattern.
    """

    name: str
    organization: OrganizationType
    communication: CommunicationType
    description: str

    def characteristics(self) -> dict[str, str]:
        """Return key characteristics of this pattern.

        The enum members are flattened to their string values, so the
        result holds plain strings only. ``name`` is not part of the result.
        """
        return {
            "organization": self.organization.value,
            "communication": self.communication.value,
            "description": self.description,
        }
35
36
# Define the three main patterns
SUPERVISOR_PATTERN = ArchitecturePattern(
    name="Supervisor",
    organization=OrganizationType.CENTRALIZED,
    communication=CommunicationType.MEDIATED,
    description="Central coordinator directs worker agents",
)

PEER_TO_PEER_PATTERN = ArchitecturePattern(
    name="Peer-to-Peer",
    organization=OrganizationType.DECENTRALIZED,
    communication=CommunicationType.DIRECT,
    description="Agents collaborate as equals without central control",
)
51
52HIERARCHICAL_PATTERN = ArchitecturePattern(
53 name="Hierarchical",
54 organization=OrganizationType.HIERARCHICAL,
55 communication=CommunicationType.MEDIATED,
56 description="Agents organized in layers with delegation"
57)| Pattern | Organization | Communication | Complexity |
|---|---|---|---|
| Supervisor | Centralized | Mediated | Low-Medium |
| Peer-to-Peer | Decentralized | Direct | Medium-High |
| Hierarchical | Layered | Mediated | High |
Supervisor Pattern
In the supervisor pattern, a central agent acts as coordinator, delegating tasks to worker agents and aggregating their results.
Core Concept
1from dataclasses import dataclass, field
2from typing import Any, Callable, Awaitable
3from abc import ABC, abstractmethod
4
5
@dataclass
class WorkerAgent(ABC):
    """Base class for worker agents.

    Attributes:
        name: Name of the worker.
        capabilities: Names of the capabilities this worker advertises.
    """

    name: str
    capabilities: list[str] = field(default_factory=list)

    @abstractmethod
    async def execute(self, task: dict) -> dict:
        """Execute assigned task.

        Args:
            task: Description of the work to perform.

        Returns:
            The outcome of the task as a dict.
        """
16
17
@dataclass
class CodeWriterWorker(WorkerAgent):
    """Worker specialized in writing code."""

    name: str = "code_writer"
    capabilities: list[str] = field(
        default_factory=lambda: ["write_code", "refactor"]
    )

    async def execute(self, task: dict) -> dict:
        """Handle a ``write_code`` task.

        Args:
            task: Task dict; its ``"type"`` entry selects the action.

        Returns:
            A dict with ``"code"`` and ``"language"`` for ``write_code``
            tasks, otherwise a dict carrying an ``"error"`` message.
        """
        task_type = task.get("type", "")
        if task_type == "write_code":
            return {"code": "# Generated code", "language": "python"}
        # NOTE: "refactor" is advertised in capabilities but has no handler
        # here, so it falls through to the error result as well.
        return {"error": "Unknown task type"}
31
32
@dataclass
class TestWriterWorker(WorkerAgent):
    """Worker specialized in writing tests."""

    name: str = "test_writer"
    capabilities: list[str] = field(
        default_factory=lambda: ["write_tests", "test_analysis"]
    )

    async def execute(self, task: dict) -> dict:
        """Handle a ``write_tests`` task.

        Args:
            task: Task dict; its ``"type"`` entry selects the action.

        Returns:
            A dict with ``"tests"`` and ``"coverage"`` for ``write_tests``
            tasks, otherwise a dict carrying an ``"error"`` message.
        """
        task_type = task.get("type", "")
        if task_type == "write_tests":
            return {"tests": "# Generated tests", "coverage": 0.85}
        # NOTE: "test_analysis" is advertised in capabilities but has no
        # handler here, so it falls through to the error result as well.
        return {"error": "Unknown task type"}
46
47
@dataclass
class SupervisorAgent:
    """Central coordinator that manages worker agents.

    Attributes:
        workers: Registered workers, keyed by worker name.
        task_history: One record per processed plan step, in execution order.
    """

    workers: dict[str, WorkerAgent] = field(default_factory=dict)
    task_history: list[dict] = field(default_factory=list)

    def register_worker(self, worker: WorkerAgent) -> None:
        """Register a worker agent under its ``name``.

        Registering another worker with the same name replaces the earlier one.
        """
        self.workers[worker.name] = worker

    async def process_request(self, request: str) -> dict:
        """Run the supervisor workflow for one request.

        Supervisor workflow:
        1. Analyze the request and plan subtasks
        2. Run each step on its assigned worker, one after another
        3. Collect and aggregate the results

        Args:
            request: The request text handed to ``create_plan``.

        Returns:
            The dict produced by ``aggregate_results``. A step whose worker
            is not registered contributes an ``{"error": ...}`` entry.
        """
        # Step 1: Analyze request and plan
        plan = await self.create_plan(request)

        # Step 2: Execute plan
        results = {}
        for step in plan["steps"]:
            worker_name = step["worker"]
            worker = self.workers.get(worker_name)
            if worker is None:
                # Report the gap in the results rather than dropping the
                # step, using the same {"error": ...} shape workers return.
                result = {"error": f"No worker registered as '{worker_name}'"}
            else:
                result = await worker.execute(step["task"])
            results[step["name"]] = result
            self.task_history.append({
                "request": request,
                "step": step["name"],
                "worker": worker_name,
                "result": result,
            })

        # Step 3: Aggregate results
        return await self.aggregate_results(results)

    async def create_plan(self, request: str) -> dict:
        """Analyze request and create execution plan.

        Returns:
            A dict with a ``"steps"`` list; each step names the result key
            (``"name"``), the worker to use and the task to give it.
        """
        # In practice, this would use LLM to create plan
        return {
            "steps": [
                {"name": "code", "worker": "code_writer", "task": {"type": "write_code"}},
                {"name": "tests", "worker": "test_writer", "task": {"type": "write_tests"}},
            ]
        }

    async def aggregate_results(self, results: dict) -> dict:
        """Combine results from all workers into one response dict."""
        return {
            "status": "complete",
            "results": results,
        }
102
103
104# Usage
105async def supervisor_demo():
106 supervisor = SupervisorAgent()
107 supervisor.register_worker(CodeWriterWorker())
108 supervisor.register_worker(TestWriterWorker())
109
110 result = await supervisor.process_request(
111 "Create a function to calculate fibonacci numbers with tests"
112 )
113 print(result)Key Characteristics
- Single point of coordination - All decisions flow through the supervisor
- Clear delegation - Workers have well-defined responsibilities
- Centralized state - Supervisor maintains overall task state
- Simple communication - Workers report only to supervisor
Peer-to-Peer Pattern
In peer-to-peer systems, agents collaborate as equals without a central coordinator. They communicate directly and reach consensus through negotiation.
Core Concept
1from dataclasses import dataclass, field
2from typing import Optional
3from abc import ABC, abstractmethod
4import asyncio
5
6
@dataclass
class Message:
    """Message passed between peer agents."""

    sender: str        # name of the agent that sent the message
    recipient: str     # name of the agent the message is addressed to
    content: dict
    message_type: str  # "request", "response", "broadcast"
14
15
@dataclass
class PeerAgent(ABC):
    """Agent that participates in peer-to-peer collaboration.

    Attributes:
        name: Name under which other agents register this peer.
        peers: Known peers, keyed by their name.
        inbox: Received messages that have not been handled yet, oldest first.
    """

    name: str
    peers: dict[str, 'PeerAgent'] = field(default_factory=dict)
    inbox: list[Message] = field(default_factory=list)

    def register_peer(self, peer: 'PeerAgent') -> None:
        """Register another agent as a peer (one direction only)."""
        self.peers[peer.name] = peer

    async def send_message(self, recipient: str, content: dict, msg_type: str) -> None:
        """Send message to a peer.

        The message is appended to the recipient's inbox. If ``recipient``
        is not a registered peer, nothing is sent and no error is raised.
        """
        if recipient in self.peers:
            message = Message(
                sender=self.name,
                recipient=recipient,
                content=content,
                message_type=msg_type,
            )
            self.peers[recipient].inbox.append(message)

    async def broadcast(self, content: dict) -> None:
        """Send message to all peers.

        Every recipient receives its own ``Message`` wrapping the same
        ``content`` dict object.
        """
        for peer_name in self.peers:
            await self.send_message(peer_name, content, "broadcast")

    async def process_inbox(self) -> None:
        """Process all received messages in arrival order.

        Messages that arrive while earlier ones are being handled are
        processed in the same call.
        """
        while self.inbox:
            message = self.inbox.pop(0)
            await self.handle_message(message)

    @abstractmethod
    async def handle_message(self, message: Message) -> None:
        """Handle a received message."""

    @abstractmethod
    async def contribute(self, task: dict) -> dict:
        """Contribute to collaborative task."""
59
60
@dataclass
class DesignPeer(PeerAgent):
    """Peer agent focused on design."""

    name: str = "designer"

    async def handle_message(self, message: Message) -> None:
        """Answer ``review_request`` messages; ignore every other type."""
        if message.content.get("type") == "review_request":
            # Provide design feedback
            feedback = await self.review_design(message.content.get("artifact"))
            await self.send_message(
                message.sender,
                {"feedback": feedback, "type": "design_feedback"},
                "response",
            )

    async def contribute(self, task: dict) -> dict:
        """Return this peer's design contribution for the task."""
        return {"design": "System architecture proposal"}

    async def review_design(self, artifact: Optional[dict]) -> str:
        """Return review feedback for ``artifact``.

        ``artifact`` is ``None`` when the request carried no ``"artifact"``
        entry.
        """
        return "Design looks good with minor suggestions"
81
82
@dataclass
class ImplementationPeer(PeerAgent):
    """Peer agent focused on implementation."""

    name: str = "implementer"

    async def handle_message(self, message: Message) -> None:
        """Answer ``implementation_request`` messages; ignore every other type."""
        if message.content.get("type") == "implementation_request":
            code = await self.implement(message.content.get("spec"))
            await self.send_message(
                message.sender,
                {"code": code, "type": "implementation"},
                "response",
            )

    async def contribute(self, task: dict) -> dict:
        """Return this peer's implementation contribution for the task."""
        return {"code": "Implementation based on design"}

    async def implement(self, spec: Optional[dict]) -> str:
        """Return code for ``spec``.

        ``spec`` is ``None`` when the request carried no ``"spec"`` entry.
        """
        return "# Implementation code"
102
103
@dataclass
class PeerNetwork:
    """Manages peer-to-peer agent collaboration.

    Attributes:
        peers: Agents in the network, in the order they were added.
    """

    peers: list[PeerAgent] = field(default_factory=list)

    def add_peer(self, peer: PeerAgent) -> None:
        """Add peer to network and connect it to all existing peers.

        The connection is made in both directions.
        """
        # Connect new peer to all existing peers
        for existing in self.peers:
            peer.register_peer(existing)
            existing.register_peer(peer)
        self.peers.append(peer)

    async def collaborative_task(self, task: dict, max_rounds: int = 10) -> dict:
        """Execute task through peer collaboration.

        Args:
            task: Task handed to every peer's ``contribute``.
            max_rounds: Upper bound on inbox-processing passes, so peers
                that keep replying to each other cannot loop forever.

        Returns:
            A dict whose ``"contributions"`` entry maps each peer name to
            that peer's contribution.
        """
        # All peers contribute
        contributions = await asyncio.gather(
            *(peer.contribute(task) for peer in self.peers)
        )

        # Process messages (reviews, feedback). A reply sent to a peer that
        # was already processed in this pass would otherwise stay unread,
        # so keep making passes until every inbox is empty.
        for _ in range(max_rounds):
            if not any(peer.inbox for peer in self.peers):
                break
            for peer in self.peers:
                await peer.process_inbox()

        # Combine contributions
        names = [peer.name for peer in self.peers]
        return {"contributions": dict(zip(names, contributions))}
137
138
139# Usage
140async def peer_demo():
141 network = PeerNetwork()
142 network.add_peer(DesignPeer())
143 network.add_peer(ImplementationPeer())
144
145 result = await network.collaborative_task(
146 {"description": "Build a user authentication system"}
147 )
148 print(result)Key Characteristics
- No central authority - Agents are equals in the system
- Direct communication - Agents message each other directly
- Consensus-based - Decisions emerge from negotiation
- Fault tolerant - No single point of failure
Hierarchical Pattern
Hierarchical systems organize agents in layers, where higher-level agents manage and delegate to lower-level agents. This pattern combines aspects of supervisor and peer-to-peer approaches.
Core Concept
1from dataclasses import dataclass, field
2from typing import Optional
3from abc import ABC, abstractmethod
4
5
@dataclass
class HierarchicalAgent(ABC):
    """Agent in a hierarchical structure.

    Attributes:
        name: Name of the agent.
        level: Position in the hierarchy; 0 = top, higher = lower.
        manager: Agent directly above this one, or ``None``.
        subordinates: Agents directly below this one.
    """

    name: str
    level: int  # 0 = top, higher = lower in hierarchy
    manager: Optional['HierarchicalAgent'] = None
    subordinates: list['HierarchicalAgent'] = field(default_factory=list)

    def add_subordinate(self, agent: 'HierarchicalAgent') -> None:
        """Add a subordinate agent and make this agent its manager."""
        agent.manager = self
        self.subordinates.append(agent)

    async def delegate(self, task: dict) -> dict:
        """Delegate task to appropriate subordinate.

        The task is passed down recursively. An agent executes the task
        itself when it has no subordinates or when ``select_subordinate``
        returns ``None``.
        """
        if not self.subordinates:
            # Leaf node - execute directly
            return await self.execute(task)

        # Find suitable subordinate
        suitable = self.select_subordinate(task)
        if suitable is not None:
            return await suitable.delegate(task)

        # No suitable subordinate - execute self
        return await self.execute(task)

    @abstractmethod
    def select_subordinate(self, task: dict) -> Optional['HierarchicalAgent']:
        """Select which subordinate should handle the task."""

    @abstractmethod
    async def execute(self, task: dict) -> dict:
        """Execute task directly."""

    async def escalate(self, issue: dict) -> dict:
        """Escalate issue to manager.

        Returns:
            The manager's ``handle_escalation`` result, or an ``"error"``
            dict when this agent has no manager.
        """
        if self.manager:
            return await self.manager.handle_escalation(issue, self)
        return {"error": "No manager to escalate to"}

    async def handle_escalation(
        self,
        issue: dict,
        from_agent: 'HierarchicalAgent',
    ) -> dict:
        """Handle escalation from subordinate.

        The default passes the issue on to this agent's own manager;
        ``from_agent`` is not used by the default.
        """
        # Default: try to handle or escalate further
        return await self.escalate(issue)
58
59
@dataclass
class ProjectManagerAgent(HierarchicalAgent):
    """Top-level project manager."""

    name: str = "project_manager"
    level: int = 0

    def select_subordinate(self, task: dict) -> Optional[HierarchicalAgent]:
        """Pick the first subordinate whose capabilities include the task type.

        Falls back to the first subordinate when none matches, and to
        ``None`` when there are no subordinates.
        """
        task_type = task.get("type", "")
        for sub in self.subordinates:
            # Subordinates without a capabilities attribute never match
            if task_type in getattr(sub, 'capabilities', []):
                return sub
        return self.subordinates[0] if self.subordinates else None

    async def execute(self, task: dict) -> dict:
        """Return the manager's own result for the task."""
        return {"plan": "High-level project plan"}
75
76
@dataclass
class TeamLeadAgent(HierarchicalAgent):
    """Mid-level team lead.

    Attributes:
        capabilities: Task types this team accepts.
    """

    name: str = "team_lead"
    level: int = 1
    capabilities: list[str] = field(default_factory=list)

    def select_subordinate(self, task: dict) -> Optional[HierarchicalAgent]:
        """Distribute the task among team members by skill.

        Picks the first subordinate whose ``specialty`` equals the task
        type. Falls back to the first subordinate when none matches, and to
        ``None`` when the team is empty.
        """
        if not self.subordinates:
            return None

        # Skill-based selection; subordinates without a specialty never match
        task_type = task.get("type", "")
        for sub in self.subordinates:
            if getattr(sub, 'specialty', None) == task_type:
                return sub
        return self.subordinates[0]

    async def execute(self, task: dict) -> dict:
        """Return the team lead's own result for the task."""
        return {"implementation": "Team-level implementation"}
93
94
@dataclass
class DeveloperAgent(HierarchicalAgent):
    """Leaf-level developer.

    Attributes:
        specialty: Area this developer works in; included in the result text.
    """

    name: str = "developer"
    level: int = 2
    specialty: str = "general"

    def select_subordinate(self, task: dict) -> Optional[HierarchicalAgent]:
        """Return ``None``: a developer never delegates further."""
        return None  # Leaf node

    async def execute(self, task: dict) -> dict:
        """Return a code result labelled with this developer's specialty."""
        return {"code": f"Code from {self.specialty} developer"}
107
108
def build_hierarchy() -> ProjectManagerAgent:
    """Build a hierarchical agent structure.

    Returns:
        A project manager with a backend team (two developers) and a
        frontend team (one developer) below it.
    """
    pm = ProjectManagerAgent()

    # Backend team
    backend_lead = TeamLeadAgent(
        name="backend_lead",
        capabilities=["backend", "api", "database"],
    )
    backend_lead.add_subordinate(
        DeveloperAgent(name="backend_dev_1", specialty="api")
    )
    backend_lead.add_subordinate(
        DeveloperAgent(name="backend_dev_2", specialty="database")
    )
    pm.add_subordinate(backend_lead)

    # Frontend team
    frontend_lead = TeamLeadAgent(
        name="frontend_lead",
        capabilities=["frontend", "ui", "react"],
    )
    frontend_lead.add_subordinate(
        DeveloperAgent(name="frontend_dev_1", specialty="react")
    )
    pm.add_subordinate(frontend_lead)

    return pm
137
138
139# Usage
140async def hierarchy_demo():
141 pm = build_hierarchy()
142
143 # Task flows down through hierarchy
144 backend_task = {"type": "api", "description": "Create REST endpoint"}
145 result = await pm.delegate(backend_task)
146 print(result)Key Characteristics
- Layered organization - Clear levels of abstraction and responsibility
- Delegation chains - Tasks flow down, escalations flow up
- Scoped authority - Each level has defined decision boundaries
- Scalable - Easy to add more agents at any level
Pattern Comparison
Each pattern has distinct strengths and trade-offs:
| Aspect | Supervisor | Peer-to-Peer | Hierarchical |
|---|---|---|---|
| Coordination | Centralized | Distributed | Layered |
| Scalability | Limited by supervisor | Horizontally scalable | Vertically scalable |
| Fault tolerance | Low (single point) | High (no center) | Medium (redundant layers) |
| Complexity | Simple | Complex (consensus) | Medium |
| Latency | Higher (all via supervisor) | Lower (direct) | Medium (layer hops) |
| Best for | Defined workflows | Creative collaboration | Large organizations |
Pattern Selection Guide
1from dataclasses import dataclass
2from enum import Enum
3
4
class PatternType(Enum):
    """The architecture patterns that pattern selection can choose from."""

    SUPERVISOR = "supervisor"
    PEER_TO_PEER = "peer_to_peer"
    HIERARCHICAL = "hierarchical"
9
10
@dataclass
class SystemRequirements:
    """Requirements for pattern selection."""

    agent_count: int
    need_central_control: bool
    need_fault_tolerance: bool
    task_independence: float  # 0-1, how independent are subtasks
    team_structure: str       # "flat", "layered", "matrix"
    latency_critical: bool
20
21
def select_pattern(req: SystemRequirements) -> PatternType:
    """Select appropriate pattern based on requirements.

    The rules are checked in order and the first match wins, so a small
    team that needs central control gets the supervisor pattern even when
    later rules would match too. ``req.latency_critical`` is not consulted.

    Args:
        req: Requirements of the system being designed.

    Returns:
        The selected pattern; supervisor when no rule matches.
    """
    # Small teams with clear workflows
    if req.agent_count <= 5 and req.need_central_control:
        return PatternType.SUPERVISOR

    # Highly independent tasks needing fault tolerance
    if req.task_independence > 0.7 and req.need_fault_tolerance:
        return PatternType.PEER_TO_PEER

    # Large teams with organizational structure
    if req.agent_count > 10 or req.team_structure == "layered":
        return PatternType.HIERARCHICAL

    # Default to supervisor for simpler management
    return PatternType.SUPERVISOR
39
40
# Example selections
examples = [
    SystemRequirements(
        agent_count=3,
        need_central_control=True,
        need_fault_tolerance=False,
        task_independence=0.3,
        team_structure="flat",
        latency_critical=False,
    ),  # -> SUPERVISOR

    SystemRequirements(
        agent_count=6,
        need_central_control=False,
        need_fault_tolerance=True,
        task_independence=0.8,
        team_structure="flat",
        latency_critical=True,
    ),  # -> PEER_TO_PEER

    SystemRequirements(
        agent_count=15,
        need_central_control=True,
        need_fault_tolerance=True,
        task_independence=0.5,
        team_structure="layered",
        latency_critical=False,
    ),  # -> HIERARCHICAL
]
70
71for req in examples:
72 pattern = select_pattern(req)
73 print(f"Agents: [req.agent_count], Pattern: [pattern.value]")Key Takeaways
- Supervisor pattern provides centralized control through a coordinator agent that delegates to workers.
- Peer-to-peer pattern enables decentralized collaboration where agents communicate directly as equals.
- Hierarchical pattern organizes agents in layers with delegation flowing down and escalation flowing up.
- Pattern selection depends on team size, control requirements, fault tolerance needs, and task characteristics.
- Hybrid approaches can combine patterns - e.g., supervisor with peer collaboration among workers.
Next Section Preview: We'll dive deep into the supervisor pattern, exploring implementation details, state management, and advanced coordination strategies.