Introduction
Regardless of whether you use supervisor, peer-to-peer, or hierarchical patterns, multi-agent systems require shared state and reliable communication. This section explores state management strategies, communication patterns, and synchronization mechanisms that enable effective agent coordination.
Section Overview: We'll build shared state systems with consistency guarantees, implement various communication patterns, and create event-driven coordination mechanisms.
Shared State Management
Agents need consistent access to shared state while avoiding conflicts and maintaining data integrity:
🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional, Callable
3from abc import ABC, abstractmethod
4import asyncio
5from datetime import datetime
6import json
7import hashlib
8
9
@dataclass
class StateVersion:
    """One versioned snapshot of a value held in a state store.

    Attributes:
        value: The stored payload.
        version: Version number assigned by the store that created the entry.
        timestamp: Naive local time at which the entry was created.
        updated_by: Identifier of the writer.
        checksum: Hex MD5 digest of the JSON-serialized value.
    """

    value: Any
    version: int
    timestamp: datetime
    updated_by: str
    checksum: str

    @classmethod
    def create(cls, value: Any, version: int, updated_by: str) -> 'StateVersion':
        """Build an entry, stamping it with the current time and a checksum.

        Args:
            value: Payload to store.
            version: Version number to record.
            updated_by: Identifier of the writer.

        Returns:
            A new ``StateVersion`` for ``value``.
        """
        # sort_keys makes equal dicts serialize identically regardless of
        # insertion order; default=str keeps non-JSON values from raising.
        serialized = json.dumps(value, sort_keys=True, default=str).encode()
        # MD5 is only a content fingerprint here, not a security control;
        # saying so keeps the call working on FIPS-restricted builds.
        checksum = hashlib.md5(serialized, usedforsecurity=False).hexdigest()
        return cls(
            value=value,
            version=version,
            timestamp=datetime.now(),
            updated_by=updated_by,
            checksum=checksum,
        )
31
32
class StateStore(ABC):
    """Abstract interface for a key/value state backend."""

    @abstractmethod
    async def get(self, key: str) -> Optional['StateVersion']:
        """Return the current entry stored under ``key``, or ``None``."""

    @abstractmethod
    async def set(
        self,
        key: str,
        value: Any,
        updater: str,
        expected_version: Optional[int] = None
    ) -> bool:
        """Store ``value`` under ``key`` on behalf of ``updater``.

        ``expected_version`` lets a caller request an optimistic-locking
        check; the boolean result reports whether the write was applied.
        """

    @abstractmethod
    async def delete(self, key: str) -> bool:
        """Remove ``key`` and report whether anything was removed."""
53
54
@dataclass
class InMemoryStateStore(StateStore):
    """In-memory state store with per-key locking and versioning.

    Attributes:
        data: Current entry for every key.
        locks: One ``asyncio.Lock`` per key, created on first use.
        history: Append-only log of ``(key, "set", entry)`` tuples.
    """

    data: dict[str, StateVersion] = field(default_factory=dict)
    locks: dict[str, asyncio.Lock] = field(default_factory=dict)
    history: list[tuple[str, str, StateVersion]] = field(default_factory=list)

    def _get_lock(self, key: str) -> asyncio.Lock:
        """Return the lock guarding ``key``, creating it if needed."""
        if key not in self.locks:
            self.locks[key] = asyncio.Lock()
        return self.locks[key]

    def _write(self, key: str, value: Any, updater: str) -> None:
        """Store the next version of ``key``; the caller must hold its lock."""
        current = self.data.get(key)
        next_version = current.version + 1 if current is not None else 1
        entry = StateVersion.create(value, next_version, updater)
        self.data[key] = entry
        self.history.append((key, "set", entry))

    async def get(self, key: str) -> Optional[StateVersion]:
        """Return the current entry for ``key``, or ``None`` if absent."""
        return self.data.get(key)

    async def set(
        self,
        key: str,
        value: Any,
        updater: str,
        expected_version: Optional[int] = None
    ) -> bool:
        """Write ``value`` under ``key``, optionally guarded by a version check.

        Args:
            key: Key to write.
            value: New payload.
            updater: Identifier recorded on the new entry.
            expected_version: When given, the write only happens if the key is
                currently at this version. A missing key counts as version 0.

        Returns:
            ``True`` if the value was stored, ``False`` on a version conflict.
        """
        async with self._get_lock(key):
            if expected_version is not None:
                current = self.data.get(key)
                # A key that does not exist (yet, or any more) is at version
                # 0, so a stale writer cannot resurrect a deleted key.
                current_version = current.version if current is not None else 0
                if current_version != expected_version:
                    return False  # Version conflict

            self._write(key, value, updater)
            return True

    async def delete(self, key: str) -> bool:
        """Remove ``key``; return ``True`` if it existed."""
        async with self._get_lock(key):
            return self.data.pop(key, None) is not None

    async def compare_and_swap(
        self,
        key: str,
        expected_value: Any,
        new_value: Any,
        updater: str
    ) -> bool:
        """Replace the value of ``key`` only if it currently equals ``expected_value``.

        A missing key compares as ``None``.

        Returns:
            ``True`` if the swap happened, ``False`` otherwise.
        """
        async with self._get_lock(key):
            current = self.data.get(key)
            current_value = current.value if current is not None else None

            if current_value != expected_value:
                return False

            # Write directly: asyncio.Lock is not reentrant, so calling
            # self.set() here would wait forever on the lock we already hold.
            self._write(key, new_value, updater)
            return True
