Introduction
Hierarchical orchestration organizes agents in layers, combining the benefits of centralized coordination with the scalability of distributed execution. This pattern mirrors organizational structures found in large enterprises, where work flows down through management layers and issues escalate up.
Section Overview: We'll design hierarchical agent structures, implement delegation chains, and build escalation mechanisms for handling issues that require higher-level intervention.
Hierarchy Design
Effective hierarchies balance depth (layers) with breadth (agents per layer) based on task complexity and coordination needs:
🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional, Callable, Awaitable
3from abc import ABC, abstractmethod
4from enum import Enum
5import asyncio
6
7
class AgentRole(Enum):
    """Tier an agent occupies within the hierarchy."""

    # Top-level strategic decisions.
    EXECUTIVE = "executive"
    # Mid-level coordination.
    MANAGER = "manager"
    # Domain experts.
    SPECIALIST = "specialist"
    # Task execution.
    WORKER = "worker"
14
15
@dataclass
class HierarchyNode:
    """One agent's position in the hierarchy tree.

    Equality compares a node together with its subtree. The ``parent``
    back-reference is excluded from comparison, because following it would
    recurse through the parent <-> children cycle until ``RecursionError``.
    """

    agent_id: str
    role: AgentRole
    level: int  # 0 = top
    # Maintained by add_child(); left out of == (see class docstring).
    parent: Optional['HierarchyNode'] = field(default=None, compare=False)
    children: list['HierarchyNode'] = field(default_factory=list)
    capabilities: list[str] = field(default_factory=list)

    def add_child(self, child: 'HierarchyNode'):
        """Attach ``child`` and its whole subtree beneath this node.

        The child is first detached from its current parent (if any), then
        ``level`` is renumbered for every node in the attached subtree, so a
        tree assembled bottom-up ends with consistent levels.

        Raises:
            ValueError: if ``child`` is this node or one of its ancestors,
                which would turn the tree into a cycle.
        """
        if any(ancestor is child for ancestor in self.get_path_to_root()):
            raise ValueError(
                f"cannot attach {child.agent_id!r} beneath {self.agent_id!r}: "
                "it is that node or one of its ancestors"
            )

        previous = child.parent
        if previous is not None:
            # Identity filter: == would also match a structurally equal sibling.
            previous.children[:] = [c for c in previous.children if c is not child]

        child.parent = self
        self.children.append(child)
        child._relevel(self.level + 1)

    def _relevel(self, level: int) -> None:
        """Set ``level`` on this node and renumber every descendant below it."""
        pending = [(self, level)]
        while pending:
            node, depth = pending.pop()
            node.level = depth
            pending.extend((grandchild, depth + 1) for grandchild in node.children)

    def get_path_to_root(self) -> list['HierarchyNode']:
        """Return the chain of nodes from this node (first) up to the root (last)."""
        path = [self]
        while path[-1].parent is not None:
            path.append(path[-1].parent)
        return path

    def find_by_id(self, agent_id: str) -> Optional['HierarchyNode']:
        """Return the first node of this subtree, in pre-order, whose ID matches.

        Returns ``None`` when no node in the subtree has ``agent_id``.
        """
        pending = [self]
        while pending:
            node = pending.pop()
            if node.agent_id == agent_id:
                return node
            # Reversed so the first child is the next one popped.
            pending.extend(reversed(node.children))
        return None
