Chapter 14
20 min read
Section 87 of 175

Supervisor Pattern

Multi-Agent Systems

Introduction

The supervisor pattern is the most common multi-agent architecture, featuring a central agent that coordinates worker agents. This section explores implementation details, worker management, routing strategies, and state handling for building robust supervisor-based systems.

Section Overview: We'll build a complete supervisor system from the ground up, covering worker registration, dynamic routing, and shared state management.

Supervisor Architecture

A well-designed supervisor architecture separates concerns between coordination logic and task execution:

🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional, Protocol, Callable, Awaitable
3from abc import abstractmethod
4from enum import Enum
5import asyncio
6
7
class TaskStatus(Enum):
    """Lifecycle status of a task as it moves through the supervisor."""
    PENDING = "pending"          # queued, not yet routed
    ASSIGNED = "assigned"        # routed to a worker, not yet started
    IN_PROGRESS = "in_progress"  # worker is currently executing
    COMPLETED = "completed"      # finished successfully, result is set
    FAILED = "failed"            # errored or timed out, error is set
15
16
@dataclass
class Task:
    """Represents a task to be executed.

    Mutable record: the supervisor updates status, assigned_to, result and
    error in place as the task progresses through its lifecycle.
    """
    id: str                            # unique task identifier
    type: str                          # matched against worker capabilities
    payload: dict                      # arbitrary task input data
    status: TaskStatus = TaskStatus.PENDING
    assigned_to: Optional[str] = None  # name of the worker handling the task
    result: Optional[dict] = None      # populated on success
    error: Optional[str] = None        # populated on failure/timeout
27
28
class WorkerProtocol(Protocol):
    """Structural interface every worker agent must satisfy.

    Matched by duck typing (typing.Protocol), so concrete workers do not
    need to inherit from this class.
    """

    @property
    def name(self) -> str:
        """Unique worker identifier, used as the registry key."""
        ...

    @property
    def capabilities(self) -> list[str]:
        """Task types this worker can handle (compared to Task.type)."""
        ...

    async def execute(self, task: Task) -> dict:
        """Execute a task and return its result payload."""
        ...

    async def health_check(self) -> bool:
        """Return True if the worker is healthy and able to take tasks."""
        ...
49
50
@dataclass
class SupervisorConfig:
    """Tunable knobs controlling supervisor behavior."""
    max_concurrent_tasks: int = 10       # cap on simultaneously executing tasks
    task_timeout: float = 300.0          # per-task execution timeout, seconds
    retry_limit: int = 3                 # NOTE(review): not consulted anywhere in this chapter — confirm intended use
    enable_health_checks: bool = True    # enable periodic worker health polling
    health_check_interval: float = 60.0  # seconds between health-check sweeps