117
118
@dataclass
class DistributedState:
    """State replicated across several stores, with conflict resolution.

    Attributes:
        stores: Backing stores keyed by an arbitrary name.
        conflict_resolver: Picks the winning entry among divergent copies;
            defaults to last-write-wins on ``timestamp``.
    """

    stores: dict[str, 'StateStore'] = field(default_factory=dict)
    conflict_resolver: Optional[
        Callable[[list['StateVersion']], 'StateVersion']
    ] = None

    def __post_init__(self):
        if self.conflict_resolver is None:
            self.conflict_resolver = self._last_write_wins

    def _last_write_wins(self, versions: list['StateVersion']) -> 'StateVersion':
        """Resolve conflicts by choosing the entry with the latest timestamp."""
        return max(versions, key=lambda v: v.timestamp)

    async def get(self, key: str) -> Optional[Any]:
        """Return the value from the first store that has ``key``, else ``None``."""
        for store in self.stores.values():
            entry = await store.get(key)
            if entry is not None:
                return entry.value
        return None

    async def set_replicated(
        self,
        key: str,
        value: Any,
        updater: str,
        quorum: int = 1
    ) -> bool:
        """Write ``value`` to every store.

        Returns:
            ``True`` if at least ``quorum`` stores accepted the write.
        """
        successes = 0

        for store in self.stores.values():
            if await store.set(key, value, updater):
                successes += 1

        return successes >= quorum

    async def sync_stores(self, key: str):
        """Bring every store to the winning version of ``key``.

        Does nothing when no store holds the key. Stores whose copy already
        matches the winner's checksum are left untouched, so repeated syncs
        do not inflate version numbers.
        """
        held = []
        for store in self.stores.values():
            held.append((store, await store.get(key)))

        versions = [entry for _, entry in held if entry is not None]
        if not versions:
            return

        # Resolve conflicts
        winner = self.conflict_resolver(versions)

        # Update only the stores that are missing or behind
        for store, entry in held:
            if entry is not None and entry.checksum == winner.checksum:
                continue
            await store.set(
                key,
                winner.value,
                "sync",
                expected_version=None  # Force update
            )


# State Namespacing
🐍python
@dataclass
class NamespacedState:
    """State store wrapper that isolates keys by namespace.

    Keys are stored in the wrapped store as ``"<namespace>:<key>"``.

    Attributes:
        store: The underlying store.
        namespaces: Namespaces that have been created and may be used.
    """

    store: 'StateStore'
    namespaces: set[str] = field(default_factory=set)

    def _namespaced_key(self, namespace: str, key: str) -> str:
        """Return the key used in the underlying store."""
        return f"{namespace}:{key}"

    def _require_namespace(self, namespace: str) -> None:
        """Raise ``ValueError`` unless ``namespace`` has been created."""
        if namespace not in self.namespaces:
            raise ValueError(f"Unknown namespace: {namespace}")

    def create_namespace(self, namespace: str):
        """Create a new namespace (no-op if it already exists)."""
        self.namespaces.add(namespace)

    async def get(self, namespace: str, key: str) -> Optional[Any]:
        """Return the value of ``key`` in ``namespace``, or ``None`` if unset.

        Raises:
            ValueError: If ``namespace`` has not been created.
        """
        self._require_namespace(namespace)
        entry = await self.store.get(self._namespaced_key(namespace, key))
        return entry.value if entry else None

    async def set(
        self,
        namespace: str,
        key: str,
        value: Any,
        updater: str
    ) -> bool:
        """Store ``value`` under ``key`` in ``namespace``.

        Returns:
            Whatever the underlying store's ``set`` returns.

        Raises:
            ValueError: If ``namespace`` has not been created.
        """
        self._require_namespace(namespace)
        return await self.store.set(
            self._namespaced_key(namespace, key),
            value,
            updater
        )

    async def list_keys(self, namespace: str) -> list[str]:
        """List the keys stored in ``namespace``, without the namespace prefix.

        Only stores that expose their entries as a ``data`` dict (as the
        in-memory store does) can be enumerated; any other store yields ``[]``.
        """
        prefix = f"{namespace}:"
        entries = getattr(self.store, "data", None)
        if not isinstance(entries, dict):
            return []
        return [key[len(prefix):] for key in entries if key.startswith(prefix)]
51
52
# Usage example
async def state_example():
    """Show namespaced state shared between groups of agents."""
    store = InMemoryStateStore()
    namespaced = NamespacedState(store=store)

    # Create namespaces for different agent groups
    for namespace in ("workers", "supervisors", "shared"):
        namespaced.create_namespace(namespace)

    # Workers store their progress
    await namespaced.set("workers", "task_1_progress", 0.5, "worker_1")

    # Supervisors store coordination data
    await namespaced.set("supervisors", "active_workers", ["w1", "w2"], "supervisor")

    # Shared state for all agents
    await namespaced.set("shared", "global_config", {"timeout": 30}, "admin")