50
51
@dataclass
class HierarchyBuilder:
    """Factory for ready-made agent hierarchies."""

    def build_development_team(self) -> HierarchyNode:
        """Assemble the sample software-development team and return its root.

        Layout (children in attachment order)::

            cto
              backend_lead
                api_specialist
                  backend_dev_1
                db_specialist
              frontend_lead
                react_specialist
                  frontend_dev_1
        """
        # One entry per agent: (agent_id, role, level, capabilities).
        roster = [
            # Executive level
            ("cto", AgentRole.EXECUTIVE, 0,
             ["strategy", "architecture", "resource_allocation"]),
            # Manager level
            ("backend_lead", AgentRole.MANAGER, 1,
             ["backend", "api", "database", "coordination"]),
            ("frontend_lead", AgentRole.MANAGER, 1,
             ["frontend", "ui", "ux", "coordination"]),
            # Specialist level
            ("api_specialist", AgentRole.SPECIALIST, 2,
             ["api_design", "rest", "graphql"]),
            ("db_specialist", AgentRole.SPECIALIST, 2,
             ["database", "sql", "optimization"]),
            ("react_specialist", AgentRole.SPECIALIST, 2,
             ["react", "typescript", "components"]),
            # Worker level
            ("backend_dev_1", AgentRole.WORKER, 3,
             ["python", "coding"]),
            ("frontend_dev_1", AgentRole.WORKER, 3,
             ["javascript", "css", "coding"]),
        ]
        nodes = {
            agent_id: HierarchyNode(
                agent_id=agent_id,
                role=role,
                level=level,
                capabilities=capabilities,
            )
            for agent_id, role, level, capabilities in roster
        }

        # (parent, child) pairs, in the order the links are made.
        reporting_lines = [
            ("cto", "backend_lead"),
            ("cto", "frontend_lead"),
            ("backend_lead", "api_specialist"),
            ("backend_lead", "db_specialist"),
            ("frontend_lead", "react_specialist"),
            ("api_specialist", "backend_dev_1"),
            ("react_specialist", "frontend_dev_1"),
        ]
        for manager_id, report_id in reporting_lines:
            nodes[manager_id].add_child(nodes[report_id])

        return nodes["cto"]
132
133
@dataclass
class HierarchyMetrics:
    """Structural statistics for the tree rooted at ``root``.

    Depth and breadth are derived from the parent/child links. Only the
    root's own ``level`` is read, as the offset at which the tree starts, so
    a descendant whose stored ``level`` is out of date cannot skew a result.
    """

    root: HierarchyNode

    def _tiers(self) -> list[list[HierarchyNode]]:
        """Group the nodes by distance from ``root``; the root's tier comes first."""
        tiers = []
        current = [self.root]
        while current:
            tiers.append(current)
            current = [child for node in current for child in node.children]
        return tiers

    def depth(self) -> int:
        """Return the number of levels from level 0 down to the deepest node."""
        return self.root.level + len(self._tiers())

    def breadth_at_level(self, level: int) -> int:
        """Return how many nodes sit at ``level`` (0 when the tree has none there)."""
        tiers = self._tiers()
        offset = level - self.root.level
        if 0 <= offset < len(tiers):
            return len(tiers[offset])
        return 0

    def total_nodes(self) -> int:
        """Return the number of nodes in the tree, root included."""
        return sum(len(tier) for tier in self._tiers())

    def span_of_control(self) -> dict:
        """Summarise direct-report counts over every node that has children.

        Returns a dict with keys ``"avg"``, ``"max"`` and ``"min"``; all three
        are 0 when no node has children.
        """
        spans = [
            len(node.children)
            for tier in self._tiers()
            for node in tier
            if node.children
        ]
        if not spans:
            return {"avg": 0, "max": 0, "min": 0}
        return {
            "avg": sum(spans) / len(spans),
            "max": max(spans),
            "min": min(spans),
        }


# | Role | Responsibilities | Typical Span |
|---|---|---|
| Executive | Strategy, resource allocation | 2-5 managers |
| Manager | Coordination, prioritization | 3-7 specialists |
| Specialist | Domain expertise, quality | 2-4 workers |
| Worker | Task execution | N/A (leaf nodes) |
Delegation Patterns
Delegation determines how tasks flow down through the hierarchy:
🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional
3from abc import ABC, abstractmethod
4
5
@dataclass
class DelegatedTask:
    """A unit of work as it is handed down through the hierarchy."""

    id: str
    type: str
    payload: dict
    # Agent that created the task.
    origin: str
    # Delegation chain.
    delegated_by: list[str] = field(default_factory=list)
    status: str = "pending"
    result: Optional[dict] = None
16
17
class DelegationStrategy(ABC):
    """Interface for choosing which child receives a task."""

    @abstractmethod
    def select_delegate(
        self,
        task: DelegatedTask,
        candidates: list[HierarchyNode],
    ) -> Optional[HierarchyNode]:
        """Return the candidate that should take ``task``, or ``None``."""
        ...
