Chapter 14
18 min read
Section 89 of 175

Hierarchical Orchestration

Multi-Agent Systems

Introduction

Hierarchical orchestration organizes agents in layers, combining the benefits of centralized coordination with the scalability of distributed execution. This pattern mirrors organizational structures found in large enterprises, where work flows down through management layers and issues escalate up.

Section Overview: We'll design hierarchical agent structures, implement delegation chains, and build escalation mechanisms for handling issues that require higher-level intervention.

Hierarchy Design

Effective hierarchies balance depth (layers) with breadth (agents per layer) based on task complexity and coordination needs:

🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional, Callable, Awaitable
3from abc import ABC, abstractmethod
4from enum import Enum
5import asyncio
6
7
class AgentRole(Enum):
    """Tier an agent occupies in the hierarchy, listed from top to bottom."""

    # Top-level strategic decisions.
    EXECUTIVE = "executive"
    # Mid-level coordination.
    MANAGER = "manager"
    # Domain experts.
    SPECIALIST = "specialist"
    # Task execution.
    WORKER = "worker"
14
15
@dataclass
class HierarchyNode:
    """A single agent's position in the hierarchy tree.

    Nodes link both ways: ``children`` points down and ``parent`` points up.
    ``parent`` is excluded from equality so comparing two nodes walks only
    downwards; including it makes the generated ``__eq__`` bounce between a
    node and its parent forever when two structurally equal trees are compared.
    """
    agent_id: str
    role: AgentRole
    level: int  # 0 = top
    parent: Optional['HierarchyNode'] = field(default=None, compare=False)
    children: list['HierarchyNode'] = field(default_factory=list)
    capabilities: list[str] = field(default_factory=list)

    def add_child(self, child: 'HierarchyNode'):
        """Attach ``child`` beneath this node and re-level its whole subtree.

        Args:
            child: Node to attach. Its ``parent`` is set to this node, and the
                ``level`` of the child and all of its descendants is rewritten
                relative to this node's level.
        """
        child.parent = self
        self.children.append(child)

        # The child may already carry descendants (a subtree assembled before
        # being attached), so every level below it has to shift as well.
        pending = [(child, self.level + 1)]
        while pending:
            node, level = pending.pop()
            node.level = level
            pending.extend((below, level + 1) for below in node.children)

    def get_path_to_root(self) -> list['HierarchyNode']:
        """Return the chain of nodes from this node up to the root.

        Returns:
            A list starting with this node and ending with the node that has
            no parent.
        """
        path = [self]
        current = self
        while current.parent is not None:
            current = current.parent
            path.append(current)
        return path

    def find_by_id(self, agent_id: str) -> Optional['HierarchyNode']:
        """Search this node and its descendants, depth-first, for ``agent_id``.

        Args:
            agent_id: Identifier to look for.

        Returns:
            The first matching node, or ``None`` when the subtree has no match.
        """
        if self.agent_id == agent_id:
            return self
        for child in self.children:
            found = child.find_by_id(agent_id)
            if found is not None:
                return found
        return None
50
51
@dataclass
class HierarchyBuilder:
    """Factory for ready-made agent hierarchies."""

    def build_development_team(self) -> HierarchyNode:
        """Assemble the sample software-development team and return its root.

        The tree has four tiers: one executive, two managers, three
        specialists, and two workers.
        """

        def make(agent_id, role, level, capabilities):
            # Small local factory so each node reads as a single line below.
            return HierarchyNode(
                agent_id=agent_id,
                role=role,
                level=level,
                capabilities=capabilities,
            )

        # Executive tier
        cto = make("cto", AgentRole.EXECUTIVE, 0,
                   ["strategy", "architecture", "resource_allocation"])

        # Manager tier
        backend_lead = make("backend_lead", AgentRole.MANAGER, 1,
                            ["backend", "api", "database", "coordination"])
        frontend_lead = make("frontend_lead", AgentRole.MANAGER, 1,
                             ["frontend", "ui", "ux", "coordination"])

        # Specialist tier
        api_specialist = make("api_specialist", AgentRole.SPECIALIST, 2,
                              ["api_design", "rest", "graphql"])
        db_specialist = make("db_specialist", AgentRole.SPECIALIST, 2,
                             ["database", "sql", "optimization"])
        react_specialist = make("react_specialist", AgentRole.SPECIALIST, 2,
                                ["react", "typescript", "components"])

        # Worker tier
        backend_dev_1 = make("backend_dev_1", AgentRole.WORKER, 3,
                             ["python", "coding"])
        frontend_dev_1 = make("frontend_dev_1", AgentRole.WORKER, 3,
                              ["javascript", "css", "coding"])

        # Wire the tree together, parents before their descendants.
        reporting_lines = (
            (cto, backend_lead),
            (cto, frontend_lead),
            (backend_lead, api_specialist),
            (backend_lead, db_specialist),
            (frontend_lead, react_specialist),
            (api_specialist, backend_dev_1),
            (react_specialist, frontend_dev_1),
        )
        for manager, report in reporting_lines:
            manager.add_child(report)

        return cto