# Communication Patterns
Different communication patterns suit different coordination needs:
🐍python
1from dataclasses import dataclass, field
2from typing import Any, Callable, Awaitable
3from enum import Enum
4import asyncio
5
6
class MessagePattern(Enum):
    """Communication patterns a message can belong to."""

    REQUEST_REPLY = "request_reply"
    PUBLISH_SUBSCRIBE = "publish_subscribe"
    FIRE_AND_FORGET = "fire_and_forget"
    STREAMING = "streaming"
13
14
@dataclass
class Message:
    """Generic message exchanged between agents.

    Attributes:
        id: Unique message identifier.
        sender: Identifier of the sending agent.
        pattern: Communication pattern the message belongs to.
        topic: Topic name, when one applies.
        payload: Message body; each message gets its own dict.
        correlation_id: Optional identifier of a related message.
    """

    id: str
    sender: str
    pattern: 'MessagePattern'
    topic: Optional[str] = None
    payload: dict = field(default_factory=dict)
    correlation_id: Optional[str] = None
24
25
@dataclass
class RequestReplyChannel:
    """Channel for request-reply communication.

    Attributes:
        pending_requests: Future awaiting the reply, keyed by request id.
        handlers: Request handler registered for each agent id.
    """

    pending_requests: dict[str, asyncio.Future] = field(default_factory=dict)
    handlers: dict[str, Callable[['Message'], Awaitable[Any]]] = field(
        default_factory=dict
    )
    # Strong references to in-flight handler tasks: the event loop only keeps
    # weak references, so an unreferenced task may be collected mid-run.
    _tasks: set = field(default_factory=set, init=False, repr=False)

    def register_handler(
        self,
        agent_id: str,
        handler: Callable[['Message'], Awaitable[Any]]
    ):
        """Register (or replace) the handler for requests sent to ``agent_id``."""
        self.handlers[agent_id] = handler

    async def request(
        self,
        sender: str,
        recipient: str,
        payload: dict,
        timeout: float = 30.0
    ) -> Any:
        """Send a request to ``recipient`` and wait for its reply.

        Returns:
            Whatever the recipient's handler returns.

        Raises:
            asyncio.TimeoutError: If no reply arrives within ``timeout``
                seconds, including when ``recipient`` has no handler.
            Exception: Any exception raised by the handler is re-raised here.
        """
        request_id = str(uuid.uuid4())
        message = Message(
            id=request_id,
            sender=sender,
            pattern=MessagePattern.REQUEST_REPLY,
            payload=payload
        )

        # Create future for response
        reply = asyncio.get_running_loop().create_future()
        self.pending_requests[request_id] = reply

        try:
            # Dispatch to handler
            if recipient in self.handlers:
                task = asyncio.create_task(self._handle_request(recipient, message))
                self._tasks.add(task)
                task.add_done_callback(self._tasks.discard)

            # Wait for response
            return await asyncio.wait_for(reply, timeout=timeout)

        finally:
            self.pending_requests.pop(request_id, None)

    def _settle(self, request_id: str, *, result: Any = None, error: Exception = None):
        """Complete the pending future for ``request_id`` if it is still open."""
        reply = self.pending_requests.get(request_id)
        # The requester may have timed out or been cancelled in the meantime;
        # completing a finished future would raise InvalidStateError.
        if reply is None or reply.done():
            return
        if error is not None:
            reply.set_exception(error)
        else:
            reply.set_result(result)

    async def _handle_request(self, recipient: str, message: 'Message'):
        """Run the recipient's handler and deliver its result or exception."""
        handler = self.handlers[recipient]
        try:
            response = await handler(message)
        except Exception as e:
            self._settle(message.id, error=e)
        else:
            self._settle(message.id, result=response)
84
85
@dataclass
class PubSubChannel:
    """Publish-subscribe channel.

    Attributes:
        subscribers: Handlers registered per topic, in subscription order.
    """

    subscribers: dict[str, list[Callable[['Message'], Awaitable[None]]]] = field(
        default_factory=dict
    )

    def subscribe(
        self,
        topic: str,
        handler: Callable[['Message'], Awaitable[None]]
    ):
        """Subscribe ``handler`` to ``topic``."""
        self.subscribers.setdefault(topic, []).append(handler)

    def unsubscribe(
        self,
        topic: str,
        handler: Callable[['Message'], Awaitable[None]]
    ):
        """Remove one subscription of ``handler`` from ``topic``.

        Unknown topics and handlers that are not subscribed are ignored.
        """
        handlers = self.subscribers.get(topic)
        if not handlers or handler not in handlers:
            return
        handlers.remove(handler)
        if not handlers:
            # Drop the empty list so abandoned topics do not accumulate.
            del self.subscribers[topic]

    async def publish(self, sender: str, topic: str, payload: dict):
        """Deliver a new message to every subscriber of ``topic`` concurrently.

        An exception raised by a subscriber propagates to the caller.
        """
        message = Message(
            id=str(uuid.uuid4()),
            sender=sender,
            pattern=MessagePattern.PUBLISH_SUBSCRIBE,
            topic=topic,
            payload=payload
        )

        handlers = self.subscribers.get(topic)
        if handlers:
            await asyncio.gather(*[handler(message) for handler in handlers])