29
30
class CapabilityMatching(DelegationStrategy):
    """Pick the first child whose capabilities include the task's type."""

    def select_delegate(
        self,
        task: DelegatedTask,
        candidates: list[HierarchyNode],
    ) -> Optional[HierarchyNode]:
        """Return the earliest candidate listing ``task.type``, or ``None`` if none does."""
        wanted = task.type
        return next(
            (node for node in candidates if wanted in node.capabilities),
            None,
        )
44
45
@dataclass
class LoadBalancing(DelegationStrategy):
    """Distribute tasks evenly among children.

    Declared as a dataclass so that ``task_counts`` is a real per-instance
    dict. On a plain class, ``field(default_factory=dict)`` leaves a
    ``dataclasses.Field`` object as the attribute, and the first membership
    test on it raises ``TypeError``.
    """

    # agent_id -> number of tasks this strategy has handed to that agent.
    task_counts: dict[str, int] = field(default_factory=dict)

    def select_delegate(
        self,
        task: DelegatedTask,
        candidates: list[HierarchyNode]
    ) -> Optional[HierarchyNode]:
        """Return the candidate with the fewest assignments so far.

        Ties go to the earliest candidate in the list. The chosen agent's
        count is incremented. Returns ``None`` when ``candidates`` is empty.
        """
        if not candidates:
            return None

        # Agents seen for the first time start at zero.
        for candidate in candidates:
            self.task_counts.setdefault(candidate.agent_id, 0)

        # Select least loaded
        selected = min(candidates, key=lambda c: self.task_counts[c.agent_id])
        self.task_counts[selected.agent_id] += 1
        return selected
71
72
@dataclass
class SkillBasedDelegation(DelegationStrategy):
    """Delegate based on skill matching and expertise level.

    Declared as a dataclass so that ``skill_scores`` is a real per-instance
    dict. On a plain class, ``field(default_factory=dict)`` leaves a
    ``dataclasses.Field`` object as the attribute, and calling ``.get`` on it
    raises ``AttributeError``.
    """

    # agent_id -> {skill -> score}
    skill_scores: dict[str, dict[str, float]] = field(default_factory=dict)

    def select_delegate(
        self,
        task: DelegatedTask,
        candidates: list[HierarchyNode]
    ) -> Optional[HierarchyNode]:
        """Return the candidate with the highest total score for the required skills.

        Required skills are read from ``task.payload["required_skills"]``
        (none when the key is absent). A skill, or an agent, missing from
        ``skill_scores`` contributes 0. Ties go to the earliest candidate.
        Returns ``None`` when ``candidates`` is empty or no total exceeds -1.
        """
        required_skills = task.payload.get("required_skills", [])

        best_match = None
        best_score = -1

        for candidate in candidates:
            # Looked up once per candidate rather than once per skill.
            agent_scores = self.skill_scores.get(candidate.agent_id, {})
            score = sum(agent_scores.get(skill, 0) for skill in required_skills)

            if score > best_score:
                best_score = score
                best_match = candidate

        return best_match
99
100
@dataclass
class HierarchicalDelegator:
    """Manages delegation through the hierarchy."""

    root: HierarchyNode
    strategy: DelegationStrategy = field(default_factory=CapabilityMatching)
    # task id -> task, recorded each time a delegate is chosen.
    active_tasks: dict[str, DelegatedTask] = field(default_factory=dict)

    async def delegate(self, task: DelegatedTask, from_node: HierarchyNode) -> Optional[HierarchyNode]:
        """Hand ``task`` from ``from_node`` to one of its children.

        Returns the child chosen by ``strategy``. Returns ``None`` when
        ``from_node`` is a leaf or the strategy selects nobody; in that case
        the task is left untouched. ``from_node`` is appended to the task's
        delegation chain only when a delegate was actually chosen, so failed
        or repeated attempts do not add spurious entries.
        """
        if not from_node.children:
            # Leaf node - must execute directly
            return None

        delegate = self.strategy.select_delegate(task, from_node.children)
        if delegate is None:
            return None

        task.delegated_by.append(from_node.agent_id)
        self.active_tasks[task.id] = task
        return delegate

    async def delegate_recursively(
        self,
        task: DelegatedTask,
        from_node: HierarchyNode
    ) -> Optional[HierarchyNode]:
        """Push ``task`` down from ``from_node`` as far as delegation allows.

        Returns the node that will execute the task: the first node reached
        that is a leaf or for which no delegate could be selected. This may
        be ``from_node`` itself.
        """
        current = from_node

        while current.children:
            delegate = await self.delegate(task, current)
            if delegate is None:
                break
            current = delegate

        return current
