Introduction
The supervisor pattern is the most common multi-agent architecture, featuring a central agent that coordinates worker agents. This section explores implementation details, worker management, routing strategies, and state handling for building robust supervisor-based systems.
Section Overview: We'll build a complete supervisor system from the ground up, covering worker registration, dynamic routing, and shared state management.
Supervisor Architecture
A well-designed supervisor architecture separates concerns between coordination logic and task execution:
1from dataclasses import dataclass, field
2from typing import Any, Optional, Protocol, Callable, Awaitable
3from abc import abstractmethod
4from enum import Enum
5import asyncio
6
7
class TaskStatus(Enum):
    """Lifecycle states a task moves through in the system."""

    PENDING = "pending"           # queued, not yet routed
    ASSIGNED = "assigned"         # routed to a worker
    IN_PROGRESS = "in_progress"   # worker is executing it
    COMPLETED = "completed"       # finished successfully
    FAILED = "failed"             # errored or timed out
15
16
@dataclass
class Task:
    """A single unit of work tracked through the supervisor lifecycle."""
    id: str                                  # unique task identifier
    type: str                                # task category; used as the routing key
    payload: dict                            # arbitrary input data handed to the worker
    status: TaskStatus = TaskStatus.PENDING  # current lifecycle state
    assigned_to: Optional[str] = None        # worker name once routed
    result: Optional[dict] = None            # worker output on success
    error: Optional[str] = None              # error message on failure/timeout
27
28
class WorkerProtocol(Protocol):
    """Structural (duck-typed) interface every worker agent must satisfy."""

    @property
    def name(self) -> str:
        """Unique worker identifier; used as the registry key."""
        ...

    @property
    def capabilities(self) -> list[str]:
        """Task types this worker can handle; consulted by routers."""
        ...

    async def execute(self, task: Task) -> dict:
        """Execute a task and return its result payload."""
        ...

    async def health_check(self) -> bool:
        """Return True if the worker is healthy and able to accept work."""
        ...
49
50
@dataclass
class SupervisorConfig:
    """Tunable knobs controlling supervisor behavior."""

    max_concurrent_tasks: int = 10       # cap on simultaneously running tasks
    task_timeout: float = 300.0          # per-task wall-clock limit, seconds
    retry_limit: int = 3                 # max attempts per task (not consumed by the code shown here)
    enable_health_checks: bool = True    # whether to run the periodic health-check loop
    health_check_interval: float = 60.0  # seconds between health probes