128
129
@dataclass
class StreamingChannel:
    """Channel for streaming data through named queues.

    Attributes:
        streams: One unbounded ``asyncio.Queue`` per stream id.
    """

    streams: dict[str, asyncio.Queue] = field(default_factory=dict)

    def create_stream(self, stream_id: str) -> asyncio.Queue:
        """Create a stream, replacing any existing one with the same id."""
        queue = asyncio.Queue()
        self.streams[stream_id] = queue
        return queue

    async def send(self, stream_id: str, data: Any):
        """Append ``data`` to the stream; unknown stream ids are ignored."""
        queue = self.streams.get(stream_id)
        if queue is not None:
            await queue.put(data)

    async def receive(
        self,
        stream_id: str,
        timeout: Optional[float] = None
    ) -> Optional[Any]:
        """Receive the next item from a stream.

        Args:
            stream_id: Stream to read from.
            timeout: Seconds to wait. ``None`` waits indefinitely; zero or a
                negative value polls without waiting.

        Returns:
            The next item, or ``None`` if the stream does not exist or nothing
            arrived in time.
        """
        queue = self.streams.get(stream_id)
        if queue is None:
            return None

        if timeout is None:
            return await queue.get()

        if timeout <= 0:
            # An explicit zero means "do not wait", not "wait forever".
            try:
                return queue.get_nowait()
            except asyncio.QueueEmpty:
                return None

        try:
            return await asyncio.wait_for(queue.get(), timeout=timeout)
        except asyncio.TimeoutError:
            return None

    def close_stream(self, stream_id: str):
        """Remove a stream; unknown stream ids are ignored."""
        self.streams.pop(stream_id, None)
169
170
@dataclass
class UnifiedCommunicationHub:
    """Unified hub bundling one channel of each communication pattern."""

    request_reply: RequestReplyChannel = field(default_factory=RequestReplyChannel)
    pub_sub: PubSubChannel = field(default_factory=PubSubChannel)
    streaming: StreamingChannel = field(default_factory=StreamingChannel)


# | Pattern | Use Case | Characteristics |
|---|---|---|
| Request-Reply | Task delegation, queries | Synchronous, bidirectional |
| Pub-Sub | Events, notifications | Async, decoupled, many-to-many |
| Fire-and-Forget | Logging, metrics | Async, no confirmation |
| Streaming | Progress updates, data feeds | Continuous, ordered |
Synchronization Mechanisms
When agents need to coordinate timing or access shared resources, synchronization primitives ensure consistency:
🐍python
1from dataclasses import dataclass, field
2from typing import Set
3import asyncio
4
5
@dataclass
class DistributedLock:
    """Exclusive lock handed out to agents in FIFO order.

    Attributes:
        lock_id: Name of the lock.
        holder: Agent currently holding the lock, or ``None``.
        waiters: Queue of ``(agent_id, future)`` pairs awaiting the lock.
    """

    lock_id: str
    holder: Optional[str] = None
    waiters: list[tuple[str, asyncio.Future]] = field(default_factory=list)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    @staticmethod
    def _was_granted(future: asyncio.Future) -> bool:
        """Tell whether ``release`` already handed the lock to this waiter."""
        return future.done() and not future.cancelled()

    def _forget(self, future: asyncio.Future) -> None:
        """Remove the waiter owning ``future`` from the queue."""
        self.waiters = [w for w in self.waiters if w[1] is not future]

    def _pass_on(self) -> None:
        """Free the lock and give it to the next waiter that is still waiting."""
        self.holder = None
        while self.waiters:
            next_agent, future = self.waiters.pop(0)
            if future.done():
                # Timed out or cancelled: handing over would strand the lock.
                continue
            self.holder = next_agent
            future.set_result(True)
            return

    async def acquire(self, agent_id: str, timeout: float = 30.0) -> bool:
        """Acquire the lock for ``agent_id``.

        Returns:
            ``True`` once the lock is held, ``False`` if ``timeout`` seconds
            pass first.
        """
        async with self._lock:
            if self.holder is None:
                self.holder = agent_id
                return True

            # Add to waiters
            future = asyncio.get_running_loop().create_future()
            self.waiters.append((agent_id, future))

        # Wait for lock
        try:
            await asyncio.wait_for(future, timeout=timeout)
            return True
        except asyncio.TimeoutError:
            if self._was_granted(future):
                # Granted in the same instant the timeout fired: we own it.
                return True
            self._forget(future)
            return False
        except asyncio.CancelledError:
            if self._was_granted(future):
                # We own the lock but the caller is going away; pass it on.
                self._pass_on()
            else:
                self._forget(future)
            raise

    async def release(self, agent_id: str):
        """Release the lock; a call from a non-holder is ignored."""
        async with self._lock:
            if self.holder != agent_id:
                return  # Not the holder

            self._pass_on()
48
49
@dataclass
class Barrier:
    """Synchronization barrier that opens once enough agents have arrived.

    Attributes:
        required_count: Number of distinct agents needed to open the barrier.
        arrived: Agents currently waiting at (or past) the barrier.
        event: Set when the barrier opens.
    """

    required_count: int
    arrived: Set[str] = field(default_factory=set)
    event: asyncio.Event = field(default_factory=asyncio.Event)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def wait(self, agent_id: str, timeout: float = 60.0) -> bool:
        """Wait at the barrier until all agents arrive.

        Returns:
            ``True`` if the barrier opened, ``False`` if ``timeout`` seconds
            passed first.
        """
        async with self._lock:
            self.arrived.add(agent_id)

            if len(self.arrived) >= self.required_count:
                self.event.set()
                return True

        # Wait for all to arrive
        try:
            await asyncio.wait_for(self.event.wait(), timeout=timeout)
            return True
        except asyncio.TimeoutError:
            if self.event.is_set():
                return True
            # This agent gave up, so it must no longer count as present;
            # otherwise a later arrival could open the barrier without it.
            self.arrived.discard(agent_id)
            return False
        except asyncio.CancelledError:
            if not self.event.is_set():
                self.arrived.discard(agent_id)
            raise

    def reset(self):
        """Reset the barrier so it can be used again."""
        self.arrived.clear()
        self.event.clear()