143
144
@dataclass
class TaskDecomposer:
    """Decomposes complex tasks for delegation to multiple children."""

    async def decompose(
        self,
        task: DelegatedTask,
        children: list[HierarchyNode]
    ) -> list[tuple[HierarchyNode, DelegatedTask]]:
        """Break ``task`` into subtasks for different children.

        Only tasks of type ``"full_stack_feature"`` are split: one
        ``"backend"`` and one ``"frontend"`` subtask, each assigned to the
        first child that lists that capability. A part with no capable child
        is skipped. Subtask IDs are ``"<task.id>-backend"`` and
        ``"<task.id>-frontend"``, so they stay unique per parent task.

        Returns the ``(child, subtask)`` pairs, or an empty list when the
        task is not decomposed.
        """
        if task.type != "full_stack_feature":
            # No decomposition needed
            return []

        assignments = []
        for part in ("backend", "frontend"):
            owner = next((c for c in children if part in c.capabilities), None)
            if owner is None:
                continue
            subtask = DelegatedTask(
                id=f"{task.id}-{part}",
                type=part,
                payload={"feature": task.payload.get("feature"), "part": part},
                origin=task.origin,
            )
            assignments.append((owner, subtask))

        return assignments


# Escalation Mechanisms
When lower-level agents cannot handle issues, escalation moves them up the hierarchy:
🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional
3from enum import Enum
4
5
class EscalationReason(Enum):
    """Why a task is being escalated."""

    BEYOND_CAPABILITY = "beyond_capability"
    RESOURCE_CONFLICT = "resource_conflict"
    POLICY_DECISION = "policy_decision"
    CROSS_TEAM = "cross_team"
    EMERGENCY = "emergency"
    APPROVAL_REQUIRED = "approval_required"
14
15
@dataclass
class Escalation:
    """Record of one issue that was escalated."""

    id: str
    original_task: DelegatedTask
    # Agent IDs of the sender and of the receiver.
    escalated_from: str
    escalated_to: str
    reason: EscalationReason
    context: dict
    status: str = "pending"
    resolution: Optional[dict] = None
27
28
class EscalationPolicy(ABC):
    """Interface deciding whether, and to whom, a task is escalated."""

    @abstractmethod
    def should_escalate(
        self,
        task: DelegatedTask,
        node: HierarchyNode,
        context: dict,
    ) -> tuple[bool, Optional[EscalationReason]]:
        """Return ``(escalate?, reason)`` for ``task`` at ``node``."""
        ...

    @abstractmethod
    def get_escalation_target(
        self,
        node: HierarchyNode,
        reason: EscalationReason,
    ) -> Optional[HierarchyNode]:
        """Return the node that should receive an escalation from ``node``."""
        ...