132
133
@dataclass
class HierarchyMetrics:
    """Structural statistics computed over a hierarchy starting at ``root``."""

    root: HierarchyNode

    def _walk(self):
        """Yield every node in the tree (traversal order is unspecified)."""
        stack = [self.root]
        while stack:
            node = stack.pop()
            yield node
            stack.extend(node.children)

    def depth(self) -> int:
        """Return one more than the largest ``level`` stored on any leaf."""
        deepest_leaf = max(
            node.level for node in self._walk() if not node.children
        )
        return deepest_leaf + 1

    def breadth_at_level(self, level: int) -> int:
        """Count nodes whose ``level`` equals ``level``.

        The search does not descend below a node that already matches.
        """
        matches = 0
        stack = [self.root]
        while stack:
            node = stack.pop()
            if node.level == level:
                matches += 1
            else:
                stack.extend(node.children)
        return matches

    def total_nodes(self) -> int:
        """Return the number of nodes in the tree, root included."""
        return sum(1 for _ in self._walk())

    def span_of_control(self) -> dict:
        """Summarise direct-report counts over every node that has children.

        Returns:
            A dict with ``"avg"``, ``"max"`` and ``"min"`` keys; all three are
            0 when no node has children.
        """
        report_counts = [
            len(node.children) for node in self._walk() if node.children
        ]
        if not report_counts:
            return {"avg": 0, "max": 0, "min": 0}
        return {
            "avg": sum(report_counts) / len(report_counts),
            "max": max(report_counts),
            "min": min(report_counts)
        }

| Role | Responsibilities | Typical Span |
|------|------------------|--------------|
| Executive | Strategy, resource allocation | 2-5 managers |
| Manager | Coordination, prioritization | 3-7 specialists |
| Specialist | Domain expertise, quality | 2-4 workers |
| Worker | Task execution | N/A (leaf nodes) |

Delegation Patterns

Delegation determines how tasks flow down through the hierarchy:

🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional
3from abc import ABC, abstractmethod
4
5
@dataclass
class DelegatedTask:
    """Unit of work that travels through the hierarchy."""

    id: str
    type: str
    payload: dict
    # Agent that created the task.
    origin: str
    # Delegation chain; each task gets its own list.
    delegated_by: list[str] = field(default_factory=list)
    status: str = "pending"
    result: Optional[dict] = None
16
17
class DelegationStrategy(ABC):
    """Interface for choosing which child receives a task."""

    @abstractmethod
    def select_delegate(
        self,
        task: DelegatedTask,
        candidates: list[HierarchyNode]
    ) -> Optional[HierarchyNode]:
        """Pick one node from ``candidates`` for ``task``.

        Implementations return the chosen node, or ``None`` when no candidate
        is suitable.
        """
        return None
29
30
class CapabilityMatching(DelegationStrategy):
    """Pick the first candidate whose capabilities contain the task's type."""

    def select_delegate(
        self,
        task: DelegatedTask,
        candidates: list[HierarchyNode]
    ) -> Optional[HierarchyNode]:
        """Return the first candidate listing ``task.type``, else ``None``."""
        wanted = task.type
        return next(
            (node for node in candidates if wanted in node.capabilities),
            None,
        )