79
80
@dataclass
class Semaphore:
    """Semaphore limiting how many agents hold a permit at once.

    Attributes:
        max_permits: Total number of permits.
        current_permits: Permits currently free (reset to ``max_permits`` on
            construction).
        holders: Agents currently holding a permit.
        waiters: Queue of ``(agent_id, future)`` pairs awaiting a permit.
    """

    max_permits: int
    current_permits: int = 0
    holders: Set[str] = field(default_factory=set)
    waiters: list[tuple[str, asyncio.Future]] = field(default_factory=list)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    def __post_init__(self):
        self.current_permits = self.max_permits

    @staticmethod
    def _was_granted(future: asyncio.Future) -> bool:
        """Tell whether ``release`` already gave a permit to this waiter."""
        return future.done() and not future.cancelled()

    def _forget(self, future: asyncio.Future) -> None:
        """Remove the waiter owning ``future`` from the queue."""
        self.waiters = [w for w in self.waiters if w[1] is not future]

    def _hand_over(self) -> None:
        """Give one freed permit to the next live waiter, else return it to the pool."""
        while self.waiters:
            next_agent, future = self.waiters.pop(0)
            if future.done():
                # Timed out or cancelled: nobody is there to take the permit.
                continue
            future.set_result(True)
            if next_agent in self.holders:
                # Already holds a permit; keep looking for a real taker.
                continue
            self.holders.add(next_agent)
            return
        self.current_permits += 1

    def _give_back(self, agent_id: str) -> None:
        """Release the permit held by ``agent_id``, if any."""
        if agent_id not in self.holders:
            return
        self.holders.remove(agent_id)
        self._hand_over()

    async def acquire(self, agent_id: str, timeout: float = 30.0) -> bool:
        """Acquire a permit for ``agent_id``.

        Holders are tracked per agent id, so an agent that already holds a
        permit is granted immediately without consuming a second one.

        Returns:
            ``True`` once a permit is held, ``False`` if ``timeout`` seconds
            pass first.
        """
        async with self._lock:
            if agent_id in self.holders:
                return True

            if self.current_permits > 0:
                self.current_permits -= 1
                self.holders.add(agent_id)
                return True

            future = asyncio.get_running_loop().create_future()
            self.waiters.append((agent_id, future))

        try:
            await asyncio.wait_for(future, timeout=timeout)
            return True
        except asyncio.TimeoutError:
            if self._was_granted(future):
                # Granted in the same instant the timeout fired: we hold it.
                return True
            self._forget(future)
            return False
        except asyncio.CancelledError:
            if self._was_granted(future):
                # We hold a permit but the caller is going away; give it back.
                self._give_back(agent_id)
            else:
                self._forget(future)
            raise

    async def release(self, agent_id: str):
        """Release the permit held by ``agent_id``; ignored for non-holders."""
        async with self._lock:
            self._give_back(agent_id)
126
127
@dataclass
class SynchronizationManager:
    """Registry of named locks, barriers and semaphores.

    Each ``get_*`` method creates the primitive on first request and returns
    the same instance afterwards.
    """

    locks: dict[str, DistributedLock] = field(default_factory=dict)
    barriers: dict[str, Barrier] = field(default_factory=dict)
    semaphores: dict[str, Semaphore] = field(default_factory=dict)

    def get_lock(self, lock_id: str) -> DistributedLock:
        """Get or create the lock named ``lock_id``."""
        if lock_id not in self.locks:
            self.locks[lock_id] = DistributedLock(lock_id=lock_id)
        return self.locks[lock_id]

    def get_barrier(self, barrier_id: str, count: int) -> Barrier:
        """Get or create the barrier named ``barrier_id``.

        ``count`` is only used when the barrier is created; an existing
        barrier is returned unchanged.
        """
        if barrier_id not in self.barriers:
            self.barriers[barrier_id] = Barrier(required_count=count)
        return self.barriers[barrier_id]

    def get_semaphore(self, sem_id: str, permits: int) -> Semaphore:
        """Get or create the semaphore named ``sem_id``.

        ``permits`` is only used when the semaphore is created; an existing
        semaphore is returned unchanged.
        """
        if sem_id not in self.semaphores:
            self.semaphores[sem_id] = Semaphore(max_permits=permits)
        return self.semaphores[sem_id]