59
60
@dataclass
class Supervisor:
    """Central coordinator for worker agents.

    Holds the worker registry, a FIFO task queue, and bookkeeping for
    active/completed tasks.  Routing is pluggable via ``router``, a
    callable mapping (task, workers) to a worker name.
    """

    config: SupervisorConfig = field(default_factory=SupervisorConfig)
    workers: dict[str, WorkerProtocol] = field(default_factory=dict)
    task_queue: list[Task] = field(default_factory=list)
    active_tasks: dict[str, Task] = field(default_factory=dict)
    completed_tasks: list[Task] = field(default_factory=list)

    # Routing strategy: (task, workers) -> worker name
    router: Optional[Callable[[Task, dict], str]] = None

    def __post_init__(self):
        if self.router is None:
            self.router = self.default_router

    def register_worker(self, worker: WorkerProtocol):
        """Register a worker agent under its unique name."""
        self.workers[worker.name] = worker
        # BUG FIX: the original wrote [worker.name] inside the f-string,
        # which is literal text — {...} is required for interpolation.
        print(f"Registered worker: {worker.name} with capabilities: {worker.capabilities}")

    def unregister_worker(self, worker_name: str):
        """Remove a worker from the pool (no-op if unknown)."""
        if worker_name in self.workers:
            del self.workers[worker_name]
            print(f"Unregistered worker: {worker_name}")

    def default_router(self, task: Task, workers: dict) -> str:
        """Default routing: first registered worker capable of the task type.

        Raises:
            ValueError: if no registered worker handles ``task.type``.
        """
        for name, worker in workers.items():
            if task.type in worker.capabilities:
                return name
        raise ValueError(f"No worker capable of task type: {task.type}")

    async def submit_task(self, task: Task) -> str:
        """Queue a task for execution and return its id."""
        self.task_queue.append(task)
        return task.id

    async def process_queue(self):
        """Drain the queue, respecting the concurrency limit."""
        while self.task_queue:
            if len(self.active_tasks) >= self.config.max_concurrent_tasks:
                # At capacity: back off briefly before re-checking.
                await asyncio.sleep(0.1)
                continue

            task = self.task_queue.pop(0)
            await self.assign_and_execute(task)

    async def assign_and_execute(self, task: Task):
        """Route a task to a worker and execute it with a timeout.

        On success the task carries ``result``; on timeout or error it is
        marked FAILED with ``error`` set.  Either way the task is moved
        from ``active_tasks`` to ``completed_tasks``.
        """
        try:
            # Route to appropriate worker
            worker_name = self.router(task, self.workers)
            task.assigned_to = worker_name
            task.status = TaskStatus.ASSIGNED

            # Track as active
            self.active_tasks[task.id] = task

            # Execute with timeout
            worker = self.workers[worker_name]
            task.status = TaskStatus.IN_PROGRESS

            result = await asyncio.wait_for(
                worker.execute(task),
                timeout=self.config.task_timeout
            )

            task.result = result
            task.status = TaskStatus.COMPLETED

        except asyncio.TimeoutError:
            task.status = TaskStatus.FAILED
            task.error = "Task timed out"

        except Exception as e:
            task.status = TaskStatus.FAILED
            task.error = str(e)

        finally:
            # Move to completed regardless of outcome.
            if task.id in self.active_tasks:
                del self.active_tasks[task.id]
            self.completed_tasks.append(task)

    async def get_task_result(self, task_id: str) -> Optional[Task]:
        """Return the completed Task with this id, or None if not finished."""
        for task in self.completed_tasks:
            if task.id == task_id:
                return task
        return None

Worker Management

Effective worker management includes registration, capability tracking, health monitoring, and dynamic scaling:

🐍python
1from dataclasses import dataclass, field
2from typing import Optional
3from enum import Enum
4import asyncio
5import time
6
7
class WorkerStatus(Enum):
    """Health status of a worker as reported by health checks."""
    HEALTHY = "healthy"      # last health check passed
    DEGRADED = "degraded"    # NOTE(review): never assigned in this chapter — confirm intended producer
    UNHEALTHY = "unhealthy"  # last health check failed or raised
    UNKNOWN = "unknown"      # no health check performed yet
14
15
@dataclass
class WorkerMetrics:
    """Per-worker performance counters and health state."""
    tasks_completed: int = 0
    tasks_failed: int = 0
    total_execution_time: float = 0.0
    last_health_check: float = 0.0
    status: WorkerStatus = WorkerStatus.UNKNOWN

    @property
    def success_rate(self) -> float:
        """Fraction of attempted tasks that succeeded (1.0 when untried)."""
        attempts = self.tasks_completed + self.tasks_failed
        if attempts == 0:
            # Optimistic default so new workers are not penalized.
            return 1.0
        return self.tasks_completed / attempts

    @property
    def avg_execution_time(self) -> float:
        """Mean execution time per completed task (0.0 when none)."""
        if self.tasks_completed:
            return self.total_execution_time / self.tasks_completed
        return 0.0