44
45
@dataclass
class LoadBalancing(DelegationStrategy):
    """Distribute tasks evenly among children.

    The class must be a dataclass: ``field(default_factory=dict)`` only yields
    a per-instance dict when ``@dataclass`` processes it. Without the
    decorator ``task_counts`` is a bare ``Field`` object and the first
    ``select_delegate`` call fails.

    Attributes:
        task_counts: Number of tasks handed to each agent so far, keyed by
            agent id.
    """

    task_counts: dict[str, int] = field(default_factory=dict)

    def select_delegate(
        self,
        task: DelegatedTask,
        candidates: list[HierarchyNode]
    ) -> Optional[HierarchyNode]:
        """Return the candidate with the fewest recorded assignments.

        Args:
            task: Task being delegated (not inspected by this strategy).
            candidates: Nodes eligible to receive the task.

        Returns:
            The least-loaded candidate (the earliest one on ties), or ``None``
            when ``candidates`` is empty. The winner's count is incremented.
        """
        if not candidates:
            return None

        # Candidates seen for the first time start with zero assignments.
        for candidate in candidates:
            self.task_counts.setdefault(candidate.agent_id, 0)

        selected = min(
            candidates,
            key=lambda c: self.task_counts[c.agent_id]
        )
        self.task_counts[selected.agent_id] += 1
        return selected
71
72
@dataclass
class SkillBasedDelegation(DelegationStrategy):
    """Delegate based on skill matching and expertise level.

    The class must be a dataclass: ``field(default_factory=dict)`` only yields
    a per-instance dict when ``@dataclass`` processes it. Without the
    decorator ``skill_scores`` is a bare ``Field`` object and
    ``select_delegate`` fails on ``.get``.

    Attributes:
        skill_scores: Per-agent skill ratings, as
            ``{agent_id: {skill: score}}``.
    """

    skill_scores: dict[str, dict[str, float]] = field(default_factory=dict)

    def select_delegate(
        self,
        task: DelegatedTask,
        candidates: list[HierarchyNode]
    ) -> Optional[HierarchyNode]:
        """Return the candidate with the highest total score for the task.

        Required skills are read from ``task.payload["required_skills"]``
        (missing means none). Unknown agents and unknown skills score 0.

        Returns:
            The best-scoring candidate (the earliest one on ties), or ``None``
            when ``candidates`` is empty.
        """
        required_skills = task.payload.get("required_skills", [])

        def total_score(candidate: HierarchyNode) -> float:
            agent_scores = self.skill_scores.get(candidate.agent_id, {})
            return sum(agent_scores.get(skill, 0) for skill in required_skills)

        # max() keeps the first of several equal maxima, matching a
        # strictly-greater scan over the candidates in order.
        return max(candidates, key=total_score, default=None)
99
100
@dataclass
class HierarchicalDelegator:
    """Routes tasks from a node to one of its direct children via a strategy.

    Attributes:
        root: Root node of the hierarchy (stored; not read by the methods
            below).
        strategy: Chooses the receiving child; defaults to a new
            ``CapabilityMatching`` instance.
        active_tasks: Tasks that have been handed to a child, keyed by task id.
    """

    root: HierarchyNode
    strategy: DelegationStrategy = field(default_factory=CapabilityMatching)
    active_tasks: dict[str, DelegatedTask] = field(default_factory=dict)

    async def delegate(self, task: DelegatedTask, from_node: HierarchyNode) -> Optional[HierarchyNode]:
        """Hand ``task`` from ``from_node`` to one of its children.

        Args:
            task: Task to delegate.
            from_node: Node currently holding the task.

        Returns:
            The chosen child, or ``None`` when ``from_node`` is a leaf or the
            strategy finds no suitable child. In the ``None`` case the task is
            left untouched.
        """
        if not from_node.children:
            # Leaf node - must execute directly
            return None

        delegate = self.strategy.select_delegate(task, from_node.children)
        if not delegate:
            return None

        # Record the hop only once a delegate exists, so ``delegated_by``
        # lists just the nodes that really passed the task on.
        task.delegated_by.append(from_node.agent_id)
        self.active_tasks[task.id] = task
        return delegate

    async def delegate_recursively(
        self,
        task: DelegatedTask,
        from_node: HierarchyNode
    ) -> Optional[HierarchyNode]:
        """Push ``task`` down the tree as far as the strategy allows.

        Returns:
            The node that should execute the task: a leaf, or the first node
            whose children the strategy rejects.
        """
        current = from_node

        while current.children:
            delegate = await self.delegate(task, current)
            if not delegate:
                break
            current = delegate

        return current