# Event-Driven Coordination
Event-driven architectures enable loose coupling and reactive coordination:
🐍python
1from dataclasses import dataclass, field
2from typing import Any, Callable, Awaitable
3from enum import Enum
4import asyncio
5from datetime import datetime
6
7
class EventType(Enum):
    """Standard event types emitted in the multi-agent system."""

    TASK_CREATED = "task_created"
    TASK_STARTED = "task_started"
    TASK_COMPLETED = "task_completed"
    TASK_FAILED = "task_failed"
    AGENT_JOINED = "agent_joined"
    AGENT_LEFT = "agent_left"
    STATE_CHANGED = "state_changed"
    ERROR_OCCURRED = "error_occurred"
18
19
@dataclass
class Event:
    """Domain event published on the event bus.

    Attributes:
        id: Unique event identifier.
        type: Kind of event.
        source: Identifier of the emitting agent.
        timestamp: Naive local creation time; defaults to "now".
        payload: Event data; each event gets its own dict.
        metadata: Additional data; each event gets its own dict.
    """

    id: str
    type: 'EventType'
    source: str
    timestamp: datetime = field(default_factory=datetime.now)
    payload: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)
29
30
# Signature of an event subscriber: an async callable that takes the event.
EventHandler = Callable[[Event], Awaitable[None]]
32
33
@dataclass
class EventBus:
    """Central event bus for the multi-agent system.

    Attributes:
        handlers: Subscribers per event type, in subscription order.
        event_history: Most recent events, oldest first.
        max_history: Maximum number of events kept in ``event_history``.
    """

    handlers: dict['EventType', list['EventHandler']] = field(default_factory=dict)
    event_history: list['Event'] = field(default_factory=list)
    max_history: int = 1000

    def subscribe(self, event_type: 'EventType', handler: 'EventHandler'):
        """Subscribe ``handler`` to ``event_type``."""
        self.handlers.setdefault(event_type, []).append(handler)

    def unsubscribe(self, event_type: 'EventType', handler: 'EventHandler'):
        """Remove one subscription of ``handler``; unknown ones are ignored."""
        subscribed = self.handlers.get(event_type)
        if subscribed and handler in subscribed:
            subscribed.remove(handler)

    async def emit(self, event: 'Event'):
        """Record ``event`` and deliver it to all subscribers concurrently.

        A failing subscriber does not stop the others and does not raise
        here; its exception is logged instead.
        """
        import logging

        # Store in history, trimming the oldest entries beyond the cap
        self.event_history.append(event)
        overflow = len(self.event_history) - self.max_history
        if overflow > 0:
            del self.event_history[:overflow]

        # Notify handlers (snapshot: a handler may (un)subscribe while running)
        subscribed = list(self.handlers.get(event.type, ()))
        if not subscribed:
            return

        outcomes = await asyncio.gather(
            *[handler(event) for handler in subscribed],
            return_exceptions=True
        )
        for handler, outcome in zip(subscribed, outcomes):
            if isinstance(outcome, BaseException):
                logging.getLogger(__name__).error(
                    "Event handler %r failed for %r",
                    handler,
                    event.type,
                    exc_info=outcome,
                )

    def get_history(
        self,
        event_type: Optional['EventType'] = None,
        source: Optional[str] = None,
        limit: int = 100
    ) -> list['Event']:
        """Return up to ``limit`` of the most recent matching events.

        Args:
            event_type: Only include events of this type, when given.
            source: Only include events from this source, when given.
            limit: Maximum number of events; zero or less yields ``[]``.

        Returns:
            A new list, oldest event first.
        """
        # A slice of [-0:] would return everything, so handle it explicitly.
        if limit <= 0:
            return []

        matching = [
            event
            for event in self.event_history
            if (event_type is None or event.type == event_type)
            and (source is None or event.source == source)
        ]
        return matching[-limit:]
82
83
@dataclass
class ReactiveCoordinator:
    """Coordinates agents through events and reactions.

    Attributes:
        event_bus: Bus on which events are emitted and reactions subscribed.
        state: Shared state used by the default reactions. It is called as
            ``await state.get(key)`` and ``await state.set(key, value, updater)``.
            NOTE(review): ``SharedStateManager`` is not defined in the visible
            source; confirm where it comes from.
        reactions: Custom reactions added through ``add_reaction``, per type.
    """

    event_bus: EventBus = field(default_factory=EventBus)
    state: Optional['SharedStateManager'] = None
    reactions: dict[EventType, list[Callable[[Event], Awaitable[None]]]] = field(
        default_factory=dict
    )

    def __post_init__(self):
        if self.state is None:
            self.state = SharedStateManager()
        # Set up default reactions
        self._setup_default_reactions()

    def _setup_default_reactions(self):
        """Subscribe the built-in bookkeeping reactions to the event bus."""

        async def on_task_completed(event: Event):
            task_id = event.payload.get("task_id")
            if task_id:
                await self.state.set(
                    f"task:{task_id}:completed",
                    True,
                    "coordinator"
                )

        async def on_agent_joined(event: Event):
            agent_id = event.payload.get("agent_id")
            if agent_id is None:
                return  # Nothing to record without an id
            # Copy so the list held by the state store is not mutated in place
            agents = list(await self.state.get("active_agents") or [])
            if agent_id not in agents:
                agents.append(agent_id)
                await self.state.set("active_agents", agents, "coordinator")

        async def on_agent_left(event: Event):
            agent_id = event.payload.get("agent_id")
            agents = list(await self.state.get("active_agents") or [])
            if agent_id in agents:
                agents.remove(agent_id)
                await self.state.set("active_agents", agents, "coordinator")

        self.event_bus.subscribe(EventType.TASK_COMPLETED, on_task_completed)
        self.event_bus.subscribe(EventType.AGENT_JOINED, on_agent_joined)
        self.event_bus.subscribe(EventType.AGENT_LEFT, on_agent_left)

    def add_reaction(
        self,
        event_type: EventType,
        reaction: Callable[[Event], Awaitable[None]]
    ):
        """Add a custom reaction to ``event_type`` and record it in ``reactions``."""
        self.reactions.setdefault(event_type, []).append(reaction)
        self.event_bus.subscribe(event_type, reaction)

    async def emit(self, source: str, event_type: EventType, payload: dict):
        """Emit a new event of ``event_type`` on behalf of agent ``source``."""
        event = Event(
            id=str(uuid.uuid4()),
            type=event_type,
            source=source,
            payload=payload
        )
        await self.event_bus.emit(event)