35
36
@dataclass
class WorkerPool:
    """Manages a pool of worker agents with capability indexing and metrics."""

    workers: dict[str, WorkerProtocol] = field(default_factory=dict)
    metrics: dict[str, WorkerMetrics] = field(default_factory=dict)
    # task type -> names of workers that can handle it
    capability_index: dict[str, list[str]] = field(default_factory=dict)

    def register(self, worker: WorkerProtocol):
        """Register worker and index its capabilities.

        FIX: re-registering the same name previously appended duplicate
        entries to the capability index; now entries stay unique.
        """
        self.workers[worker.name] = worker
        self.metrics[worker.name] = WorkerMetrics()

        for cap in worker.capabilities:
            names = self.capability_index.setdefault(cap, [])
            if worker.name not in names:
                names.append(worker.name)

    def unregister(self, worker_name: str):
        """Remove worker and update indices (no-op if unknown).

        FIX: guard the list removal (unguarded ``list.remove`` raises
        ValueError) and drop capability entries that become empty so the
        index does not accumulate stale keys.
        """
        if worker_name not in self.workers:
            return

        worker = self.workers[worker_name]

        for cap in worker.capabilities:
            names = self.capability_index.get(cap)
            if names and worker_name in names:
                names.remove(worker_name)
                if not names:
                    del self.capability_index[cap]

        del self.workers[worker_name]
        del self.metrics[worker_name]

    def get_capable_workers(self, task_type: str) -> list[str]:
        """Names of all workers that can handle this task type."""
        return self.capability_index.get(task_type, [])

    def get_best_worker(self, task_type: str) -> Optional[str]:
        """Best capable worker by metrics, or None if none are capable.

        Prefers HEALTHY/UNKNOWN workers, falling back to any capable
        worker when none are healthy; ranks by success rate over average
        execution time.
        """
        capable = self.get_capable_workers(task_type)

        if not capable:
            return None

        # Filter to healthy workers only
        healthy = [
            w for w in capable
            if self.metrics[w].status in (WorkerStatus.HEALTHY, WorkerStatus.UNKNOWN)
        ]

        if not healthy:
            # Degrade gracefully rather than refuse to route.
            healthy = capable

        def score(worker_name: str) -> float:
            m = self.metrics[worker_name]
            # High success rate and low execution time score best; the
            # +0.1 avoids division by zero for workers with no history.
            return m.success_rate / (m.avg_execution_time + 0.1)

        return max(healthy, key=score)

    def record_success(self, worker_name: str, execution_time: float):
        """Record a successful completion and its wall-clock duration."""
        if worker_name in self.metrics:
            m = self.metrics[worker_name]
            m.tasks_completed += 1
            m.total_execution_time += execution_time

    def record_failure(self, worker_name: str):
        """Record a task failure for the worker."""
        if worker_name in self.metrics:
            self.metrics[worker_name].tasks_failed += 1

    async def health_check_all(self):
        """Run health checks on all workers; a raised check means unhealthy."""
        for name, worker in self.workers.items():
            try:
                is_healthy = await worker.health_check()
                self.metrics[name].status = (
                    WorkerStatus.HEALTHY if is_healthy else WorkerStatus.UNHEALTHY
                )
            except Exception:
                self.metrics[name].status = WorkerStatus.UNHEALTHY

            self.metrics[name].last_health_check = time.time()
124
125
@dataclass
class DynamicWorkerManager:
    """Manages workers with demand-driven auto-scaling."""

    pool: WorkerPool = field(default_factory=WorkerPool)
    # Builds a new worker for a task type; scale-up is skipped when unset.
    # NOTE(review): Callable is not imported in this snippet's header —
    # requires `from typing import Callable`.
    worker_factory: Optional[Callable[[str], WorkerProtocol]] = None
    min_workers_per_capability: int = 1
    max_workers_per_capability: int = 5

    async def ensure_capacity(self, task_type: str, queue_depth: int):
        """Scale the pool for ``task_type`` based on current queue depth."""
        current = len(self.pool.get_capable_workers(task_type))

        # Scale up when backlog exceeds twice the current capacity.
        if queue_depth > current * 2 and current < self.max_workers_per_capability:
            if self.worker_factory:
                new_worker = self.worker_factory(task_type)
                self.pool.register(new_worker)
                # BUG FIX: original f-strings used [..] literals and never
                # interpolated the values.
                print(f"Scaled up: added worker for {task_type}")

        # Scale down when idle, keeping the configured minimum.
        elif queue_depth == 0 and current > self.min_workers_per_capability:
            # Remove the worst-performing worker by success rate.
            workers = self.pool.get_capable_workers(task_type)
            metrics = [(w, self.pool.metrics[w].success_rate) for w in workers]
            worst = min(metrics, key=lambda x: x[1])[0]
            self.pool.unregister(worst)
            print(f"Scaled down: removed worker {worst}")

Task Routing Strategies

Different routing strategies optimize for various objectives like load balancing, specialization, or latency:

🐍python
1from dataclasses import dataclass
2from typing import Callable, Optional
3from abc import ABC, abstractmethod
4import random
5
6
class RoutingStrategy(ABC):
    """Base class for routing strategies.

    A strategy inspects a task and the worker pool and returns the name of
    the worker to use, or None when no suitable worker exists.
    """

    @abstractmethod
    def select_worker(
        self,
        task: Task,
        pool: WorkerPool
    ) -> Optional[str]:
        """Select a worker for the given task (None if none suitable)."""
        pass