143
144
@dataclass
class TaskDecomposer:
    """Decomposes complex tasks for delegation to multiple children."""

    async def decompose(
        self,
        task: DelegatedTask,
        children: list[HierarchyNode]
    ) -> list[tuple[HierarchyNode, DelegatedTask]]:
        """Break ``task`` into subtasks for different children.

        Only ``"full_stack_feature"`` tasks are split: one ``"backend"`` and
        one ``"frontend"`` subtask, each assigned to the first child listing
        that capability. A part with no capable child is skipped.

        Args:
            task: Task to decompose.
            children: Nodes that may receive subtasks.

        Returns:
            ``(child, subtask)`` pairs, backend before frontend; an empty list
            when the task type is not decomposed or no child qualifies.
        """
        if task.type != "full_stack_feature":
            # No decomposition needed
            return []

        assignments = []
        for part in ("backend", "frontend"):
            owner = next((c for c in children if part in c.capabilities), None)
            if owner is None:
                continue

            subtask = DelegatedTask(
                # Derive the id from the parent task so subtasks of different
                # features do not share one id.
                id=f"{task.id}-{part}",
                type=part,
                payload={"feature": task.payload.get("feature"), "part": part},
                origin=task.origin
            )
            assignments.append((owner, subtask))

        return assignments

Escalation Mechanisms

When lower-level agents cannot handle issues, escalation moves them up the hierarchy:

🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional
3from enum import Enum
4
5
class EscalationReason(Enum):
    """Why a task is being passed up the hierarchy."""

    BEYOND_CAPABILITY = "beyond_capability"
    RESOURCE_CONFLICT = "resource_conflict"
    POLICY_DECISION = "policy_decision"
    CROSS_TEAM = "cross_team"
    EMERGENCY = "emergency"
    APPROVAL_REQUIRED = "approval_required"
14
15
@dataclass
class Escalation:
    """Record of one issue passed from one agent to another."""

    id: str
    original_task: DelegatedTask
    # Agent ids of the sender and the recipient.
    escalated_from: str
    escalated_to: str
    reason: EscalationReason
    context: dict
    # Starts as "pending"; ``resolution`` stays None until one is recorded.
    status: str = "pending"
    resolution: Optional[dict] = None
27
28
class EscalationPolicy(ABC):
    """Interface deciding when a task escalates and who receives it."""

    @abstractmethod
    def should_escalate(
        self,
        task: DelegatedTask,
        node: HierarchyNode,
        context: dict
    ) -> tuple[bool, Optional[EscalationReason]]:
        """Report whether ``task`` must leave ``node`` and, if so, why."""
        return None

    @abstractmethod
    def get_escalation_target(
        self,
        node: HierarchyNode,
        reason: EscalationReason
    ) -> Optional[HierarchyNode]:
        """Name the node that should receive an escalation from ``node``."""
        return None