59
60
61@dataclass
62class Supervisor:
63 """Central coordinator for worker agents."""
64
65 config: SupervisorConfig = field(default_factory=SupervisorConfig)
66 workers: dict[str, WorkerProtocol] = field(default_factory=dict)
67 task_queue: list[Task] = field(default_factory=list)
68 active_tasks: dict[str, Task] = field(default_factory=dict)
69 completed_tasks: list[Task] = field(default_factory=list)
70
71 # Routing strategy
72 router: Optional[Callable[[Task, dict], str]] = None
73
74 def __post_init__(self):
75 if self.router is None:
76 self.router = self.default_router
77
78 def register_worker(self, worker: WorkerProtocol):
79 """Register a worker agent."""
80 self.workers[worker.name] = worker
81 print(f"Registered worker: [worker.name] with capabilities: [worker.capabilities]")
82
83 def unregister_worker(self, worker_name: str):
84 """Remove a worker from the pool."""
85 if worker_name in self.workers:
86 del self.workers[worker_name]
87 print(f"Unregistered worker: [worker_name]")
88
89 def default_router(self, task: Task, workers: dict) -> str:
90 """Default routing: find first capable worker."""
91 for name, worker in workers.items():
92 if task.type in worker.capabilities:
93 return name
94 raise ValueError(f"No worker capable of task type: [task.type]")
95
96 async def submit_task(self, task: Task) -> str:
97 """Submit a task for execution."""
98 self.task_queue.append(task)
99 return task.id
100
101 async def process_queue(self):
102 """Process pending tasks from queue."""
103 while self.task_queue:
104 if len(self.active_tasks) >= self.config.max_concurrent_tasks:
105 await asyncio.sleep(0.1)
106 continue
107
108 task = self.task_queue.pop(0)
109 await self.assign_and_execute(task)
110
111 async def assign_and_execute(self, task: Task):
112 """Assign task to worker and execute."""
113 try:
114 # Route to appropriate worker
115 worker_name = self.router(task, self.workers)
116 task.assigned_to = worker_name
117 task.status = TaskStatus.ASSIGNED
118
119 # Track as active
120 self.active_tasks[task.id] = task
121
122 # Execute with timeout
123 worker = self.workers[worker_name]
124 task.status = TaskStatus.IN_PROGRESS
125
126 result = await asyncio.wait_for(
127 worker.execute(task),
128 timeout=self.config.task_timeout
129 )
130
131 task.result = result
132 task.status = TaskStatus.COMPLETED
133
134 except asyncio.TimeoutError:
135 task.status = TaskStatus.FAILED
136 task.error = "Task timed out"
137
138 except Exception as e:
139 task.status = TaskStatus.FAILED
140 task.error = str(e)
141
142 finally:
143 # Move to completed
144 if task.id in self.active_tasks:
145 del self.active_tasks[task.id]
146 self.completed_tasks.append(task)
147
148 async def get_task_result(self, task_id: str) -> Optional[Task]:
149 """Get result of completed task."""
150 for task in self.completed_tasks:
151 if task.id == task_id:
152 return task
153 return NoneWorker Management
Effective worker management includes registration, capability tracking, health monitoring, and dynamic scaling:
1from dataclasses import dataclass, field
2from typing import Optional
3from enum import Enum
4import asyncio
5import time
6
7
class WorkerStatus(Enum):
    """Health classification of a worker as seen by the pool."""

    HEALTHY = "healthy"        # last probe succeeded
    DEGRADED = "degraded"      # partially functional
    UNHEALTHY = "unhealthy"    # last probe failed or raised
    UNKNOWN = "unknown"        # never probed yet
14
15
@dataclass
class WorkerMetrics:
    """Rolling performance counters for a single worker."""

    tasks_completed: int = 0
    tasks_failed: int = 0
    total_execution_time: float = 0.0   # summed over successful tasks only
    last_health_check: float = 0.0      # time.time() of the most recent probe
    status: WorkerStatus = WorkerStatus.UNKNOWN

    @property
    def success_rate(self) -> float:
        """Fraction of attempts that succeeded; 1.0 when nothing was attempted."""
        attempts = self.tasks_completed + self.tasks_failed
        if attempts == 0:
            return 1.0
        return self.tasks_completed / attempts

    @property
    def avg_execution_time(self) -> float:
        """Mean runtime of successful tasks; 0.0 with no completions."""
        if not self.tasks_completed:
            return 0.0
        return self.total_execution_time / self.tasks_completed