18
19
class CapabilityBasedRouter(RoutingStrategy):
    """Routes to the first worker whose capabilities include the task type."""

    def select_worker(self, task: Task, pool: WorkerPool) -> Optional[str]:
        candidates = pool.get_capable_workers(task.type)
        if not candidates:
            return None
        return candidates[0]
26
27
class LoadBalancedRouter(RoutingStrategy):
    """Distributes tasks evenly by tracking per-worker assignment counts."""

    def __init__(self):
        # worker name -> number of tasks routed to it so far
        self.task_counts: dict[str, int] = {}

    def select_worker(self, task: Task, pool: WorkerPool) -> Optional[str]:
        candidates = pool.get_capable_workers(task.type)
        if not candidates:
            return None

        # Ensure every candidate has a count, then pick the least loaded.
        for name in candidates:
            self.task_counts.setdefault(name, 0)

        choice = min(candidates, key=self.task_counts.__getitem__)
        self.task_counts[choice] += 1
        return choice
49
50
class PerformanceBasedRouter(RoutingStrategy):
    """Routes to the best performing worker per pool metrics."""

    def select_worker(self, task: Task, pool: WorkerPool) -> Optional[str]:
        # Delegates scoring (success rate vs. avg execution time) to WorkerPool.
        return pool.get_best_worker(task.type)
56
57
class RandomRouter(RoutingStrategy):
    """Picks uniformly at random among capable workers."""

    def select_worker(self, task: Task, pool: WorkerPool) -> Optional[str]:
        candidates = pool.get_capable_workers(task.type)
        if not candidates:
            return None
        return random.choice(candidates)
64
65
class PriorityRouter(RoutingStrategy):
    """Prefers configured workers per task type, else falls back to metrics."""

    def __init__(self, priority_workers: dict[str, list[str]]):
        # task type -> ordered list of preferred worker names
        self.priority_workers = priority_workers

    def select_worker(self, task: Task, pool: WorkerPool) -> Optional[str]:
        # First healthy preferred worker wins, in configured order.
        for name in self.priority_workers.get(task.type, []):
            if name not in pool.workers:
                continue
            if pool.metrics[name].status == WorkerStatus.HEALTHY:
                return name

        # No healthy preferred worker: fall back to metric-based routing.
        return pool.get_best_worker(task.type)
84
85
class ContentBasedRouter(RoutingStrategy):
    """Classifies the payload, then routes to a matching specialist."""

    def __init__(self, classifier: Callable[[dict], str]):
        self.classifier = classifier

    def select_worker(self, task: Task, pool: WorkerPool) -> Optional[str]:
        # Derive a specialized type from the payload content.
        specialized = self.classifier(task.payload)

        # First worker (registration order) advertising the specialty wins.
        specialist = next(
            (name for name, worker in pool.workers.items()
             if specialized in worker.capabilities),
            None,
        )
        if specialist is not None:
            return specialist

        # No specialist: fall back to the best general capable worker.
        return pool.get_best_worker(task.type)
103
104
@dataclass
class AdaptiveRouter(RoutingStrategy):
    """Adapts routing strategy based on system conditions.

    NOTE(review): "load" here is cumulative completed tasks per worker,
    not current queue depth, so a long-running system drifts toward the
    high-load strategy — confirm this is the intended signal.
    """

    # Optional default instead of field(default_factory=...): this snippet's
    # header never imported `field`, so the original raised NameError.
    # __post_init__ fills in the defaults either way.
    strategies: Optional[dict[str, RoutingStrategy]] = None

    def __post_init__(self):
        if not self.strategies:
            self.strategies = {
                "low_load": LoadBalancedRouter(),
                "high_load": PerformanceBasedRouter(),
                "default": CapabilityBasedRouter()
            }

    def select_worker(self, task: Task, pool: WorkerPool) -> Optional[str]:
        active_workers = len(pool.workers)
        if active_workers == 0:
            return None

        # Cumulative completions per worker as a rough load proxy.
        total_tasks = sum(m.tasks_completed for m in pool.metrics.values())
        load_ratio = total_tasks / active_workers  # active_workers > 0 here

        # Select strategy based on load
        if load_ratio < 10:
            strategy = self.strategies["low_load"]
        elif load_ratio > 100:
            strategy = self.strategies["high_load"]
        else:
            strategy = self.strategies["default"]

        return strategy.select_worker(task, pool)
