Chapter 14

Shared State and Communication

Multi-Agent Systems

Introduction

Regardless of whether you use supervisor, peer-to-peer, or hierarchical patterns, multi-agent systems require shared state and reliable communication. This section explores state management strategies, communication patterns, and synchronization mechanisms that enable effective agent coordination.

Section Overview: We'll build shared state systems with consistency guarantees, implement various communication patterns, and create event-driven coordination mechanisms.

Shared State Management

Agents need consistent access to shared state while avoiding conflicts and maintaining data integrity:

```python
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from abc import ABC, abstractmethod
import asyncio
from datetime import datetime
import json
import hashlib


@dataclass
class StateVersion:
    """Versioned state entry."""
    value: Any
    version: int
    timestamp: datetime
    updated_by: str
    checksum: str

    @classmethod
    def create(cls, value: Any, version: int, updated_by: str) -> 'StateVersion':
        # MD5 serves only as a cheap change fingerprint here, not for security
        checksum = hashlib.md5(
            json.dumps(value, sort_keys=True, default=str).encode()
        ).hexdigest()
        return cls(
            value=value,
            version=version,
            timestamp=datetime.now(),
            updated_by=updated_by,
            checksum=checksum
        )


class StateStore(ABC):
    """Abstract state store interface."""

    @abstractmethod
    async def get(self, key: str) -> Optional[StateVersion]:
        pass

    @abstractmethod
    async def set(
        self,
        key: str,
        value: Any,
        updater: str,
        expected_version: Optional[int] = None
    ) -> bool:
        pass

    @abstractmethod
    async def delete(self, key: str) -> bool:
        pass


@dataclass
class InMemoryStateStore(StateStore):
    """In-memory state store with versioning."""

    data: dict[str, StateVersion] = field(default_factory=dict)
    locks: dict[str, asyncio.Lock] = field(default_factory=dict)
    history: list[tuple[str, str, StateVersion]] = field(default_factory=list)

    def _get_lock(self, key: str) -> asyncio.Lock:
        if key not in self.locks:
            self.locks[key] = asyncio.Lock()
        return self.locks[key]

    def _write(self, key: str, value: Any, updater: str) -> StateVersion:
        """Store a new version of key. The caller must hold the key's lock."""
        current = self.data.get(key)
        new_version = (current.version + 1) if current else 1
        entry = StateVersion.create(value, new_version, updater)
        self.data[key] = entry
        self.history.append((key, "set", entry))
        return entry

    async def get(self, key: str) -> Optional[StateVersion]:
        return self.data.get(key)

    async def set(
        self,
        key: str,
        value: Any,
        updater: str,
        expected_version: Optional[int] = None
    ) -> bool:
        async with self._get_lock(key):
            current = self.data.get(key)
            current_version = current.version if current else 0

            # Optimistic locking check: reject the write if the key changed
            # since the caller read it (expected_version=0 means "must not exist")
            if expected_version is not None and current_version != expected_version:
                return False  # Version conflict

            self._write(key, value, updater)
            return True

    async def delete(self, key: str) -> bool:
        async with self._get_lock(key):
            entry = self.data.pop(key, None)
            if entry is None:
                return False
            self.history.append((key, "delete", entry))
            return True

    async def compare_and_swap(
        self,
        key: str,
        expected_value: Any,
        new_value: Any,
        updater: str
    ) -> bool:
        """Atomic compare-and-swap operation."""
        async with self._get_lock(key):
            current = self.data.get(key)
            current_value = current.value if current else None

            if current_value != expected_value:
                return False

            # Write directly: calling self.set() here would try to re-acquire
            # this non-reentrant asyncio.Lock and deadlock
            self._write(key, new_value, updater)
            return True


@dataclass
class DistributedState:
    """Distributed state with conflict resolution."""

    stores: dict[str, StateStore] = field(default_factory=dict)
    conflict_resolver: Optional[Callable[[list[StateVersion]], StateVersion]] = None

    def __post_init__(self):
        if self.conflict_resolver is None:
            self.conflict_resolver = self._last_write_wins

    def _last_write_wins(self, versions: list[StateVersion]) -> StateVersion:
        """Resolve conflicts using last-write-wins (assumes comparable clocks)."""
        return max(versions, key=lambda v: v.timestamp)

    async def get(self, key: str) -> Optional[Any]:
        """Get value from the first store that holds the key."""
        for store in self.stores.values():
            result = await store.get(key)
            if result is not None:
                return result.value
        return None

    async def set_replicated(
        self,
        key: str,
        value: Any,
        updater: str,
        quorum: int = 1
    ) -> bool:
        """Write to every store; succeed if at least `quorum` stores accept."""
        successes = 0

        for store in self.stores.values():
            if await store.set(key, value, updater):
                successes += 1

        return successes >= quorum

    async def sync_stores(self, key: str):
        """Synchronize a key across all stores."""
        versions = []

        for store in self.stores.values():
            version = await store.get(key)
            if version is not None:
                versions.append(version)

        if not versions:
            return

        # Resolve conflicts
        winner = self.conflict_resolver(versions)

        # Copy the winning value to every store that differs from it.
        # expected_version=None skips the optimistic check (forced write);
        # each store assigns its own new version number.
        for store in self.stores.values():
            current = await store.get(key)
            if current is not None and current.checksum == winner.checksum:
                continue
            await store.set(key, winner.value, "sync", expected_version=None)
```
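Optimistic locking only detects a conflict; the caller still has to re-read and try again. The sketch below shows that read-modify-write retry loop under stated assumptions: `VersionedDict` is a hypothetical stand-in for a store whose `set` takes an `expected_version` (as `InMemoryStateStore.set` does), and `update_with_retry` is an illustrative helper, not part of the listing above. The `asyncio.sleep(0)` only forces the ten concurrent updates to interleave so that conflicts actually occur.

```python
import asyncio
from typing import Any, Callable, Optional


class VersionedDict:
    """Hypothetical minimal versioned store: key -> (version, value)."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[int, Any]] = {}

    async def get(self, key: str) -> Optional[tuple[int, Any]]:
        return self._data.get(key)

    async def set(self, key: str, value: Any, expected_version: int) -> bool:
        # Version 0 stands for "key does not exist yet"
        current_version = self._data[key][0] if key in self._data else 0
        if current_version != expected_version:
            return False  # Another writer got there first
        self._data[key] = (current_version + 1, value)
        return True


async def update_with_retry(
    store: VersionedDict,
    key: str,
    update: Callable[[Any], Any],
    max_attempts: int = 20,
) -> bool:
    """Read, transform, and conditionally write, retrying on version conflicts."""
    for _ in range(max_attempts):
        entry = await store.get(key)
        version, value = entry if entry is not None else (0, None)
        new_value = update(value)
        await asyncio.sleep(0)  # Yield so concurrent writers can interleave
        if await store.set(key, new_value, expected_version=version):
            return True
    return False  # Gave up: too much contention


async def main() -> int:
    store = VersionedDict()
    # Ten agents increment the same counter concurrently
    ok = await asyncio.gather(*(
        update_with_retry(store, "counter", lambda v: (v or 0) + 1)
        for _ in range(10)
    ))
    assert all(ok)
    return (await store.get("counter"))[1]


print(asyncio.run(main()))  # 10
```

Every conflict costs another read and write, so this suits keys with modest contention; for a hot key that many agents update constantly, a lock around the whole update may be cheaper than repeated retries.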

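State Namespacing

Namespaces partition a single store by prefixing every key with the namespace name, so each agent group works in its own key space. The isolation is a naming convention only; it does not enforce access control: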

```python
# Uses the imports, StateStore, and InMemoryStateStore from the previous listing.
@dataclass
class NamespacedState:
    """State store with namespace isolation via key prefixes."""

    store: StateStore
    namespaces: set[str] = field(default_factory=set)

    def _namespaced_key(self, namespace: str, key: str) -> str:
        return f"{namespace}:{key}"

    def _require_namespace(self, namespace: str):
        if namespace not in self.namespaces:
            raise ValueError(f"Unknown namespace: {namespace}")

    def create_namespace(self, namespace: str):
        """Create a new namespace."""
        self.namespaces.add(namespace)

    async def get(self, namespace: str, key: str) -> Optional[Any]:
        """Get value from namespace."""
        self._require_namespace(namespace)
        result = await self.store.get(self._namespaced_key(namespace, key))
        return result.value if result is not None else None

    async def set(
        self,
        namespace: str,
        key: str,
        value: Any,
        updater: str
    ) -> bool:
        """Set value in namespace."""
        self._require_namespace(namespace)
        return await self.store.set(
            self._namespaced_key(namespace, key),
            value,
            updater
        )

    async def list_keys(self, namespace: str) -> list[str]:
        """List all keys in namespace."""
        self._require_namespace(namespace)
        prefix = f"{namespace}:"

        # StateStore has no scan operation, so listing only works for the
        # in-memory backend; other backends would need their own prefix scan
        if not isinstance(self.store, InMemoryStateStore):
            raise NotImplementedError("list_keys requires InMemoryStateStore")

        return [
            key[len(prefix):]
            for key in self.store.data
            if key.startswith(prefix)
        ]


# Usage example
async def state_example():
    store = InMemoryStateStore()
    namespaced = NamespacedState(store=store)

    # Create namespaces for different agent groups
    namespaced.create_namespace("workers")
    namespaced.create_namespace("supervisors")
    namespaced.create_namespace("shared")

    # Workers store their progress
    await namespaced.set("workers", "task_1_progress", 0.5, "worker_1")

    # Supervisors store coordination data
    await namespaced.set("supervisors", "active_workers", ["w1", "w2"], "supervisor")

    # Shared state for all agents
    await namespaced.set("shared", "global_config", {"timeout": 30}, "admin")

    # Reads and listings are scoped to one namespace
    print(await namespaced.get("workers", "task_1_progress"))  # 0.5
    print(await namespaced.list_keys("workers"))  # ['task_1_progress']
```
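Because a namespace is just a `namespace:key` prefix, a namespace name that itself contains `:` can collide with another namespace's keys, and `list_keys` would then return keys from both. A small illustration, assuming `:` stays the separator; `make_key` mirrors `_namespaced_key`, and `make_key_checked` is one hypothetical guard, not part of the class above:

```python
SEPARATOR = ":"


def make_key(namespace: str, key: str) -> str:
    """Same scheme as NamespacedState._namespaced_key."""
    return f"{namespace}{SEPARATOR}{key}"


def make_key_checked(namespace: str, key: str) -> str:
    """Reject namespaces containing the separator so prefixes stay unambiguous."""
    if SEPARATOR in namespace:
        raise ValueError(f"Namespace may not contain {SEPARATOR!r}: {namespace!r}")
    return make_key(namespace, key)


# Two different (namespace, key) pairs map to the same store key
print(make_key("workers", "task:1") == make_key("workers:task", "1"))  # True

try:
    make_key_checked("workers:task", "1")
except ValueError as exc:
    print(exc)
```

With the guard in place, keys may still contain `:`, because the first separator always ends the namespace.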

Communication Patterns

Different communication patterns suit different coordination needs:

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Awaitable, Optional
from enum import Enum
import asyncio
import uuid


class MessagePattern(Enum):
    """Communication patterns."""
    REQUEST_REPLY = "request_reply"
    PUBLISH_SUBSCRIBE = "publish_subscribe"
    FIRE_AND_FORGET = "fire_and_forget"
    STREAMING = "streaming"


@dataclass
class Message:
    """Generic message structure."""
    id: str
    sender: str
    pattern: MessagePattern
    topic: Optional[str] = None
    payload: dict = field(default_factory=dict)
    correlation_id: Optional[str] = None


@dataclass
class RequestReplyChannel:
    """Channel for request-reply communication."""

    pending_requests: dict[str, asyncio.Future] = field(default_factory=dict)
    handlers: dict[str, Callable[[Message], Awaitable[Any]]] = field(
        default_factory=dict
    )
    # Strong references to in-flight handler tasks so they are not
    # garbage-collected before they finish
    _tasks: set[asyncio.Task] = field(default_factory=set, repr=False)

    def register_handler(
        self,
        agent_id: str,
        handler: Callable[[Message], Awaitable[Any]]
    ):
        """Register a handler for requests to this agent."""
        self.handlers[agent_id] = handler

    async def request(
        self,
        sender: str,
        recipient: str,
        payload: dict,
        timeout: float = 30.0
    ) -> Any:
        """Send request and wait for reply (raises asyncio.TimeoutError on timeout)."""
        if recipient not in self.handlers:
            # Fail fast instead of waiting out the full timeout
            raise LookupError(f"No handler registered for {recipient}")

        request_id = str(uuid.uuid4())
        message = Message(
            id=request_id,
            sender=sender,
            pattern=MessagePattern.REQUEST_REPLY,
            payload=payload
        )

        # Create future for response
        future = asyncio.get_running_loop().create_future()
        self.pending_requests[request_id] = future

        try:
            # Dispatch to handler in the background
            task = asyncio.create_task(self._handle_request(recipient, message))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

            # Wait for response
            return await asyncio.wait_for(future, timeout=timeout)

        finally:
            self.pending_requests.pop(request_id, None)

    async def _handle_request(self, recipient: str, message: Message):
        """Run the recipient's handler and deliver its reply or error."""
        try:
            response = await self.handlers[recipient](message)
        except Exception as e:
            self._complete(message.id, error=e)
        else:
            self._complete(message.id, result=response)

    def _complete(
        self,
        request_id: str,
        result: Any = None,
        error: Optional[Exception] = None
    ):
        """Resolve the waiting future unless the requester already gave up."""
        future = self.pending_requests.get(request_id)
        if future is None or future.done():
            return  # Requester timed out or was cancelled
        if error is not None:
            future.set_exception(error)
        else:
            future.set_result(result)


@dataclass
class PubSubChannel:
    """Publish-subscribe channel."""

    subscribers: dict[str, list[Callable[[Message], Awaitable[None]]]] = field(
        default_factory=dict
    )

    def subscribe(
        self,
        topic: str,
        handler: Callable[[Message], Awaitable[None]]
    ):
        """Subscribe to a topic."""
        self.subscribers.setdefault(topic, []).append(handler)

    def unsubscribe(
        self,
        topic: str,
        handler: Callable[[Message], Awaitable[None]]
    ):
        """Unsubscribe from a topic (no-op if not subscribed)."""
        if handler in self.subscribers.get(topic, []):
            self.subscribers[topic].remove(handler)

    async def publish(self, sender: str, topic: str, payload: dict):
        """Publish message to topic."""
        message = Message(
            id=str(uuid.uuid4()),
            sender=sender,
            pattern=MessagePattern.PUBLISH_SUBSCRIBE,
            topic=topic,
            payload=payload
        )

        # Snapshot the list so handlers may (un)subscribe during delivery;
        # return_exceptions=True keeps a failing subscriber's error from
        # propagating to the publisher
        handlers = list(self.subscribers.get(topic, []))
        if handlers:
            await asyncio.gather(
                *(handler(message) for handler in handlers),
                return_exceptions=True
            )


@dataclass
class StreamingChannel:
    """Channel for streaming data."""

    streams: dict[str, asyncio.Queue] = field(default_factory=dict)

    def create_stream(self, stream_id: str) -> asyncio.Queue:
        """Create a new stream."""
        self.streams[stream_id] = asyncio.Queue()
        return self.streams[stream_id]

    async def send(self, stream_id: str, data: Any):
        """Send data to stream."""
        if stream_id in self.streams:
            await self.streams[stream_id].put(data)

    async def receive(
        self,
        stream_id: str,
        timeout: Optional[float] = None
    ) -> Optional[Any]:
        """Receive data; returns None for an unknown stream or on timeout."""
        queue = self.streams.get(stream_id)
        if queue is None:
            return None

        try:
            if timeout is not None:
                return await asyncio.wait_for(queue.get(), timeout=timeout)
            return await queue.get()
        except asyncio.TimeoutError:
            return None

    def close_stream(self, stream_id: str):
        """Close a stream. Consumers already blocked without a timeout are not woken."""
        self.streams.pop(stream_id, None)


@dataclass
class UnifiedCommunicationHub:
    """Unified hub for all communication patterns."""

    request_reply: RequestReplyChannel = field(default_factory=RequestReplyChannel)
    pub_sub: PubSubChannel = field(default_factory=PubSubChannel)
    streaming: StreamingChannel = field(default_factory=StreamingChannel)
```

| Pattern | Use Case | Characteristics |
|---|---|---|
| Request-Reply | Task delegation, queries | Synchronous, bidirectional |
| Pub-Sub | Events, notifications | Async, decoupled, many-to-many |
| Fire-and-Forget | Logging, metrics | Async, no confirmation |
| Streaming | Progress updates, data feeds | Continuous, ordered |
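The `FIRE_AND_FORGET` pattern appears in the enum and the table but has no channel in the listing. One way to sketch it with asyncio, assuming coroutine handlers like the other channels use: schedule the handler as a task and return at once, keep a reference so the task is not garbage-collected mid-flight, and log failures instead of raising them to the sender. `FireAndForgetChannel`, `send`, and `drain` are hypothetical names:

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)

Handler = Callable[[dict], Awaitable[Any]]


class FireAndForgetChannel:
    """Send without waiting for, or receiving, any reply."""

    def __init__(self) -> None:
        self.handlers: dict[str, Handler] = {}
        self._tasks: set[asyncio.Task] = set()  # Keep in-flight tasks alive

    def register_handler(self, agent_id: str, handler: Handler) -> None:
        self.handlers[agent_id] = handler

    def send(self, recipient: str, payload: dict) -> None:
        """Schedule delivery and return immediately (no await, no result)."""
        handler = self.handlers.get(recipient)
        if handler is None:
            logger.warning("Dropping message for unknown recipient %s", recipient)
            return
        task = asyncio.create_task(handler(payload))
        self._tasks.add(task)
        task.add_done_callback(self._on_done)

    def _on_done(self, task: asyncio.Task) -> None:
        self._tasks.discard(task)
        # Errors are logged, never raised back to the sender
        if not task.cancelled() and task.exception() is not None:
            logger.warning("Fire-and-forget handler failed: %r", task.exception())

    async def drain(self) -> None:
        """Wait for in-flight deliveries, e.g. before shutdown."""
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)


async def main() -> list[dict]:
    received: list[dict] = []

    async def metrics_sink(payload: dict) -> None:
        received.append(payload)

    channel = FireAndForgetChannel()
    channel.register_handler("metrics", metrics_sink)
    channel.send("metrics", {"event": "task_done", "ms": 120})
    await channel.drain()
    return received


print(asyncio.run(main()))  # [{'event': 'task_done', 'ms': 120}]
```

`drain()` matters at shutdown: messages still in flight when the event loop stops are lost, which is the trade-off this pattern accepts in exchange for never blocking the sender.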

Synchronization Mechanisms

When agents need to coordinate timing or access shared resources, synchronization primitives ensure consistency:

```python
from dataclasses import dataclass, field
from typing import Optional, Set
import asyncio


@dataclass
class DistributedLock:
    """Exclusive lock with FIFO hand-off between agents.

    This version lives in one event loop; sharing it across processes or
    machines would require backing it with an external store.
    """

    lock_id: str
    holder: Optional[str] = None
    waiters: list[tuple[str, asyncio.Future]] = field(default_factory=list)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def acquire(self, agent_id: str, timeout: float = 30.0) -> bool:
        """Acquire the lock, waiting at most `timeout` seconds."""
        async with self._lock:
            if self.holder is None:
                self.holder = agent_id
                return True

            # Queue behind the current holder
            future = asyncio.get_running_loop().create_future()
            self.waiters.append((agent_id, future))

        # Wait until release() hands the lock to us
        try:
            await asyncio.wait_for(future, timeout=timeout)
            return True
        except asyncio.TimeoutError:
            if future.done() and not future.cancelled():
                return True  # Handed over just as the timeout fired
            # Remove only this waiter (match by future, not agent_id)
            self.waiters = [(a, f) for a, f in self.waiters if f is not future]
            return False

    async def release(self, agent_id: str):
        """Release the lock and pass it to the next live waiter."""
        async with self._lock:
            if self.holder != agent_id:
                return  # Not the holder

            self.holder = None

            # Wake next waiter, skipping any whose wait already timed out
            while self.waiters:
                next_agent, future = self.waiters.pop(0)
                if not future.done():
                    self.holder = next_agent
                    future.set_result(True)
                    break


@dataclass
class Barrier:
    """Synchronization barrier for multiple agents."""

    required_count: int
    arrived: Set[str] = field(default_factory=set)
    event: asyncio.Event = field(default_factory=asyncio.Event)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def wait(self, agent_id: str, timeout: float = 60.0) -> bool:
        """Wait at barrier until all agents arrive."""
        async with self._lock:
            self.arrived.add(agent_id)

            if len(self.arrived) >= self.required_count:
                self.event.set()
                return True

        # Wait for all to arrive
        try:
            await asyncio.wait_for(self.event.wait(), timeout=timeout)
            return True
        except asyncio.TimeoutError:
            return False

    def reset(self):
        """Reset the barrier (only once no agent is still waiting)."""
        self.arrived.clear()
        self.event.clear()


@dataclass
class Semaphore:
    """Semaphore for limited concurrent access (one permit per agent)."""

    max_permits: int
    current_permits: int = field(default=0, init=False)
    holders: Set[str] = field(default_factory=set)
    waiters: list[tuple[str, asyncio.Future]] = field(default_factory=list)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    def __post_init__(self):
        self.current_permits = self.max_permits

    async def acquire(self, agent_id: str, timeout: float = 30.0) -> bool:
        """Acquire a permit, waiting at most `timeout` seconds."""
        async with self._lock:
            if self.current_permits > 0:
                self.current_permits -= 1
                self.holders.add(agent_id)
                return True

            future = asyncio.get_running_loop().create_future()
            self.waiters.append((agent_id, future))

        try:
            await asyncio.wait_for(future, timeout=timeout)
            return True
        except asyncio.TimeoutError:
            if future.done() and not future.cancelled():
                return True  # Permit handed over just as the timeout fired
            self.waiters = [(a, f) for a, f in self.waiters if f is not future]
            return False

    async def release(self, agent_id: str):
        """Release a permit, handing it directly to the next live waiter."""
        async with self._lock:
            if agent_id not in self.holders:
                return

            self.holders.remove(agent_id)

            while self.waiters:
                next_agent, future = self.waiters.pop(0)
                if not future.done():
                    self.holders.add(next_agent)
                    future.set_result(True)
                    return

            # No live waiter: return the permit to the pool
            self.current_permits += 1


@dataclass
class SynchronizationManager:
    """Manages all synchronization primitives."""

    locks: dict[str, DistributedLock] = field(default_factory=dict)
    barriers: dict[str, Barrier] = field(default_factory=dict)
    semaphores: dict[str, Semaphore] = field(default_factory=dict)

    def get_lock(self, lock_id: str) -> DistributedLock:
        """Get or create a lock."""
        if lock_id not in self.locks:
            self.locks[lock_id] = DistributedLock(lock_id=lock_id)
        return self.locks[lock_id]

    def get_barrier(self, barrier_id: str, count: int) -> Barrier:
        """Get or create a barrier (count is ignored if it already exists)."""
        if barrier_id not in self.barriers:
            self.barriers[barrier_id] = Barrier(required_count=count)
        return self.barriers[barrier_id]

    def get_semaphore(self, sem_id: str, permits: int) -> Semaphore:
        """Get or create a semaphore (permits is ignored if it already exists)."""
        if sem_id not in self.semaphores:
            self.semaphores[sem_id] = Semaphore(max_permits=permits)
        return self.semaphores[sem_id]
```
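Since every agent in these examples shares one event loop, asyncio's built-in primitives already provide the core semantics; the classes above add agent-level bookkeeping (which agent holds what). For comparison, a sketch of bounding concurrent calls with `asyncio.Semaphore` used as an async context manager, which releases the permit even if the body raises. The rate-limited tool, `call_tool`, and the worker names are hypothetical:

```python
import asyncio


async def main() -> tuple[int, int]:
    permits = asyncio.Semaphore(2)  # At most two concurrent tool calls
    active = 0
    peak = 0
    finished: list[str] = []

    async def call_tool(agent_id: str) -> None:
        nonlocal active, peak
        async with permits:  # Acquired here, released on exit (even on error)
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # Simulated tool latency
            active -= 1
        finished.append(agent_id)

    await asyncio.gather(*(call_tool(f"worker_{i}") for i in range(6)))
    return peak, len(finished)


print(asyncio.run(main()))  # (2, 6)
```

For barriers, `asyncio.Barrier` (added in Python 3.11) offers a comparable, reusable primitive.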

Event-Driven Coordination

Event-driven architectures enable loose coupling and reactive coordination:

```python
from dataclasses import dataclass, field
from typing import Callable, Awaitable, Optional
from enum import Enum
import asyncio
import logging
import uuid
from datetime import datetime

logger = logging.getLogger(__name__)


class EventType(Enum):
    """Standard event types."""
    TASK_CREATED = "task_created"
    TASK_STARTED = "task_started"
    TASK_COMPLETED = "task_completed"
    TASK_FAILED = "task_failed"
    AGENT_JOINED = "agent_joined"
    AGENT_LEFT = "agent_left"
    STATE_CHANGED = "state_changed"
    ERROR_OCCURRED = "error_occurred"


@dataclass
class Event:
    """Domain event."""
    id: str
    type: EventType
    source: str
    timestamp: datetime = field(default_factory=datetime.now)
    payload: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)


EventHandler = Callable[[Event], Awaitable[None]]


@dataclass
class EventBus:
    """Central event bus for the multi-agent system."""

    handlers: dict[EventType, list[EventHandler]] = field(default_factory=dict)
    event_history: list[Event] = field(default_factory=list)
    max_history: int = 1000

    def subscribe(self, event_type: EventType, handler: EventHandler):
        """Subscribe to an event type."""
        self.handlers.setdefault(event_type, []).append(handler)

    def unsubscribe(self, event_type: EventType, handler: EventHandler):
        """Unsubscribe from an event type (no-op if not subscribed)."""
        if handler in self.handlers.get(event_type, []):
            self.handlers[event_type].remove(handler)

    async def emit(self, event: Event):
        """Emit an event to all subscribers."""
        # Store in history, keeping only the most recent max_history events
        self.event_history.append(event)
        if len(self.event_history) > self.max_history:
            self.event_history.pop(0)

        # Notify handlers concurrently (snapshot so handlers may unsubscribe)
        handlers = list(self.handlers.get(event.type, []))
        results = await asyncio.gather(
            *(handler(event) for handler in handlers),
            return_exceptions=True
        )

        # return_exceptions=True returns handler errors instead of raising
        # them here; log them so failures are not silently dropped
        for result in results:
            if isinstance(result, Exception):
                logger.warning("Handler for %s failed: %r", event.type.value, result)

    def get_history(
        self,
        event_type: Optional[EventType] = None,
        source: Optional[str] = None,
        limit: int = 100
    ) -> list[Event]:
        """Get event history with optional filtering."""
        filtered = self.event_history

        if event_type is not None:
            filtered = [e for e in filtered if e.type == event_type]
        if source is not None:
            filtered = [e for e in filtered if e.source == source]

        return filtered[-limit:]


@dataclass
class ReactiveCoordinator:
    """Coordinates agents through events and reactions."""

    event_bus: EventBus = field(default_factory=EventBus)
    # Any StateStore works here; defaults to the in-memory store from earlier
    state: Optional[StateStore] = None
    reactions: dict[EventType, list[EventHandler]] = field(default_factory=dict)

    def __post_init__(self):
        if self.state is None:
            self.state = InMemoryStateStore()
        # Set up default reactions
        self._setup_default_reactions()

    def _setup_default_reactions(self):
        """Set up standard event reactions."""

        async def on_task_completed(event: Event):
            task_id = event.payload.get("task_id")
            if task_id:
                await self.state.set(
                    f"task:{task_id}:completed",
                    True,
                    "coordinator"
                )

        async def on_agent_joined(event: Event):
            agent_id = event.payload.get("agent_id")
            entry = await self.state.get("active_agents")
            # Copy the list: mutating the stored object in place would
            # change state without creating a new version
            agents = list(entry.value) if entry else []
            if agent_id not in agents:
                agents.append(agent_id)
                await self.state.set("active_agents", agents, "coordinator")

        async def on_agent_left(event: Event):
            agent_id = event.payload.get("agent_id")
            entry = await self.state.get("active_agents")
            agents = list(entry.value) if entry else []
            if agent_id in agents:
                agents.remove(agent_id)
                await self.state.set("active_agents", agents, "coordinator")

        self.event_bus.subscribe(EventType.TASK_COMPLETED, on_task_completed)
        self.event_bus.subscribe(EventType.AGENT_JOINED, on_agent_joined)
        self.event_bus.subscribe(EventType.AGENT_LEFT, on_agent_left)

    def add_reaction(
        self,
        event_type: EventType,
        reaction: EventHandler
    ):
        """Add custom reaction to event type."""
        self.reactions.setdefault(event_type, []).append(reaction)
        self.event_bus.subscribe(event_type, reaction)

    async def emit(self, source: str, event_type: EventType, payload: dict):
        """Emit event from an agent."""
        event = Event(
            id=str(uuid.uuid4()),
            type=event_type,
            source=source,
            payload=payload
        )
        await self.event_bus.emit(event)
```
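Reactions run on every matching event. When a coordinator instead needs to pause until one particular event arrives, such as the completion of a specific task, it can subscribe a one-shot handler that resolves a future and unsubscribe afterwards. A sketch against a minimal stand-in bus with the same `subscribe`/`unsubscribe`/`emit` shape as `EventBus`; `MiniBus` and `wait_for_event` are hypothetical, and events are plain dicts for brevity:

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]


class MiniBus:
    """Stand-in with the same subscribe/unsubscribe/emit shape as EventBus."""

    def __init__(self) -> None:
        self.handlers: dict[str, list[Handler]] = {}

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self.handlers.setdefault(event_type, []).append(handler)

    def unsubscribe(self, event_type: str, handler: Handler) -> None:
        if handler in self.handlers.get(event_type, []):
            self.handlers[event_type].remove(handler)

    async def emit(self, event_type: str, payload: dict) -> None:
        # Delivered sequentially here for simplicity
        for handler in list(self.handlers.get(event_type, [])):
            await handler(payload)


async def wait_for_event(
    bus: MiniBus,
    event_type: str,
    predicate: Callable[[dict], bool],
    timeout: float,
) -> dict:
    """Resolve with the first payload of `event_type` matching `predicate`."""
    future: asyncio.Future = asyncio.get_running_loop().create_future()

    async def one_shot(payload: dict) -> None:
        if predicate(payload) and not future.done():
            future.set_result(payload)

    bus.subscribe(event_type, one_shot)
    try:
        return await asyncio.wait_for(future, timeout=timeout)
    finally:
        bus.unsubscribe(event_type, one_shot)  # Always clean up the subscription


async def main() -> dict:
    bus = MiniBus()

    async def worker() -> None:
        await asyncio.sleep(0.01)
        await bus.emit("task_completed", {"task_id": "task_0", "result": "skip"})
        await bus.emit("task_completed", {"task_id": "task_1", "result": "ok"})

    waiter = asyncio.create_task(wait_for_event(
        bus, "task_completed", lambda p: p["task_id"] == "task_1", timeout=1.0
    ))
    await worker()
    return await waiter


print(asyncio.run(main()))  # {'task_id': 'task_1', 'result': 'ok'}
```

The waiter must subscribe before the event is published; an event emitted earlier is missed, so in practice a waiter may also need to check `EventBus.get_history` after subscribing.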

Complete Multi-Agent System

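Here's a complete example that wires the namespaced state store, communication hub, synchronization manager, and event coordinator into one system. It builds on the classes from the previous listings: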

```python
from dataclasses import dataclass, field
from typing import Any, Optional
import asyncio
import uuid

# Uses the classes defined in the previous listings of this section.


@dataclass
class AgentContext:
    """Context available to each agent."""
    agent_id: str
    state: NamespacedState
    communication: UnifiedCommunicationHub
    sync: SynchronizationManager
    events: ReactiveCoordinator


@dataclass
class CoordinatedAgent:
    """Agent with full coordination capabilities."""

    agent_id: str
    context: AgentContext
    capabilities: list[str] = field(default_factory=list)

    async def start(self):
        """Start the agent."""
        # Announce joining
        await self.context.events.emit(
            self.agent_id,
            EventType.AGENT_JOINED,
            {"agent_id": self.agent_id, "capabilities": self.capabilities}
        )

        # Register for messages
        self.context.communication.request_reply.register_handler(
            self.agent_id,
            self.handle_request
        )

    async def stop(self):
        """Stop the agent."""
        # Stop accepting requests before announcing departure
        self.context.communication.request_reply.handlers.pop(self.agent_id, None)
        await self.context.events.emit(
            self.agent_id,
            EventType.AGENT_LEFT,
            {"agent_id": self.agent_id}
        )

    async def handle_request(self, message: Message) -> Any:
        """Handle incoming request."""
        action = message.payload.get("action")

        if action == "execute_task":
            return await self.execute_task(message.payload.get("task") or {})
        elif action == "get_status":
            return await self.get_status()

        return {"error": f"Unknown action: {action}"}

    async def execute_task(self, task: dict) -> dict:
        """Execute a task with coordination."""
        task_id = task.get("id", str(uuid.uuid4()))
        status_key = f"task:{task_id}:status"

        # Lock the task first so two agents never run it concurrently
        lock = self.context.sync.get_lock(f"task:{task_id}")
        if not await lock.acquire(self.agent_id):
            raise TimeoutError(f"{self.agent_id} could not lock task {task_id}")

        try:
            # Emit task started and record it in shared state
            await self.context.events.emit(
                self.agent_id,
                EventType.TASK_STARTED,
                {"task_id": task_id}
            )
            await self.context.state.set(
                "shared", status_key, "in_progress", self.agent_id
            )

            # Execute task
            result = await self._do_work(task)

            # Update state
            await self.context.state.set(
                "shared", f"task:{task_id}:result", result, self.agent_id
            )
            await self.context.state.set(
                "shared", status_key, "completed", self.agent_id
            )

            # Emit completion
            await self.context.events.emit(
                self.agent_id,
                EventType.TASK_COMPLETED,
                {"task_id": task_id, "result": result}
            )

            return result

        except Exception as e:
            await self.context.state.set(
                "shared", status_key, "failed", self.agent_id
            )
            await self.context.events.emit(
                self.agent_id,
                EventType.TASK_FAILED,
                {"task_id": task_id, "error": str(e)}
            )
            raise

        finally:
            await lock.release(self.agent_id)

    async def _do_work(self, task: dict) -> dict:
        """Actual task execution."""
        await asyncio.sleep(0.1)  # Simulate work
        return {"status": "completed", "output": "Result"}

    async def get_status(self) -> dict:
        """Get agent status."""
        return {
            "agent_id": self.agent_id,
            "capabilities": self.capabilities,
            "active": True
        }

    async def request_peer(
        self,
        peer_id: str,
        action: str,
        payload: Optional[dict] = None
    ) -> Any:
        """Send request to peer agent."""
        return await self.context.communication.request_reply.request(
            sender=self.agent_id,
            recipient=peer_id,
            payload={"action": action, **(payload or {})}
        )


@dataclass
class MultiAgentSystem:
    """Complete multi-agent system with coordination."""

    # Infrastructure is built in __post_init__, not passed in
    state: NamespacedState = field(init=False)
    communication: UnifiedCommunicationHub = field(init=False)
    sync: SynchronizationManager = field(init=False)
    events: ReactiveCoordinator = field(init=False)
    agents: dict[str, CoordinatedAgent] = field(default_factory=dict)

    def __post_init__(self):
        # Initialize infrastructure
        store = InMemoryStateStore()
        self.state = NamespacedState(store=store)
        self.state.create_namespace("shared")
        self.state.create_namespace("agents")

        self.communication = UnifiedCommunicationHub()
        self.sync = SynchronizationManager()
        # The coordinator keeps its bookkeeping in its own default store
        self.events = ReactiveCoordinator()

    def create_context(self, agent_id: str) -> AgentContext:
        """Create context for an agent."""
        return AgentContext(
            agent_id=agent_id,
            state=self.state,
            communication=self.communication,
            sync=self.sync,
            events=self.events
        )

    async def add_agent(
        self,
        agent_id: str,
        capabilities: list[str]
    ) -> CoordinatedAgent:
        """Add and start an agent."""
        context = self.create_context(agent_id)
        agent = CoordinatedAgent(
            agent_id=agent_id,
            context=context,
            capabilities=capabilities
        )

        await agent.start()
        self.agents[agent_id] = agent

        return agent

    async def remove_agent(self, agent_id: str):
        """Remove an agent."""
        if agent_id in self.agents:
            await self.agents[agent_id].stop()
            del self.agents[agent_id]

    async def submit_task(self, task: dict) -> dict:
        """Submit task to the first agent with a matching capability."""
        task_type = task.get("type", "general")

        # Find capable agent
        for agent in self.agents.values():
            if task_type in agent.capabilities:
                return await agent.execute_task(task)

        return {"error": "No capable agent found"}


# Example usage
async def main():
    # Create system
    system = MultiAgentSystem()

    # Add agents
    await system.add_agent("worker_1", ["compute", "data_processing"])
    await system.add_agent("worker_2", ["analysis", "reporting"])
    await system.add_agent("coordinator", ["coordination", "compute"])

    # Submit tasks
    result = await system.submit_task({
        "id": "task_1",
        "type": "compute",
        "payload": {"data": [1, 2, 3]}
    })

    print(f"Task result: {result}")

    # Agent-to-agent communication
    worker = system.agents["worker_1"]
    peer_status = await worker.request_peer("worker_2", "get_status")
    print(f"Peer status: {peer_status}")

    # Cleanup
    await system.remove_agent("worker_1")
    await system.remove_agent("worker_2")
    await system.remove_agent("coordinator")


if __name__ == "__main__":
    asyncio.run(main())
```

Key Takeaways

  • Versioned state with optimistic locking lets agents detect conflicting writes instead of silently overwriting each other's updates.
  • Namespacing isolates state between agent groups while allowing shared access where needed.
  • Communication patterns like request-reply, pub-sub, and streaming suit different coordination needs.
  • Synchronization primitives (locks, barriers, semaphores) coordinate timing and resource access.
  • Event-driven coordination enables loose coupling and reactive behavior across agents.
  • Unified infrastructure combining all mechanisms provides a complete foundation for multi-agent systems.

Chapter Summary: You now have the building blocks for multi-agent systems, from architectural patterns (supervisor, peer-to-peer, hierarchical) to coordination mechanisms (state, communication, synchronization, events). In the next chapter, we'll explore LangGraph for graph-based multi-agent orchestration.