35
36
@dataclass
class WorkerPool:
    """Registry of workers with per-worker metrics and a capability index."""

    workers: dict[str, WorkerProtocol] = field(default_factory=dict)
    metrics: dict[str, WorkerMetrics] = field(default_factory=dict)
    # capability name -> names of workers able to handle it
    capability_index: dict[str, list[str]] = field(default_factory=dict)

    def register(self, worker: WorkerProtocol):
        """Register a worker and index its capabilities for fast lookup."""
        self.workers[worker.name] = worker
        self.metrics[worker.name] = WorkerMetrics()

        for cap in worker.capabilities:
            names = self.capability_index.setdefault(cap, [])
            # Fixed: guard against double-registration creating duplicate
            # index entries (which would skew load-balancing routers).
            if worker.name not in names:
                names.append(worker.name)

    def unregister(self, worker_name: str):
        """Remove a worker and scrub it from the capability index."""
        if worker_name not in self.workers:
            return

        worker = self.workers[worker_name]

        for cap in worker.capabilities:
            names = self.capability_index.get(cap)
            if names and worker_name in names:
                names.remove(worker_name)
                # Fixed: drop empty index entries instead of leaking them.
                if not names:
                    del self.capability_index[cap]

        del self.workers[worker_name]
        del self.metrics[worker_name]

    def get_capable_workers(self, task_type: str) -> list[str]:
        """Return names of workers able to handle task_type.

        Fixed: returns a copy so callers cannot mutate the internal index.
        """
        return list(self.capability_index.get(task_type, []))

    def get_best_worker(self, task_type: str) -> Optional[str]:
        """Pick the capable worker with the best success-rate/latency score."""
        capable = self.get_capable_workers(task_type)

        if not capable:
            return None

        # Prefer workers not known to be degraded/unhealthy.
        healthy = [
            w for w in capable
            if self.metrics[w].status in (WorkerStatus.HEALTHY, WorkerStatus.UNKNOWN)
        ]

        if not healthy:
            # Everyone looks unhealthy: fall back to any capable worker.
            healthy = capable

        def score(worker_name: str) -> float:
            m = self.metrics[worker_name]
            # High success rate, low average latency. The +0.1 avoids
            # division by zero for workers with no completions yet.
            return m.success_rate / (m.avg_execution_time + 0.1)

        return max(healthy, key=score)

    def record_success(self, worker_name: str, execution_time: float):
        """Record a successful completion and its execution time."""
        if worker_name in self.metrics:
            m = self.metrics[worker_name]
            m.tasks_completed += 1
            m.total_execution_time += execution_time

    def record_failure(self, worker_name: str):
        """Record a task failure."""
        if worker_name in self.metrics:
            self.metrics[worker_name].tasks_failed += 1

    async def health_check_all(self):
        """Probe every worker; update its status and last-check timestamp."""
        for name, worker in self.workers.items():
            try:
                is_healthy = await worker.health_check()
                self.metrics[name].status = (
                    WorkerStatus.HEALTHY if is_healthy else WorkerStatus.UNHEALTHY
                )
            except Exception:
                # A probe that raises counts as unhealthy.
                self.metrics[name].status = WorkerStatus.UNHEALTHY

            self.metrics[name].last_health_check = time.time()
124
125
126@dataclass
127class DynamicWorkerManager:
128 """Manages workers with auto-scaling."""
129
130 pool: WorkerPool = field(default_factory=WorkerPool)
131 worker_factory: Optional[Callable[[str], WorkerProtocol]] = None
132 min_workers_per_capability: int = 1
133 max_workers_per_capability: int = 5
134
135 async def ensure_capacity(self, task_type: str, queue_depth: int):
136 """Scale workers based on demand."""
137 current = len(self.pool.get_capable_workers(task_type))
138
139 # Scale up if queue is deep
140 if queue_depth > current * 2 and current < self.max_workers_per_capability:
141 if self.worker_factory:
142 new_worker = self.worker_factory(task_type)
143 self.pool.register(new_worker)
144 print(f"Scaled up: added worker for [task_type]")
145
146 # Scale down if workers are idle
147 elif queue_depth == 0 and current > self.min_workers_per_capability:
148 # Remove least performing worker
149 workers = self.pool.get_capable_workers(task_type)
150 metrics = [(w, self.pool.metrics[w].success_rate) for w in workers]
151 worst = min(metrics, key=lambda x: x[1])[0]
152 self.pool.unregister(worst)
153 print(f"Scaled down: removed worker [worst]")Task Routing Strategies
Different routing strategies optimize for various objectives like load balancing, specialization, or latency:
1from dataclasses import dataclass
2from typing import Callable, Optional
3from abc import ABC, abstractmethod
4import random
5
6
class RoutingStrategy(ABC):
    """Abstract strategy for choosing which worker should run a task."""

    @abstractmethod
    def select_worker(
        self,
        task: Task,
        pool: WorkerPool
    ) -> Optional[str]:
        """Return the name of the chosen worker, or None if none qualifies."""
        pass
18
19
class CapabilityBasedRouter(RoutingStrategy):
    """Pick the first registered worker whose capabilities cover the task."""

    def select_worker(self, task: Task, pool: WorkerPool) -> Optional[str]:
        candidates = pool.get_capable_workers(task.type)
        if not candidates:
            return None
        return candidates[0]