| Strategy          | Use Case               | Trade-off                         |
| ----------------- | ---------------------- | --------------------------------- |
| Capability-based  | Simple matching        | May overload specialized workers  |
| Load-balanced     | Even distribution      | Ignores worker performance        |
| Performance-based | Maximize throughput    | May create hot spots              |
| Priority          | Critical task handling | Requires upfront configuration    |
| Content-based     | Specialized routing    | Classifier complexity             |
| Adaptive          | Dynamic conditions     | Implementation overhead           |

State Management

Managing shared state between supervisor and workers requires careful coordination:

🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional
3import asyncio
4from datetime import datetime
5import json
6
7
@dataclass
class StateEntry:
    """Single versioned entry in shared state."""
    key: str
    value: Any
    version: int = 0     # incremented on every write; used for optimistic locking
    updated_at: datetime = field(default_factory=datetime.now)
    updated_by: str = ""  # name of the agent that last wrote this entry
16
17
@dataclass
class SharedStateManager:
    """Manages shared state with versioning and conflict resolution.

    Each key has its own asyncio.Lock; writes bump the entry's version and
    are appended to ``history``.
    """

    state: dict[str, StateEntry] = field(default_factory=dict)
    locks: dict[str, asyncio.Lock] = field(default_factory=dict)
    history: list[tuple[str, StateEntry]] = field(default_factory=list)

    def _get_lock(self, key: str) -> asyncio.Lock:
        """Get or create the per-key lock."""
        if key not in self.locks:
            self.locks[key] = asyncio.Lock()
        return self.locks[key]

    async def get(self, key: str) -> Optional[Any]:
        """Get current value for key, or None if absent."""
        entry = self.state.get(key)
        return entry.value if entry else None

    async def get_with_version(self, key: str) -> Optional[tuple[Any, int]]:
        """Get (value, version) for optimistic locking, or None if absent."""
        entry = self.state.get(key)
        return (entry.value, entry.version) if entry else None

    def _set_locked(
        self,
        key: str,
        value: Any,
        updater: str,
        expected_version: Optional[int] = None
    ) -> bool:
        """Write an entry; caller must already hold the key's lock."""
        existing = self.state.get(key)

        # Optimistic-locking check when a version was supplied.
        if expected_version is not None:
            if existing and existing.version != expected_version:
                return False  # Version conflict

        new_version = (existing.version + 1) if existing else 1
        entry = StateEntry(
            key=key,
            value=value,
            version=new_version,
            updated_by=updater
        )

        self.state[key] = entry
        self.history.append((key, entry))

        return True

    async def set(
        self,
        key: str,
        value: Any,
        updater: str,
        expected_version: Optional[int] = None
    ) -> bool:
        """
        Set value with optional optimistic locking.
        Returns True if successful, False on version conflict.
        """
        async with self._get_lock(key):
            return self._set_locked(key, value, updater, expected_version)

    async def atomic_update(
        self,
        key: str,
        update_fn: Callable[[Any], Any],
        updater: str
    ) -> Any:
        """Atomically read-modify-write a value.

        BUG FIX: the original awaited self.set() while still holding the
        key's lock; asyncio.Lock is not reentrant, so every call
        deadlocked.  The write now goes through _set_locked under the
        single lock acquisition.
        """
        async with self._get_lock(key):
            entry = self.state.get(key)
            current = entry.value if entry else None
            new_value = update_fn(current)
            self._set_locked(key, new_value, updater)
            return new_value

    async def increment(self, key: str, updater: str, amount: int = 1) -> int:
        """Atomically increment a counter (a missing key counts as 0)."""
        return await self.atomic_update(
            key,
            lambda x: (x or 0) + amount,
            updater
        )