50
51
@dataclass
class DefaultEscalationPolicy(EscalationPolicy):
    """Default escalation policy."""

    def should_escalate(
        self,
        task: DelegatedTask,
        node: HierarchyNode,
        context: dict
    ) -> tuple[bool, Optional[EscalationReason]]:
        """Check the escalation conditions in order and report the first hit.

        Order: a leaf node lacking ``task.type`` among its capabilities, then
        the truthy ``context`` flags ``"requires_cross_team"``,
        ``"requires_policy_decision"`` and ``"is_emergency"``.

        Returns ``(True, reason)`` for the first condition met, otherwise
        ``(False, None)``.
        """
        # Beyond capability: nobody below to delegate to either.
        if task.type not in node.capabilities and not node.children:
            return True, EscalationReason.BEYOND_CAPABILITY

        flagged = (
            ("requires_cross_team", EscalationReason.CROSS_TEAM),
            ("requires_policy_decision", EscalationReason.POLICY_DECISION),
            ("is_emergency", EscalationReason.EMERGENCY),
        )
        for flag, reason in flagged:
            if context.get(flag):
                return True, reason

        return False, None

    def get_escalation_target(
        self,
        node: HierarchyNode,
        reason: EscalationReason
    ) -> Optional[HierarchyNode]:
        """Return the ancestor that should receive the escalation.

        Emergencies go to the nearest ancestor with the EXECUTIVE role, or to
        the root when no ancestor has that role; every other reason goes to
        the direct parent. Only nodes above ``node`` are considered, so a
        node is never its own target. Returns ``None`` when ``node`` has no
        parent.
        """
        if reason != EscalationReason.EMERGENCY:
            return node.parent

        # Drop the first entry: get_path_to_root() starts with `node` itself.
        ancestors = node.get_path_to_root()[1:]
        if not ancestors:
            return None
        for ancestor in ancestors:
            if ancestor.role == AgentRole.EXECUTIVE:
                return ancestor
        return ancestors[-1]  # Root
97
98
@dataclass
class EscalationManager:
    """Book-keeping for escalations: opens them and files them once resolved."""

    root: HierarchyNode
    policy: EscalationPolicy = field(default_factory=DefaultEscalationPolicy)
    # Open escalations, keyed by escalation ID.
    active_escalations: dict[str, Escalation] = field(default_factory=dict)
    # Closed escalations, in the order they were resolved.
    resolved_escalations: list[Escalation] = field(default_factory=list)

    def escalate(
        self,
        task: DelegatedTask,
        from_node: HierarchyNode,
        reason: EscalationReason,
        context: dict = None
    ) -> Optional[Escalation]:
        """Open an escalation of ``task`` from ``from_node``.

        The receiver is whatever ``policy.get_escalation_target`` returns.
        Without a receiver nothing is recorded and ``None`` is returned;
        otherwise the new escalation is stored as active and returned.
        """
        receiver = self.policy.get_escalation_target(from_node, reason)
        if not receiver:
            return None

        record = Escalation(
            id=str(uuid.uuid4()),
            original_task=task,
            escalated_from=from_node.agent_id,
            escalated_to=receiver.agent_id,
            reason=reason,
            context=context or {},
        )
        self.active_escalations[record.id] = record
        return record

    def resolve(
        self,
        escalation_id: str,
        resolution: dict,
        resolved_by: str
    ):
        """Close an active escalation and move it to the resolved list.

        Unknown IDs are ignored.
        """
        record = self.active_escalations.pop(escalation_id, None)
        if record is None:
            return

        record.status = "resolved"
        record.resolution = {"resolved_by": resolved_by, "action": resolution}
        self.resolved_escalations.append(record)