26
27
class LoadBalancedRouter(RoutingStrategy):
    """Spread tasks evenly by counting how many each worker has received."""

    def __init__(self):
        # Running count of tasks handed to each worker.
        self.task_counts: dict[str, int] = {}

    def select_worker(self, task: Task, pool: WorkerPool) -> Optional[str]:
        candidates = pool.get_capable_workers(task.type)
        if not candidates:
            return None

        counts = self.task_counts
        # Make sure every candidate has a counter before comparing.
        for name in candidates:
            counts.setdefault(name, 0)

        # Hand the task to whoever has received the fewest so far.
        least_loaded = min(candidates, key=counts.__getitem__)
        counts[least_loaded] += 1

        return least_loaded
49
50
class PerformanceBasedRouter(RoutingStrategy):
    """Delegate selection to the pool's metric-driven best-worker lookup."""

    def select_worker(self, task: Task, pool: WorkerPool) -> Optional[str]:
        # WorkerPool already scores by success rate and latency.
        return pool.get_best_worker(task.type)
56
57
class RandomRouter(RoutingStrategy):
    """Pick uniformly at random among capable workers."""

    def select_worker(self, task: Task, pool: WorkerPool) -> Optional[str]:
        candidates = pool.get_capable_workers(task.type)
        if not candidates:
            return None
        return random.choice(candidates)
64
65
class PriorityRouter(RoutingStrategy):
    """Prefer explicitly configured workers per task type, else best available."""

    def __init__(self, priority_workers: dict[str, list[str]]):
        # task type -> ordered list of preferred worker names
        self.priority_workers = priority_workers

    def select_worker(self, task: Task, pool: WorkerPool) -> Optional[str]:
        # Walk the preference list; take the first registered, healthy worker.
        for name in self.priority_workers.get(task.type, ()):
            if name not in pool.workers:
                continue
            if pool.metrics[name].status == WorkerStatus.HEALTHY:
                return name

        # No healthy preferred worker: fall back to metric-based selection.
        return pool.get_best_worker(task.type)
84
85
class ContentBasedRouter(RoutingStrategy):
    """Classify the payload, then route to a worker specialized for it."""

    def __init__(self, classifier: Callable[[dict], str]):
        # Maps a task payload to a specialized capability name.
        self.classifier = classifier

    def select_worker(self, task: Task, pool: WorkerPool) -> Optional[str]:
        # Derive the specialized capability from the task content.
        specialized = self.classifier(task.payload)

        # First registered worker advertising that specialization wins.
        specialist = next(
            (name for name, worker in pool.workers.items()
             if specialized in worker.capabilities),
            None,
        )
        if specialist is not None:
            return specialist

        # No specialist: fall back to the declared task type.
        return pool.get_best_worker(task.type)
103
104
105@dataclass
106class AdaptiveRouter(RoutingStrategy):
107 """Adapts routing strategy based on system conditions."""
108
109 strategies: dict[str, RoutingStrategy] = field(default_factory=dict)
110
111 def __post_init__(self):
112 if not self.strategies:
113 self.strategies = {
114 "low_load": LoadBalancedRouter(),
115 "high_load": PerformanceBasedRouter(),
116 "default": CapabilityBasedRouter()
117 }
118
119 def select_worker(self, task: Task, pool: WorkerPool) -> Optional[str]:
120 # Determine system load
121 total_tasks = sum(m.tasks_completed for m in pool.metrics.values())
122 active_workers = len(pool.workers)
123
124 if active_workers == 0:
125 return None
126
127 load_ratio = total_tasks / active_workers if active_workers > 0 else 0
128
129 # Select strategy based on load
130 if load_ratio < 10:
131 strategy = self.strategies["low_load"]
132 elif load_ratio > 100:
133 strategy = self.strategies["high_load"]
134 else:
135 strategy = self.strategies["default"]
136
137 return strategy.select_worker(task, pool)| Strategy | Use Case | Trade-off |
|---|---|---|
| Capability-based | Simple matching | May overload specialized workers |
| Load-balanced | Even distribution | Ignores worker performance |
| Performance-based | Maximize throughput | May create hot spots |
| Priority | Critical task handling | Requires upfront configuration |
| Content-based | Specialized routing | Classifier complexity |
| Adaptive | Dynamic conditions | Implementation overhead |
State Management
Managing shared state between supervisor and workers requires careful coordination:
import asyncio
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Optional
6
7
@dataclass
class StateEntry:
    """One versioned key/value record in shared state."""

    key: str                 # state key
    value: Any               # stored payload (any shape the writer chooses)
    version: int = 0         # incremented on each write by the state manager
    updated_at: datetime = field(default_factory=datetime.now)  # write timestamp
    updated_by: str = ""     # identifier of the writer (worker/supervisor)