94
95
@dataclass
class TaskStateCoordinator:
    """Coordinates task state between supervisor and workers.

    All entries for one task share the key prefix ``task:<task_id>:``.

    BUG FIX: the original built every key with the literal text
    "[task_id]" instead of an f-string placeholder, so all tasks collided
    on the same state keys; {task_id} now interpolates correctly.
    """

    shared_state: SharedStateManager = field(default_factory=SharedStateManager)

    async def start_task(self, task_id: str, worker_name: str):
        """Record task start (status + start timestamp)."""
        await self.shared_state.set(
            f"task:{task_id}:status",
            "in_progress",
            worker_name
        )
        await self.shared_state.set(
            f"task:{task_id}:started_at",
            datetime.now().isoformat(),
            worker_name
        )

    async def update_progress(
        self,
        task_id: str,
        worker_name: str,
        progress: float,
        message: str = ""
    ):
        """Update task progress (percent plus free-form message)."""
        await self.shared_state.set(
            f"task:{task_id}:progress",
            {"percent": progress, "message": message},
            worker_name
        )

    async def complete_task(
        self,
        task_id: str,
        worker_name: str,
        result: dict
    ):
        """Record task completion with its result and timestamp."""
        await self.shared_state.set(
            f"task:{task_id}:status",
            "completed",
            worker_name
        )
        await self.shared_state.set(
            f"task:{task_id}:result",
            result,
            worker_name
        )
        await self.shared_state.set(
            f"task:{task_id}:completed_at",
            datetime.now().isoformat(),
            worker_name
        )

    async def fail_task(
        self,
        task_id: str,
        worker_name: str,
        error: str
    ):
        """Record task failure with the error string."""
        await self.shared_state.set(
            f"task:{task_id}:status",
            "failed",
            worker_name
        )
        await self.shared_state.set(
            f"task:{task_id}:error",
            error,
            worker_name
        )

    async def get_task_state(self, task_id: str) -> dict:
        """Collect every state field recorded for this task."""
        prefix = f"task:{task_id}:"
        task_state = {}

        for key, entry in self.shared_state.state.items():
            if key.startswith(prefix):
                field_name = key[len(prefix):]
                task_state[field_name] = entry.value

        return task_state

Complete Implementation

Here's a complete supervisor implementation combining all components:

🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional, Callable
3import asyncio
4import uuid
5import time
6
7
@dataclass
class CompleteSupervisor:
    """Full-featured supervisor implementation.

    Combines a worker pool, a pluggable routing strategy, shared
    task-state coordination, and a semaphore-bounded async processing
    loop.
    """

    config: SupervisorConfig = field(default_factory=SupervisorConfig)
    pool: WorkerPool = field(default_factory=WorkerPool)
    router: RoutingStrategy = field(default_factory=PerformanceBasedRouter)
    state: TaskStateCoordinator = field(default_factory=TaskStateCoordinator)

    # Queues
    pending_tasks: asyncio.Queue = field(default_factory=asyncio.Queue)
    results: dict[str, Task] = field(default_factory=dict)

    # Control
    running: bool = False
    semaphore: Optional[asyncio.Semaphore] = None

    def __post_init__(self):
        # Bounds the number of tasks executing concurrently.
        self.semaphore = asyncio.Semaphore(self.config.max_concurrent_tasks)

    def register_worker(self, worker: WorkerProtocol):
        """Register a worker with the pool."""
        self.pool.register(worker)

    async def submit(self, task_type: str, payload: dict) -> str:
        """Submit a task and return its generated ID."""
        task = Task(
            id=str(uuid.uuid4()),
            type=task_type,
            payload=payload
        )
        await self.pending_tasks.put(task)
        return task.id

    async def start(self):
        """Start the supervisor's background processing loops."""
        self.running = True

        # NOTE(review): these task handles are not stored; per the asyncio
        # docs, un-referenced tasks may be garbage-collected mid-flight —
        # consider keeping references.
        asyncio.create_task(self._process_loop())

        if self.config.enable_health_checks:
            asyncio.create_task(self._health_check_loop())

    async def stop(self):
        """Signal the background loops to exit."""
        self.running = False

    async def _process_loop(self):
        """Main loop: pull tasks off the queue and dispatch them."""
        while self.running:
            try:
                # Short timeout so stop() is noticed promptly.
                task = await asyncio.wait_for(
                    self.pending_tasks.get(),
                    timeout=1.0
                )

                # Fire-and-forget; _process_task enforces the concurrency cap.
                asyncio.create_task(self._process_task(task))

            except asyncio.TimeoutError:
                continue
            except Exception as e:
                # BUG FIX: the original f-string wrote [e] — a literal,
                # not an interpolation.
                print(f"Process loop error: {e}")

    async def _process_task(self, task: Task):
        """Route, execute, and record a single task under the semaphore."""
        async with self.semaphore:
            start_time = time.time()

            try:
                # Route to worker
                worker_name = self.router.select_worker(task, self.pool)

                if not worker_name:
                    task.status = TaskStatus.FAILED
                    # BUG FIX: [task.type] -> {task.type} so the message
                    # carries the actual type.
                    task.error = f"No worker available for task type: {task.type}"
                    self.results[task.id] = task
                    return

                # Assign and track
                task.assigned_to = worker_name
                task.status = TaskStatus.IN_PROGRESS
                await self.state.start_task(task.id, worker_name)

                # Execute with timeout
                worker = self.pool.workers[worker_name]
                result = await asyncio.wait_for(
                    worker.execute(task),
                    timeout=self.config.task_timeout
                )

                # Success
                task.result = result
                task.status = TaskStatus.COMPLETED

                execution_time = time.time() - start_time
                self.pool.record_success(worker_name, execution_time)
                await self.state.complete_task(task.id, worker_name, result)

            except asyncio.TimeoutError:
                task.status = TaskStatus.FAILED
                task.error = "Task timed out"
                if task.assigned_to:
                    self.pool.record_failure(task.assigned_to)
                    await self.state.fail_task(task.id, task.assigned_to, "Timeout")

            except Exception as e:
                task.status = TaskStatus.FAILED
                task.error = str(e)
                if task.assigned_to:
                    self.pool.record_failure(task.assigned_to)
                    await self.state.fail_task(task.id, task.assigned_to, str(e))

            finally:
                # The result is recorded whatever the outcome.
                self.results[task.id] = task

    async def _health_check_loop(self):
        """Periodically refresh worker health status."""
        while self.running:
            await asyncio.sleep(self.config.health_check_interval)
            await self.pool.health_check_all()

    async def get_result(
        self,
        task_id: str,
        timeout: float = 30.0
    ) -> Optional[Task]:
        """Poll for a task result, returning None when the timeout elapses."""
        start = time.time()

        while time.time() - start < timeout:
            if task_id in self.results:
                return self.results[task_id]
            await asyncio.sleep(0.1)

        return None

    def get_status(self) -> dict:
        """Snapshot of supervisor state and per-worker metrics."""
        return {
            "running": self.running,
            "workers": len(self.pool.workers),
            "pending_tasks": self.pending_tasks.qsize(),
            "completed_tasks": len(self.results),
            "worker_metrics": {
                name: {
                    "completed": m.tasks_completed,
                    "failed": m.tasks_failed,
                    "success_rate": m.success_rate,
                    "status": m.status.value
                }
                for name, m in self.pool.metrics.items()
            }
        }
164
165
166# Example usage
async def supervisor_example():
    """End-to-end demo: build a supervisor, run tasks, report results."""
    # Create supervisor
    supervisor = CompleteSupervisor(
        config=SupervisorConfig(
            max_concurrent_tasks=5,
            task_timeout=30.0
        ),
        router=PerformanceBasedRouter()
    )

    # Register workers
    # NOTE(review): CodeWriterWorker / TestWriterWorker are placeholders
    # not defined in this chapter — assumed to satisfy WorkerProtocol.
    supervisor.register_worker(CodeWriterWorker())
    supervisor.register_worker(TestWriterWorker())

    # Start processing
    await supervisor.start()

    # Submit tasks.  BUG FIX: f-string placeholders below used [..]
    # literals and never interpolated.
    task_ids = []
    for i in range(10):
        task_id = await supervisor.submit(
            "write_code",
            {"description": f"Function {i}"}
        )
        task_ids.append(task_id)

    # Wait for results
    for task_id in task_ids:
        result = await supervisor.get_result(task_id)
        print(f"Task {task_id}: {result.status.value if result else 'pending'}")

    # Check status
    print(supervisor.get_status())

    # Stop
    await supervisor.stop()

Key Takeaways

  • Worker pools provide organized management with capability indexing and health monitoring.
  • Routing strategies can optimize for different objectives - load balancing, performance, or specialization.
  • State coordination enables progress tracking and result aggregation with proper concurrency control.
  • Semaphores and timeouts prevent resource exhaustion and ensure task completion.
  • Health checks enable proactive detection and handling of worker failures.
Next Section Preview: We'll explore peer-to-peer collaboration patterns where agents work as equals without central coordination.