152
153
@dataclass
class ApprovalWorkflow:
    """Manages approval requests in the hierarchy.

    A request is decided by its first vote: one approval marks it
    ``"approved"``, one rejection marks it ``"rejected"``. Once decided, later
    votes are ignored, so a decision can no longer be flipped afterwards.
    """

    # level -> approvers (not consulted by the methods below)
    approvers: dict[str, list[str]] = field(default_factory=dict)
    # approval ID -> request record; decided requests stay in this dict.
    pending_approvals: dict[str, dict] = field(default_factory=dict)

    def request_approval(
        self,
        task: DelegatedTask,
        requester: HierarchyNode,
        approval_type: str
    ) -> str:
        """Open an approval request and return its generated ID.

        The request targets the level directly above the requester, clamped
        at 0 so a top-level requester targets its own level.
        """
        approval_id = str(uuid.uuid4())

        target_level = max(0, requester.level - 1)  # Go up one level

        self.pending_approvals[approval_id] = {
            "task": task,
            "requester": requester.agent_id,
            "approval_type": approval_type,
            "target_level": target_level,
            "status": "pending",
            "approvals": [],
            "rejections": []
        }

        return approval_id

    def approve(self, approval_id: str, approver: str, comments: str = "") -> bool:
        """Record an approval vote.

        Returns ``True`` if the vote was recorded, ``False`` if the ID is
        unknown or the request has already been decided.
        """
        request = self._open_request(approval_id)
        if request is None:
            return False

        request["approvals"].append({
            "approver": approver,
            "comments": comments
        })
        self._check_completion(approval_id)
        return True

    def reject(self, approval_id: str, approver: str, reason: str) -> bool:
        """Record a rejection vote and mark the request ``"rejected"``.

        Returns ``True`` if the vote was recorded, ``False`` if the ID is
        unknown or the request has already been decided.
        """
        request = self._open_request(approval_id)
        if request is None:
            return False

        request["rejections"].append({
            "approver": approver,
            "reason": reason
        })
        request["status"] = "rejected"
        return True

    def _open_request(self, approval_id: str) -> Optional[dict]:
        """Return the request record if it exists and is still pending, else ``None``."""
        request = self.pending_approvals.get(approval_id)
        if request is None or request["status"] != "pending":
            return None
        return request

    def _check_completion(self, approval_id: str):
        """Mark the request ``"approved"`` once it holds at least one approval."""
        approval = self.pending_approvals[approval_id]
        # Simple: one approval is enough
        if approval["approvals"]:
            approval["status"] = "approved"


# Complete Hierarchy Implementation
Here's a complete hierarchical orchestration system:
🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional, Callable, Awaitable
3import asyncio
4import uuid
5
6
@dataclass
class HierarchicalAgent:
    """Agent that operates within a hierarchy."""

    node: HierarchyNode
    delegator: HierarchicalDelegator
    escalation_manager: EscalationManager
    # Coroutine function that performs the task and returns its result dict.
    execute_fn: Callable[[DelegatedTask], Awaitable[dict]]

    async def handle_task(self, task: DelegatedTask, context: Optional[dict] = None) -> dict:
        """Handle a task - escalate, delegate, or execute, in that order.

        Args:
            task: The task to handle.
            context: Situation flags for the escalation policy, also stored
                on any escalation that is created. Defaults to an empty dict.

        Returns:
            ``{"escalated": True, "escalation_id": ...}`` if an escalation
            was created, ``{"delegated": True, "delegate": ...}`` if a child
            was selected, otherwise the dict returned by ``execute_fn``. In
            that last case ``task.result`` is set and ``task.status`` becomes
            ``"completed"``.
        """
        context = {} if context is None else context

        # Check if we should escalate
        should_escalate, reason = self.escalation_manager.policy.should_escalate(
            task, self.node, context
        )

        if should_escalate and reason:
            escalation = self.escalation_manager.escalate(
                task, self.node, reason, context
            )
            # No escalation is created when there is no target; fall through.
            if escalation:
                return {"escalated": True, "escalation_id": escalation.id}

        # Try to delegate
        if self.node.children:
            delegate = await self.delegator.delegate(task, self.node)
            if delegate:
                return {"delegated": True, "delegate": delegate.agent_id}

        # Execute ourselves
        result = await self.execute_fn(task)
        task.result = result
        task.status = "completed"
        return result