16
17
@dataclass
class SharedStateManager:
    """Key-value store with per-key locks, versioning, and a change history."""

    state: dict[str, StateEntry] = field(default_factory=dict)
    locks: dict[str, asyncio.Lock] = field(default_factory=dict)         # per-key mutexes
    history: list[tuple[str, StateEntry]] = field(default_factory=list)  # append-only audit log

    def _get_lock(self, key: str) -> asyncio.Lock:
        """Get or lazily create the lock guarding key."""
        if key not in self.locks:
            self.locks[key] = asyncio.Lock()
        return self.locks[key]

    def _write(self, key: str, value: Any, updater: str) -> StateEntry:
        """Write a new versioned entry. The caller must hold the key's lock."""
        existing = self.state.get(key)
        new_version = (existing.version + 1) if existing else 1
        entry = StateEntry(
            key=key,
            value=value,
            version=new_version,
            updated_by=updater
        )
        self.state[key] = entry
        self.history.append((key, entry))
        return entry

    async def get(self, key: str) -> Optional[Any]:
        """Return the current value for key, or None if absent."""
        entry = self.state.get(key)
        return entry.value if entry else None

    async def get_with_version(self, key: str) -> Optional[tuple[Any, int]]:
        """Return (value, version) for optimistic locking, or None if absent."""
        entry = self.state.get(key)
        return (entry.value, entry.version) if entry else None

    async def set(
        self,
        key: str,
        value: Any,
        updater: str,
        expected_version: Optional[int] = None
    ) -> bool:
        """
        Set value with optional optimistic locking.
        Returns True if successful, False on version conflict.
        """
        async with self._get_lock(key):
            existing = self.state.get(key)

            # Reject the write if someone else updated since we read.
            if expected_version is not None:
                if existing and existing.version != expected_version:
                    return False  # Version conflict

            self._write(key, value, updater)
            return True

    async def atomic_update(
        self,
        key: str,
        update_fn: Callable[[Any], Any],
        updater: str
    ) -> Any:
        """Atomically read-modify-write a value and return the new value.

        Fixed: the original awaited self.set() while already holding the
        key's lock; asyncio.Lock is NOT reentrant, so every call
        deadlocked. The write now goes through _write() under a single
        lock acquisition.
        """
        async with self._get_lock(key):
            entry = self.state.get(key)
            current = entry.value if entry else None
            new_value = update_fn(current)
            self._write(key, new_value, updater)
            return new_value

    async def increment(self, key: str, updater: str, amount: int = 1) -> int:
        """Atomically increment a counter (a missing key counts as 0)."""
        return await self.atomic_update(
            key,
            lambda x: (x or 0) + amount,
            updater
        )