50
51
@dataclass
class DefaultEscalationPolicy(EscalationPolicy):
    """Stock policy: escalate on missing capability or on context flags."""

    def should_escalate(
        self,
        task: DelegatedTask,
        node: HierarchyNode,
        context: dict
    ) -> tuple[bool, Optional[EscalationReason]]:
        """Evaluate the escalation conditions in priority order.

        A childless node lacking ``task.type`` among its capabilities comes
        first; after that the context flags are checked in the order
        cross-team, policy decision, emergency.

        Returns:
            ``(True, reason)`` for the first condition that holds, otherwise
            ``(False, None)``.
        """
        is_leaf = not node.children
        if is_leaf and task.type not in node.capabilities:
            return True, EscalationReason.BEYOND_CAPABILITY

        # Context flag -> reason, listed in the order they are honoured.
        flagged_reasons = (
            ("requires_cross_team", EscalationReason.CROSS_TEAM),
            ("requires_policy_decision", EscalationReason.POLICY_DECISION),
            ("is_emergency", EscalationReason.EMERGENCY),
        )
        for flag, reason in flagged_reasons:
            if context.get(flag):
                return True, reason

        return False, None

    def get_escalation_target(
        self,
        node: HierarchyNode,
        reason: EscalationReason
    ) -> Optional[HierarchyNode]:
        """Pick the recipient of an escalation raised at ``node``.

        Emergencies go to the nearest executive on the path to the root
        (searching from ``node`` itself), falling back to the root. Every
        other reason goes to ``node.parent``, which may be ``None``.
        """
        if reason != EscalationReason.EMERGENCY:
            return node.parent

        chain = node.get_path_to_root()
        return next(
            (member for member in chain if member.role == AgentRole.EXECUTIVE),
            chain[-1],
        )
97
98
@dataclass
class EscalationManager:
    """Book-keeping for escalations: open ones by id, resolved ones in order."""

    root: HierarchyNode
    policy: EscalationPolicy = field(default_factory=DefaultEscalationPolicy)
    active_escalations: dict[str, Escalation] = field(default_factory=dict)
    resolved_escalations: list[Escalation] = field(default_factory=list)

    def escalate(
        self,
        task: DelegatedTask,
        from_node: HierarchyNode,
        reason: EscalationReason,
        context: dict = None
    ) -> Optional[Escalation]:
        """Open an escalation for ``task`` raised at ``from_node``.

        The recipient comes from the policy. When the policy names nobody,
        nothing is recorded and ``None`` is returned; otherwise the new
        escalation is stored under a fresh UUID and returned.
        """
        recipient = self.policy.get_escalation_target(from_node, reason)
        if not recipient:
            return None

        record = Escalation(
            id=str(uuid.uuid4()),
            original_task=task,
            escalated_from=from_node.agent_id,
            escalated_to=recipient.agent_id,
            reason=reason,
            context=context or {}
        )
        self.active_escalations[record.id] = record
        return record

    def resolve(
        self,
        escalation_id: str,
        resolution: dict,
        resolved_by: str
    ):
        """Close an open escalation and move it to the resolved list.

        Unknown ids are ignored.
        """
        record = self.active_escalations.pop(escalation_id, None)
        if record is None:
            return

        record.status = "resolved"
        record.resolution = {
            "resolved_by": resolved_by,
            "action": resolution
        }
        self.resolved_escalations.append(record)
152
153
@dataclass
class ApprovalWorkflow:
    """Manages approval requests in the hierarchy.

    A request starts as ``"pending"`` and is decided by the first vote: one
    approval makes it ``"approved"``, one rejection makes it ``"rejected"``.
    Later votes are still recorded but no longer change the status, so a
    rejected request cannot be flipped to approved (or the reverse).

    Attributes:
        approvers: Approver ids per level (not consulted by the methods below).
        pending_approvals: Request records keyed by approval id.
    """

    approvers: dict[str, list[str]] = field(default_factory=dict)  # level -> approvers
    pending_approvals: dict[str, dict] = field(default_factory=dict)

    def request_approval(
        self,
        task: DelegatedTask,
        requester: HierarchyNode,
        approval_type: str
    ) -> str:
        """Open an approval request aimed one level above the requester.

        Args:
            task: Task that needs approval.
            requester: Node asking for approval.
            approval_type: Caller-defined label for the kind of approval.

        Returns:
            The id of the new request (a UUID string).
        """
        approval_id = str(uuid.uuid4())

        # One level up, clamped so the top level targets itself.
        target_level = max(0, requester.level - 1)

        self.pending_approvals[approval_id] = {
            "task": task,
            "requester": requester.agent_id,
            "approval_type": approval_type,
            "target_level": target_level,
            "status": "pending",
            "approvals": [],
            "rejections": []
        }

        return approval_id

    def approve(self, approval_id: str, approver: str, comments: str = ""):
        """Record an approval vote; unknown ids are ignored."""
        request = self.pending_approvals.get(approval_id)
        if request is None:
            return

        request["approvals"].append({
            "approver": approver,
            "comments": comments
        })
        self._check_completion(approval_id)

    def reject(self, approval_id: str, approver: str, reason: str):
        """Record a rejection vote; unknown ids are ignored."""
        request = self.pending_approvals.get(approval_id)
        if request is None:
            return

        request["rejections"].append({
            "approver": approver,
            "reason": reason
        })
        # Only an undecided request can become rejected.
        if request["status"] == "pending":
            request["status"] = "rejected"

    def _check_completion(self, approval_id: str):
        """Mark a still-pending request approved once it has any approval."""
        approval = self.pending_approvals[approval_id]
        # Simple: one approval is enough, but never overturn a decision.
        if approval["status"] == "pending" and approval["approvals"]:
            approval["status"] = "approved"