42
43
@dataclass
class HierarchicalOrchestrator:
    """Complete hierarchical orchestration system."""

    root: HierarchyNode
    # agent_id -> registered agent
    agents: dict[str, HierarchicalAgent] = field(default_factory=dict)
    # Both default to fresh instances rooted at `root` when left as None.
    delegator: Optional[HierarchicalDelegator] = None
    escalation_manager: Optional[EscalationManager] = None

    def __post_init__(self):
        """Create the default delegator and escalation manager if none were supplied."""
        if self.delegator is None:
            self.delegator = HierarchicalDelegator(root=self.root)
        if self.escalation_manager is None:
            self.escalation_manager = EscalationManager(root=self.root)

    def register_agent(
        self,
        node: HierarchyNode,
        execute_fn: Callable[[DelegatedTask], Awaitable[dict]]
    ):
        """Register an agent for a hierarchy node.

        The agent shares this orchestrator's delegator and escalation manager
        and replaces any agent already registered under ``node.agent_id``.
        """
        self.agents[node.agent_id] = HierarchicalAgent(
            node=node,
            delegator=self.delegator,
            escalation_manager=self.escalation_manager,
            execute_fn=execute_fn
        )

    async def submit_task(self, task: DelegatedTask) -> dict:
        """Submit a task at the root and let the hierarchy route it.

        The task is delegated downwards as far as possible, then handed to
        the agent registered for the node it ends up at. Returns that agent's
        result, or ``{"error": "No agent registered for node"}``.
        """
        final_node = await self.delegator.delegate_recursively(task, self.root)

        agent = self.agents.get(final_node.agent_id)
        if agent is None:
            return {"error": "No agent registered for node"}
        return await agent.handle_task(task)

    async def handle_escalation(self, escalation_id: str) -> dict:
        """Have the escalation's target agent handle the original task.

        On success the escalation is resolved with the agent's result and
        that result is returned. Returns an ``{"error": ...}`` dict when the
        escalation is not active or its target has no registered agent.
        """
        escalation = self.escalation_manager.active_escalations.get(escalation_id)
        if escalation is None:
            return {"error": "Escalation not found"}

        target_agent_id = escalation.escalated_to
        agent = self.agents.get(target_agent_id)
        if agent is None:
            return {"error": "Escalation target not found"}

        result = await agent.handle_task(escalation.original_task)
        self.escalation_manager.resolve(escalation_id, result, target_agent_id)
        return result

    def get_status(self) -> dict:
        """Return hierarchy depth plus agent, task and escalation counts."""
        return {
            "hierarchy_depth": HierarchyMetrics(self.root).depth(),
            "total_agents": len(self.agents),
            "active_tasks": len(self.delegator.active_tasks),
            "active_escalations": len(self.escalation_manager.active_escalations),
            "resolved_escalations": len(self.escalation_manager.resolved_escalations)
        }
116
117
118# Example usage
async def hierarchy_example():
    """Build the sample team, register an agent per node, and submit one task.

    Prints the task result and the orchestrator status.
    """
    # Build hierarchy
    builder = HierarchyBuilder()
    root = builder.build_development_team()

    # Create orchestrator
    orchestrator = HierarchicalOrchestrator(root=root)

    # Execution functions, one per tier
    async def cto_execute(task: DelegatedTask) -> dict:
        return {"decision": "Strategic direction set"}

    async def lead_execute(task: DelegatedTask) -> dict:
        return {"coordination": "Work distributed to team"}

    async def specialist_execute(task: DelegatedTask) -> dict:
        return {"expertise": "Technical solution designed"}

    async def worker_execute(task: DelegatedTask) -> dict:
        return {"code": "# Implementation complete"}

    # Register agents
    orchestrator.register_agent(root, cto_execute)  # CTO

    for lead in root.children:  # Leads
        orchestrator.register_agent(lead, lead_execute)
        for specialist in lead.children:  # Specialists
            orchestrator.register_agent(specialist, specialist_execute)
            for worker in specialist.children:  # Workers
                orchestrator.register_agent(worker, worker_execute)

    # Submit a task
    task = DelegatedTask(
        id=str(uuid.uuid4()),
        type="api_design",
        payload={"feature": "user authentication"},
        origin="external"
    )

    result = await orchestrator.submit_task(task)
    # Braces, not brackets: interpolate the values instead of printing the names.
    print(f"Task result: {result}")
    print(f"Status: {orchestrator.get_status()}")


# Key Takeaways
- Hierarchy design should balance depth and breadth based on task complexity and coordination needs.
- Delegation strategies determine how tasks flow down the hierarchy: by capability matching, load balancing, or skill-based selection.
- Task decomposition allows complex tasks to be split among multiple children for parallel execution.
- Escalation mechanisms handle cases where lower levels cannot complete tasks, moving issues up the chain.
- Approval workflows formalize decision-making for tasks requiring higher-level authorization.
Next Section Preview: We'll explore shared state and communication patterns that enable coordination across all multi-agent architectures.