# Complete Multi-Agent System
Here's a complete example combining all coordination mechanisms:
🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional
3import asyncio
4import uuid
5
6
@dataclass
class AgentContext:
    """Shared infrastructure handed to each agent.

    Attributes:
        agent_id: Identifier of the agent this context belongs to.
        state: Namespaced shared state.
        communication: Hub giving access to the communication channels.
        sync: Registry of synchronization primitives.
        events: Coordinator used to emit events.
    """

    agent_id: str
    state: NamespacedState
    communication: UnifiedCommunicationHub
    sync: SynchronizationManager
    events: ReactiveCoordinator
15
16
@dataclass
class CoordinatedAgent:
    """Agent with full coordination capabilities.

    Attributes:
        agent_id: Identifier of this agent.
        context: Shared infrastructure (state, communication, sync, events).
        capabilities: Task types this agent can handle.
    """

    agent_id: str
    context: 'AgentContext'
    capabilities: list[str] = field(default_factory=list)

    async def start(self):
        """Register for requests, then announce that the agent joined."""
        # Register first so peers reacting to the announcement can reach us
        self.context.communication.request_reply.register_handler(
            self.agent_id,
            self.handle_request
        )

        # Announce joining
        await self.context.events.emit(
            self.agent_id,
            EventType.AGENT_JOINED,
            {"agent_id": self.agent_id, "capabilities": self.capabilities}
        )

    async def stop(self):
        """Stop receiving requests and announce that the agent left."""
        handlers = self.context.communication.request_reply.handlers
        # Only remove our own registration, not one that replaced it
        if handlers.get(self.agent_id) == self.handle_request:
            del handlers[self.agent_id]

        await self.context.events.emit(
            self.agent_id,
            EventType.AGENT_LEFT,
            {"agent_id": self.agent_id}
        )

    async def handle_request(self, message: 'Message') -> Any:
        """Dispatch an incoming request on its ``"action"`` payload field.

        Returns:
            The action's result, or an ``{"error": ...}`` dict when the action
            is unknown or an ``execute_task`` request carries no task.
        """
        action = message.payload.get("action")

        if action == "execute_task":
            task = message.payload.get("task")
            if task is None:
                return {"error": "Missing task"}
            return await self.execute_task(task)
        if action == "get_status":
            return await self.get_status()

        return {"error": "Unknown action"}

    async def execute_task(self, task: dict) -> dict:
        """Execute a task under its lock, publishing progress as it goes.

        Emits ``TASK_STARTED`` and then ``TASK_COMPLETED`` or ``TASK_FAILED``,
        and keeps ``task:<id>:status`` / ``task:<id>:result`` in the
        ``"shared"`` namespace up to date.

        Returns:
            The result produced for the task.

        Raises:
            TimeoutError: If the task lock could not be acquired.
            Exception: Whatever the task execution raises, after
                ``TASK_FAILED`` has been emitted.
        """
        task_id = task.get("id", str(uuid.uuid4()))
        events = self.context.events
        state = self.context.state
        status_key = f"task:{task_id}:status"

        # Emit task started
        await events.emit(
            self.agent_id,
            EventType.TASK_STARTED,
            {"task_id": task_id}
        )

        # Update shared state
        await state.set("shared", status_key, "in_progress", self.agent_id)

        # Acquire the per-task lock; never run the task without holding it
        lock = self.context.sync.get_lock(f"task:{task_id}")
        if not await lock.acquire(self.agent_id):
            error = f"Timed out acquiring lock for task {task_id}"
            await events.emit(
                self.agent_id,
                EventType.TASK_FAILED,
                {"task_id": task_id, "error": error}
            )
            await state.set("shared", status_key, "failed", self.agent_id)
            raise TimeoutError(error)

        try:
            # Execute task
            result = await self._do_work(task)

            # Update state
            await state.set(
                "shared",
                f"task:{task_id}:result",
                result,
                self.agent_id
            )
            await state.set("shared", status_key, "completed", self.agent_id)

            # Emit completion
            await events.emit(
                self.agent_id,
                EventType.TASK_COMPLETED,
                {"task_id": task_id, "result": result}
            )

            return result

        except Exception as e:
            await events.emit(
                self.agent_id,
                EventType.TASK_FAILED,
                {"task_id": task_id, "error": str(e)}
            )
            await state.set("shared", status_key, "failed", self.agent_id)
            raise

        finally:
            await lock.release(self.agent_id)

    async def _do_work(self, task: dict) -> dict:
        """Actual task execution (simulated here with a short sleep)."""
        await asyncio.sleep(0.1)  # Simulate work
        return {"status": "completed", "output": "Result"}

    async def get_status(self) -> dict:
        """Return this agent's id, capabilities and an ``active`` flag."""
        return {
            "agent_id": self.agent_id,
            "capabilities": self.capabilities,
            "active": True
        }

    async def request_peer(
        self,
        peer_id: str,
        action: str,
        payload: Optional[dict] = None
    ) -> Any:
        """Send an ``action`` request to a peer agent and return its reply.

        ``payload`` entries are merged into the request next to ``"action"``.
        """
        return await self.context.communication.request_reply.request(
            sender=self.agent_id,
            recipient=peer_id,
            payload={"action": action, **(payload or {})}
        )