Complete Hierarchy Implementation

Here's a complete hierarchical orchestration system:

🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional, Callable, Awaitable
3import asyncio
4import uuid
5
6
@dataclass
class HierarchicalAgent:
    """Agent bound to one hierarchy node that escalates, delegates or executes."""

    node: HierarchyNode
    delegator: HierarchicalDelegator
    escalation_manager: EscalationManager
    execute_fn: Callable[[DelegatedTask], Awaitable[dict]]

    def _try_escalate(self, task: DelegatedTask) -> Optional[dict]:
        """Return an escalation receipt, or ``None`` if nothing was escalated."""
        manager = self.escalation_manager
        wanted, why = manager.policy.should_escalate(task, self.node, {})
        if not (wanted and why):
            return None
        ticket = manager.escalate(task, self.node, why)
        if not ticket:
            return None
        return {"escalated": True, "escalation_id": ticket.id}

    async def _try_delegate(self, task: DelegatedTask) -> Optional[dict]:
        """Return a delegation receipt, or ``None`` if no child took the task."""
        if not self.node.children:
            return None
        assignee = await self.delegator.delegate(task, self.node)
        if not assignee:
            return None
        return {"delegated": True, "delegate": assignee.agent_id}

    async def handle_task(self, task: DelegatedTask) -> dict:
        """Process ``task`` by escalating, delegating, or running it here.

        The steps are tried in that order. Escalation and delegation return a
        small receipt dict. Local execution stores the result on the task,
        marks it ``"completed"`` and returns the result.
        """
        receipt = self._try_escalate(task)
        if receipt is not None:
            return receipt

        receipt = await self._try_delegate(task)
        if receipt is not None:
            return receipt

        outcome = await self.execute_fn(task)
        task.result = outcome
        task.status = "completed"
        return outcome