94
95
96@dataclass
97class TaskStateCoordinator:
98 """Coordinates task state between supervisor and workers."""
99
100 shared_state: SharedStateManager = field(default_factory=SharedStateManager)
101
102 async def start_task(self, task_id: str, worker_name: str):
103 """Record task start."""
104 await self.shared_state.set(
105 f"task:[task_id]:status",
106 "in_progress",
107 worker_name
108 )
109 await self.shared_state.set(
110 f"task:[task_id]:started_at",
111 datetime.now().isoformat(),
112 worker_name
113 )
114
115 async def update_progress(
116 self,
117 task_id: str,
118 worker_name: str,
119 progress: float,
120 message: str = ""
121 ):
122 """Update task progress."""
123 await self.shared_state.set(
124 f"task:[task_id]:progress",
125 {"percent": progress, "message": message},
126 worker_name
127 )
128
129 async def complete_task(
130 self,
131 task_id: str,
132 worker_name: str,
133 result: dict
134 ):
135 """Record task completion."""
136 await self.shared_state.set(
137 f"task:[task_id]:status",
138 "completed",
139 worker_name
140 )
141 await self.shared_state.set(
142 f"task:[task_id]:result",
143 result,
144 worker_name
145 )
146 await self.shared_state.set(
147 f"task:[task_id]:completed_at",
148 datetime.now().isoformat(),
149 worker_name
150 )
151
152 async def fail_task(
153 self,
154 task_id: str,
155 worker_name: str,
156 error: str
157 ):
158 """Record task failure."""
159 await self.shared_state.set(
160 f"task:[task_id]:status",
161 "failed",
162 worker_name
163 )
164 await self.shared_state.set(
165 f"task:[task_id]:error",
166 error,
167 worker_name
168 )
169
170 async def get_task_state(self, task_id: str) -> dict:
171 """Get full task state."""
172 prefix = f"task:[task_id]:"
173 task_state = {}
174
175 for key, entry in self.shared_state.state.items():
176 if key.startswith(prefix):
177 field_name = key[len(prefix):]
178 task_state[field_name] = entry.value
179
180 return task_stateComplete Implementation
Here's a complete supervisor implementation combining all components:
1from dataclasses import dataclass, field
2from typing import Any, Optional, Callable
3import asyncio
4import uuid
5import time
6
7
@dataclass
class CompleteSupervisor:
    """Full-featured supervisor: pooled workers, pluggable routing,
    shared task state, bounded concurrency, and periodic health checks."""

    config: SupervisorConfig = field(default_factory=SupervisorConfig)
    pool: WorkerPool = field(default_factory=WorkerPool)
    router: RoutingStrategy = field(default_factory=PerformanceBasedRouter)
    state: TaskStateCoordinator = field(default_factory=TaskStateCoordinator)

    # Queues
    pending_tasks: asyncio.Queue = field(default_factory=asyncio.Queue)
    results: dict[str, Task] = field(default_factory=dict)  # task id -> finished Task

    # Control
    running: bool = False
    semaphore: Optional[asyncio.Semaphore] = None  # caps in-flight tasks

    def __post_init__(self):
        self.semaphore = asyncio.Semaphore(self.config.max_concurrent_tasks)

    def register_worker(self, worker: WorkerProtocol):
        """Register a worker with the pool."""
        self.pool.register(worker)

    async def submit(self, task_type: str, payload: dict) -> str:
        """Create and enqueue a task; return its generated id."""
        task = Task(
            id=str(uuid.uuid4()),
            type=task_type,
            payload=payload
        )
        await self.pending_tasks.put(task)
        return task.id

    async def start(self):
        """Start the processing loop and (optionally) the health-check loop."""
        self.running = True

        asyncio.create_task(self._process_loop())

        if self.config.enable_health_checks:
            asyncio.create_task(self._health_check_loop())

    async def stop(self):
        """Signal loops to exit; in-flight tasks finish on their own."""
        self.running = False

    async def _process_loop(self):
        """Pull tasks off the queue and dispatch them concurrently."""
        while self.running:
            try:
                # 1s timeout so a stop() request is noticed promptly.
                task = await asyncio.wait_for(
                    self.pending_tasks.get(),
                    timeout=1.0
                )

                # Concurrency is limited inside _process_task's semaphore.
                asyncio.create_task(self._process_task(task))

            except asyncio.TimeoutError:
                continue
            except Exception as e:
                # Fixed: the f-string used [...] instead of {...}.
                print(f"Process loop error: {e}")

    async def _process_task(self, task: Task):
        """Route, execute (with timeout), and record a single task."""
        async with self.semaphore:
            start_time = time.time()

            try:
                # Route to worker
                worker_name = self.router.select_worker(task, self.pool)

                if not worker_name:
                    task.status = TaskStatus.FAILED
                    # Fixed: the f-string used [...] instead of {...}.
                    task.error = f"No worker available for task type: {task.type}"
                    self.results[task.id] = task
                    return

                # Assign and track
                task.assigned_to = worker_name
                task.status = TaskStatus.IN_PROGRESS
                await self.state.start_task(task.id, worker_name)

                # Execute with timeout
                worker = self.pool.workers[worker_name]
                result = await asyncio.wait_for(
                    worker.execute(task),
                    timeout=self.config.task_timeout
                )

                # Success
                task.result = result
                task.status = TaskStatus.COMPLETED

                execution_time = time.time() - start_time
                self.pool.record_success(worker_name, execution_time)
                await self.state.complete_task(task.id, worker_name, result)

            except asyncio.TimeoutError:
                task.status = TaskStatus.FAILED
                task.error = "Task timed out"
                if task.assigned_to:
                    self.pool.record_failure(task.assigned_to)
                    await self.state.fail_task(task.id, task.assigned_to, "Timeout")

            except Exception as e:
                task.status = TaskStatus.FAILED
                task.error = str(e)
                if task.assigned_to:
                    self.pool.record_failure(task.assigned_to)
                    await self.state.fail_task(task.id, task.assigned_to, str(e))

            finally:
                self.results[task.id] = task

    async def _health_check_loop(self):
        """Periodically probe worker health while running."""
        while self.running:
            await asyncio.sleep(self.config.health_check_interval)
            await self.pool.health_check_all()

    async def get_result(
        self,
        task_id: str,
        timeout: float = 30.0
    ) -> Optional[Task]:
        """Poll until the task finishes or timeout elapses; None on timeout."""
        start = time.time()

        while time.time() - start < timeout:
            if task_id in self.results:
                return self.results[task_id]
            await asyncio.sleep(0.1)

        return None

    def get_status(self) -> dict:
        """Snapshot of supervisor state and per-worker metrics."""
        return {
            "running": self.running,
            "workers": len(self.pool.workers),
            "pending_tasks": self.pending_tasks.qsize(),
            "completed_tasks": len(self.results),
            "worker_metrics": {
                name: {
                    "completed": m.tasks_completed,
                    "failed": m.tasks_failed,
                    "success_rate": m.success_rate,
                    "status": m.status.value
                }
                for name, m in self.pool.metrics.items()
            }
        }