139
140
@dataclass
class MultiAgentSystem:
    """Complete multi-agent system with coordination.

    Any component left as ``None`` is created with defaults; components that
    the caller supplies are used as given.

    Attributes:
        state: Namespaced shared state.
        communication: Communication hub shared by all agents.
        sync: Registry of synchronization primitives.
        events: Event coordinator.
        agents: Running agents by id.
    """

    state: Optional[NamespacedState] = None
    communication: Optional[UnifiedCommunicationHub] = None
    sync: Optional[SynchronizationManager] = None
    events: Optional[ReactiveCoordinator] = None
    agents: dict[str, CoordinatedAgent] = field(default_factory=dict)

    def __post_init__(self):
        # Initialize only the infrastructure the caller did not provide
        if self.state is None:
            self.state = NamespacedState(store=InMemoryStateStore())
        # Agents write to "shared"; make sure both namespaces exist
        self.state.create_namespace("shared")
        self.state.create_namespace("agents")

        if self.communication is None:
            self.communication = UnifiedCommunicationHub()
        if self.sync is None:
            self.sync = SynchronizationManager()
        if self.events is None:
            # NOTE(review): SharedStateManager is not defined in the visible
            # source; confirm where it comes from.
            self.events = ReactiveCoordinator(state=SharedStateManager())

    def create_context(self, agent_id: str) -> AgentContext:
        """Create the context that gives an agent access to the shared infrastructure."""
        return AgentContext(
            agent_id=agent_id,
            state=self.state,
            communication=self.communication,
            sync=self.sync,
            events=self.events
        )

    async def add_agent(
        self,
        agent_id: str,
        capabilities: list[str]
    ) -> CoordinatedAgent:
        """Create, start and register an agent, then return it."""
        agent = CoordinatedAgent(
            agent_id=agent_id,
            context=self.create_context(agent_id),
            capabilities=capabilities
        )

        await agent.start()
        self.agents[agent_id] = agent

        return agent

    async def remove_agent(self, agent_id: str):
        """Stop and unregister an agent; unknown ids are ignored."""
        agent = self.agents.get(agent_id)
        if agent is None:
            return
        await agent.stop()
        del self.agents[agent_id]

    async def submit_task(self, task: dict) -> dict:
        """Run ``task`` on the first agent whose capabilities include its type.

        The type is read from ``task["type"]`` and defaults to ``"general"``.

        Returns:
            The agent's result, or ``{"error": "No capable agent found"}``.
        """
        task_type = task.get("type", "general")

        # Find capable agent
        for agent in self.agents.values():
            if task_type in agent.capabilities:
                return await agent.execute_task(task)

        return {"error": "No capable agent found"}
206
207
# Example usage
async def main():
    """Run a small demo: add agents, submit a task, query a peer, clean up."""
    # Create system
    system = MultiAgentSystem()

    # Add agents
    await system.add_agent("worker_1", ["compute", "data_processing"])
    await system.add_agent("worker_2", ["analysis", "reporting"])
    await system.add_agent("coordinator", ["coordination", "compute"])

    try:
        # Submit tasks
        result = await system.submit_task({
            "id": "task_1",
            "type": "compute",
            "payload": {"data": [1, 2, 3]}
        })

        print(f"Task result: {result}")

        # Agent-to-agent communication
        worker = system.agents["worker_1"]
        peer_status = await worker.request_peer("worker_2", "get_status")
        print(f"Peer status: {peer_status}")
    finally:
        # Cleanup, even when the demo fails part-way
        for agent_id in ("worker_1", "worker_2", "coordinator"):
            await system.remove_agent(agent_id)


if __name__ == "__main__":
    asyncio.run(main())


# Key Takeaways
- Shared state management with versioning and optimistic locking ensures consistency when multiple agents update data.
- Namespacing isolates state between agent groups while allowing shared access where needed.
- Communication patterns like request-reply, pub-sub, and streaming suit different coordination needs.
- Synchronization primitives (locks, barriers, semaphores) coordinate timing and resource access.
- Event-driven coordination enables loose coupling and reactive behavior across agents.
- Unified infrastructure combining all mechanisms provides a complete foundation for multi-agent systems.
Chapter Summary: You now have the building blocks for multi-agent systems — from architectural patterns (supervisor, peer-to-peer, hierarchical) to coordination mechanisms (state, communication, synchronization, events). In the next chapter, we'll explore LangGraph for graph-based multi-agent orchestration.