42
43
@dataclass
class HierarchicalOrchestrator:
    """Complete hierarchical orchestration system.

    Attributes:
        root: Root node of the hierarchy; tasks enter here.
        agents: Registered agents keyed by agent id.
        delegator: Delegator shared by all agents. A default one over ``root``
            is created when none is supplied.
        escalation_manager: Escalation manager shared by all agents. A default
            one over ``root`` is created when none is supplied.
    """

    root: HierarchyNode
    agents: dict[str, HierarchicalAgent] = field(default_factory=dict)
    delegator: Optional[HierarchicalDelegator] = None
    escalation_manager: Optional[EscalationManager] = None

    def __post_init__(self):
        # Build defaults only for collaborators the caller did not pass in;
        # overwriting them would silently discard a custom strategy or policy.
        if self.delegator is None:
            self.delegator = HierarchicalDelegator(root=self.root)
        if self.escalation_manager is None:
            self.escalation_manager = EscalationManager(root=self.root)

    def register_agent(
        self,
        node: HierarchyNode,
        execute_fn: Callable[[DelegatedTask], Awaitable[dict]]
    ):
        """Register an agent for a hierarchy node.

        Args:
            node: Node the agent acts for; its ``agent_id`` is the registry
                key, so registering the same id again replaces the agent.
            execute_fn: Coroutine function run when the agent executes a task
                itself.
        """
        agent = HierarchicalAgent(
            node=node,
            delegator=self.delegator,
            escalation_manager=self.escalation_manager,
            execute_fn=execute_fn
        )
        self.agents[node.agent_id] = agent

    async def submit_task(self, task: DelegatedTask) -> dict:
        """Submit a task at the root and let the hierarchy route it.

        The task is delegated downwards as far as possible, then handed to the
        agent registered for the node it ends on.

        Returns:
            That agent's ``handle_task`` result, or
            ``{"error": "No agent registered for node"}``.
        """
        final_node = await self.delegator.delegate_recursively(task, self.root)

        agent = self.agents.get(final_node.agent_id)
        if agent is None:
            return {"error": "No agent registered for node"}
        return await agent.handle_task(task)

    async def handle_escalation(self, escalation_id: str) -> dict:
        """Have the escalation's target agent process the original task.

        On success the escalation is resolved with the agent's result.

        Returns:
            The target agent's result, ``{"error": "Escalation not found"}``
            for an unknown id, or ``{"error": "Escalation target not found"}``
            when no agent is registered for the target.
        """
        escalation = self.escalation_manager.active_escalations.get(escalation_id)
        if escalation is None:
            return {"error": "Escalation not found"}

        target_agent_id = escalation.escalated_to
        agent = self.agents.get(target_agent_id)
        if agent is None:
            return {"error": "Escalation target not found"}

        result = await agent.handle_task(escalation.original_task)
        self.escalation_manager.resolve(escalation_id, result, target_agent_id)
        return result

    def get_status(self) -> dict:
        """Return a snapshot of hierarchy depth and the work-tracking counts."""
        return {
            "hierarchy_depth": HierarchyMetrics(self.root).depth(),
            "total_agents": len(self.agents),
            "active_tasks": len(self.delegator.active_tasks),
            "active_escalations": len(self.escalation_manager.active_escalations),
            "resolved_escalations": len(self.escalation_manager.resolved_escalations)
        }
116
117
# Example usage
async def hierarchy_example():
    """Build the sample team, register an agent per node and run one task.

    Prints the task result and the orchestrator status.
    """
    # Build hierarchy
    builder = HierarchyBuilder()
    root = builder.build_development_team()

    # Create orchestrator
    orchestrator = HierarchicalOrchestrator(root=root)

    # Execution functions, one per tier
    async def cto_execute(task: DelegatedTask) -> dict:
        return {"decision": "Strategic direction set"}

    async def lead_execute(task: DelegatedTask) -> dict:
        return {"coordination": "Work distributed to team"}

    async def specialist_execute(task: DelegatedTask) -> dict:
        return {"expertise": "Technical solution designed"}

    async def worker_execute(task: DelegatedTask) -> dict:
        return {"code": "# Implementation complete"}

    # Register agents tier by tier
    orchestrator.register_agent(root, cto_execute)  # CTO

    for lead in root.children:  # Leads
        orchestrator.register_agent(lead, lead_execute)
        for specialist in lead.children:  # Specialists
            orchestrator.register_agent(specialist, specialist_execute)
            for worker in specialist.children:  # Workers
                orchestrator.register_agent(worker, worker_execute)

    # Submit a task
    task = DelegatedTask(
        id=str(uuid.uuid4()),
        type="api_design",
        payload={"feature": "user authentication"},
        origin="external"
    )

    result = await orchestrator.submit_task(task)
    # Braces, not brackets, so the values are interpolated.
    print(f"Task result: {result}")
    print(f"Status: {orchestrator.get_status()}")

Key Takeaways

  • Hierarchy design should balance depth and breadth based on task complexity and coordination needs.
  • Delegation strategies determine how tasks flow down - capability matching, load balancing, or skill-based.
  • Task decomposition allows complex tasks to be split among multiple children for parallel execution.
  • Escalation mechanisms handle cases where lower levels cannot complete tasks, moving issues up the chain.
  • Approval workflows formalize decision-making for tasks requiring higher-level authorization.
Next Section Preview: We'll explore shared state and communication patterns that enable coordination across all multi-agent architectures.