166# Example usage
167async def supervisor_example():
168 # Create supervisor
169 supervisor = CompleteSupervisor(
170 config=SupervisorConfig(
171 max_concurrent_tasks=5,
172 task_timeout=30.0
173 ),
174 router=PerformanceBasedRouter()
175 )
176
177 # Register workers
178 supervisor.register_worker(CodeWriterWorker())
179 supervisor.register_worker(TestWriterWorker())
180
181 # Start processing
182 await supervisor.start()
183
184 # Submit tasks
185 task_ids = []
186 for i in range(10):
187 task_id = await supervisor.submit(
188 "write_code",
189 {"description": f"Function [i]"}
190 )
191 task_ids.append(task_id)
192
193 # Wait for results
194 for task_id in task_ids:
195 result = await supervisor.get_result(task_id)
196 print(f"Task [task_id]: [result.status.value if result else 'pending']")
197
198 # Check status
199 print(supervisor.get_status())
200
201 # Stop
202 await supervisor.stop()Key Takeaways
- Worker pools provide organized management with capability indexing and health monitoring.
- Routing strategies can optimize for different objectives - load balancing, performance, or specialization.
- State coordination enables progress tracking and result aggregation with proper concurrency control.
- Semaphores and timeouts prevent resource exhaustion and ensure task completion.
- Health checks enable proactive detection and handling of worker failures.
Next Section Preview: We'll explore peer-to-peer collaboration patterns where agents work as equals without central coordination.