Chapter 14
15 min read
Section 88 of 175

Peer-to-Peer Collaboration

Multi-Agent Systems

Introduction

Peer-to-peer (P2P) multi-agent systems enable agents to collaborate as equals without centralized control. This pattern excels when tasks benefit from diverse perspectives, creative problem-solving, or fault tolerance. This section explores communication protocols, consensus mechanisms, and collaborative patterns for P2P agent systems.

Section Overview: We'll build peer communication infrastructure, implement consensus protocols, and explore patterns for agents to collaboratively solve problems.

Peer Communication

Effective peer-to-peer systems require robust communication infrastructure for direct agent-to-agent messaging:

🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional, Callable, Awaitable
3from enum import Enum
4import asyncio
5from datetime import datetime
6import uuid
7
8
9class MessageType(Enum):
10    """Types of peer messages."""
11    REQUEST = "request"
12    RESPONSE = "response"
13    BROADCAST = "broadcast"
14    PROPOSAL = "proposal"
15    VOTE = "vote"
16    ACK = "acknowledgement"
17
18
19@dataclass
20class PeerMessage:
21    """Message passed between peers."""
22    id: str
23    sender: str
24    recipient: Optional[str]  # None for broadcast
25    message_type: MessageType
26    content: dict
27    timestamp: datetime = field(default_factory=datetime.now)
28    reply_to: Optional[str] = None  # ID of message this replies to
29
30    @classmethod
31    def create(
32        cls,
33        sender: str,
34        recipient: Optional[str],
35        msg_type: MessageType,
36        content: dict,
37        reply_to: Optional[str] = None
38    ) -> 'PeerMessage':
39        return cls(
40            id=str(uuid.uuid4()),
41            sender=sender,
42            recipient=recipient,
43            message_type=msg_type,
44            content=content,
45            reply_to=reply_to
46        )
47
48
49@dataclass
50class MessageBus:
51    """Central message bus for peer communication."""
52
53    subscribers: dict[str, asyncio.Queue] = field(default_factory=dict)
54    message_handlers: dict[str, Callable[[PeerMessage], Awaitable[None]]] = field(
55        default_factory=dict
56    )
57    message_history: list[PeerMessage] = field(default_factory=list)
58
59    def subscribe(self, peer_id: str) -> asyncio.Queue:
60        """Subscribe peer to message bus."""
61        queue = asyncio.Queue()
62        self.subscribers[peer_id] = queue
63        return queue
64
65    def unsubscribe(self, peer_id: str):
66        """Unsubscribe peer from message bus."""
67        if peer_id in self.subscribers:
68            del self.subscribers[peer_id]
69
70    async def send(self, message: PeerMessage):
71        """Send message to specific recipient or broadcast."""
72        self.message_history.append(message)
73
74        if message.recipient:
75            # Direct message
76            if message.recipient in self.subscribers:
77                await self.subscribers[message.recipient].put(message)
78        else:
79            # Broadcast to all except sender
80            for peer_id, queue in self.subscribers.items():
81                if peer_id != message.sender:
82                    await queue.put(message)
83
84    async def broadcast(self, sender: str, content: dict):
85        """Convenience method for broadcast."""
86        message = PeerMessage.create(
87            sender=sender,
88            recipient=None,
89            msg_type=MessageType.BROADCAST,
90            content=content
91        )
92        await self.send(message)
93
94
95@dataclass
96class PeerCommunicator:
97    """Communication interface for a peer agent."""
98
99    peer_id: str
100    bus: MessageBus
101    inbox: asyncio.Queue = field(default_factory=asyncio.Queue)
102    pending_responses: dict[str, asyncio.Future] = field(default_factory=dict)
103
104    def __post_init__(self):
105        # Subscribe to bus
106        self.inbox = self.bus.subscribe(self.peer_id)
107
108    async def send_request(
109        self,
110        recipient: str,
111        content: dict,
112        timeout: float = 30.0
113    ) -> Optional[PeerMessage]:
114        """Send request and wait for response."""
115        message = PeerMessage.create(
116            sender=self.peer_id,
117            recipient=recipient,
118            msg_type=MessageType.REQUEST,
119            content=content
120        )
121
122        # Create future for response
123        future = asyncio.Future()
124        self.pending_responses[message.id] = future
125
126        # Send message
127        await self.bus.send(message)
128
129        try:
130            # Wait for response
131            response = await asyncio.wait_for(future, timeout=timeout)
132            return response
133        except asyncio.TimeoutError:
134            return None
135        finally:
136            del self.pending_responses[message.id]
137
138    async def send_response(
139        self,
140        request_id: str,
141        recipient: str,
142        content: dict
143    ):
144        """Send response to a request."""
145        message = PeerMessage.create(
146            sender=self.peer_id,
147            recipient=recipient,
148            msg_type=MessageType.RESPONSE,
149            content=content,
150            reply_to=request_id
151        )
152        await self.bus.send(message)
153
154    async def broadcast(self, content: dict):
155        """Broadcast to all peers."""
156        await self.bus.broadcast(self.peer_id, content)
157
158    async def receive(self, timeout: float = None) -> Optional[PeerMessage]:
159        """Receive next message."""
160        try:
161            if timeout:
162                message = await asyncio.wait_for(
163                    self.inbox.get(),
164                    timeout=timeout
165                )
166            else:
167                message = await self.inbox.get()
168
169            # Check if this is a response to a pending request
170            if (message.message_type == MessageType.RESPONSE and
171                message.reply_to in self.pending_responses):
172                self.pending_responses[message.reply_to].set_result(message)
173                return None  # Don't return responses, they're handled by futures
174
175            return message
176
177        except asyncio.TimeoutError:
178            return None

Consensus Mechanisms

Without central authority, peers must reach consensus through voting, negotiation, or other agreement protocols:

🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional
3from enum import Enum
4import asyncio
5
6
7class VoteType(Enum):
8    """Types of votes."""
9    APPROVE = "approve"
10    REJECT = "reject"
11    ABSTAIN = "abstain"
12
13
14@dataclass
15class Proposal:
16    """A proposal for consensus."""
17    id: str
18    proposer: str
19    content: dict
20    votes: dict[str, VoteType] = field(default_factory=dict)
21    status: str = "pending"  # pending, accepted, rejected
22
23    def add_vote(self, voter: str, vote: VoteType):
24        self.votes[voter] = vote
25
26    def tally(self, total_voters: int) -> dict:
27        approvals = sum(1 for v in self.votes.values() if v == VoteType.APPROVE)
28        rejections = sum(1 for v in self.votes.values() if v == VoteType.REJECT)
29        abstentions = sum(1 for v in self.votes.values() if v == VoteType.ABSTAIN)
30
31        return {
32            "approvals": approvals,
33            "rejections": rejections,
34            "abstentions": abstentions,
35            "pending": total_voters - len(self.votes)
36        }
37
38
39@dataclass
40class ConsensusProtocol:
41    """Base consensus protocol."""
42
43    peers: list[str] = field(default_factory=list)
44    proposals: dict[str, Proposal] = field(default_factory=dict)
45    quorum: float = 0.5  # Minimum participation required
46    threshold: float = 0.5  # Approval threshold
47
48    def create_proposal(self, proposer: str, content: dict) -> str:
49        """Create a new proposal."""
50        proposal_id = str(uuid.uuid4())
51        proposal = Proposal(id=proposal_id, proposer=proposer, content=content)
52        self.proposals[proposal_id] = proposal
53        return proposal_id
54
55    def vote(self, proposal_id: str, voter: str, vote: VoteType):
56        """Cast a vote on a proposal."""
57        if proposal_id in self.proposals:
58            self.proposals[proposal_id].add_vote(voter, vote)
59            self._check_consensus(proposal_id)
60
61    def _check_consensus(self, proposal_id: str):
62        """Check if consensus has been reached."""
63        proposal = self.proposals[proposal_id]
64        tally = proposal.tally(len(self.peers))
65
66        # Check quorum
67        participated = len(proposal.votes)
68        if participated / len(self.peers) < self.quorum:
69            return  # Not enough participation yet
70
71        # Check threshold
72        approvals = tally["approvals"]
73        if approvals / participated >= self.threshold:
74            proposal.status = "accepted"
75        elif tally["rejections"] / participated > (1 - self.threshold):
76            proposal.status = "rejected"
77
78
79@dataclass
80class MajorityVoting(ConsensusProtocol):
81    """Simple majority voting consensus."""
82
83    threshold: float = 0.5
84
85    def is_accepted(self, proposal_id: str) -> bool:
86        return self.proposals[proposal_id].status == "accepted"
87
88
89@dataclass
90class SuperMajorityVoting(ConsensusProtocol):
91    """Requires 2/3 majority for acceptance."""
92
93    threshold: float = 0.67
94
95
96@dataclass
97class UnanimousConsensus(ConsensusProtocol):
98    """Requires all peers to approve."""
99
100    threshold: float = 1.0
101
102
103@dataclass
104class ConsensusCoordinator:
105    """Coordinates consensus process among peers."""
106
107    protocol: ConsensusProtocol
108    bus: MessageBus
109    peer_id: str
110
111    async def propose(self, content: dict) -> str:
112        """Propose something for consensus."""
113        proposal_id = self.protocol.create_proposal(self.peer_id, content)
114
115        # Broadcast proposal
116        await self.bus.broadcast(
117            self.peer_id,
118            {
119                "type": "proposal",
120                "proposal_id": proposal_id,
121                "content": content
122            }
123        )
124
125        return proposal_id
126
127    async def cast_vote(self, proposal_id: str, vote: VoteType):
128        """Cast vote on a proposal."""
129        self.protocol.vote(proposal_id, self.peer_id, vote)
130
131        # Broadcast vote
132        await self.bus.broadcast(
133            self.peer_id,
134            {
135                "type": "vote",
136                "proposal_id": proposal_id,
137                "vote": vote.value
138            }
139        )
140
141    async def wait_for_consensus(
142        self,
143        proposal_id: str,
144        timeout: float = 60.0
145    ) -> str:
146        """Wait for consensus on a proposal."""
147        start = asyncio.get_event_loop().time()
148
149        while asyncio.get_event_loop().time() - start < timeout:
150            proposal = self.protocol.proposals.get(proposal_id)
151            if proposal and proposal.status != "pending":
152                return proposal.status
153            await asyncio.sleep(0.5)
154
155        return "timeout"

Raft-Inspired Leader Election

🐍python
1from dataclasses import dataclass, field
2from typing import Optional
3import asyncio
4import random
5
6
7class NodeState(Enum):
8    """State of a node in leader election."""
9    FOLLOWER = "follower"
10    CANDIDATE = "candidate"
11    LEADER = "leader"
12
13
14@dataclass
15class LeaderElection:
16    """Raft-inspired leader election for peer agents."""
17
18    node_id: str
19    peers: list[str] = field(default_factory=list)
20    state: NodeState = NodeState.FOLLOWER
21    current_term: int = 0
22    voted_for: Optional[str] = None
23    current_leader: Optional[str] = None
24
25    # Timing
26    election_timeout_min: float = 1.5
27    election_timeout_max: float = 3.0
28    heartbeat_interval: float = 0.5
29
30    # Votes received
31    votes_received: set = field(default_factory=set)
32
33    def _random_timeout(self) -> float:
34        """Random election timeout to avoid split votes."""
35        return random.uniform(
36            self.election_timeout_min,
37            self.election_timeout_max
38        )
39
40    async def start_election(self):
41        """Start a new election as candidate."""
42        self.state = NodeState.CANDIDATE
43        self.current_term += 1
44        self.voted_for = self.node_id
45        self.votes_received = {self.node_id}
46
47        print(f"[self.node_id] starting election for term [self.current_term]")
48
49    def request_vote(self, candidate_id: str, term: int) -> bool:
50        """Handle vote request from a candidate."""
51        if term < self.current_term:
52            return False
53
54        if term > self.current_term:
55            self.current_term = term
56            self.state = NodeState.FOLLOWER
57            self.voted_for = None
58
59        if self.voted_for is None or self.voted_for == candidate_id:
60            self.voted_for = candidate_id
61            return True
62
63        return False
64
65    def receive_vote(self, voter: str, granted: bool):
66        """Receive vote response."""
67        if granted:
68            self.votes_received.add(voter)
69
70            # Check if we have majority
71            if len(self.votes_received) > (len(self.peers) + 1) / 2:
72                self.become_leader()
73
74    def become_leader(self):
75        """Transition to leader state."""
76        self.state = NodeState.LEADER
77        self.current_leader = self.node_id
78        print(f"[self.node_id] became leader for term [self.current_term]")
79
80    def receive_heartbeat(self, leader_id: str, term: int):
81        """Receive heartbeat from leader."""
82        if term >= self.current_term:
83            self.current_term = term
84            self.state = NodeState.FOLLOWER
85            self.current_leader = leader_id
86            self.voted_for = None
87
88
89@dataclass
90class DistributedLeaderElection:
91    """Full distributed leader election with message passing."""
92
93    node_id: str
94    bus: MessageBus
95    election: LeaderElection = field(default_factory=lambda: LeaderElection(node_id=""))
96    running: bool = False
97
98    def __post_init__(self):
99        self.election = LeaderElection(node_id=self.node_id)
100
101    async def run(self):
102        """Run the leader election loop."""
103        self.running = True
104
105        while self.running:
106            if self.election.state == NodeState.LEADER:
107                await self._leader_loop()
108            else:
109                await self._follower_loop()
110
111    async def _leader_loop(self):
112        """Send heartbeats as leader."""
113        await self.bus.broadcast(
114            self.node_id,
115            {
116                "type": "heartbeat",
117                "term": self.election.current_term,
118                "leader_id": self.node_id
119            }
120        )
121        await asyncio.sleep(self.election.heartbeat_interval)
122
123    async def _follower_loop(self):
124        """Wait for heartbeat or start election."""
125        timeout = self.election._random_timeout()
126
127        try:
128            # Wait for message
129            queue = self.bus.subscribers.get(self.node_id)
130            if queue:
131                message = await asyncio.wait_for(queue.get(), timeout=timeout)
132                await self._handle_message(message)
133        except asyncio.TimeoutError:
134            # No heartbeat received, start election
135            await self.election.start_election()
136            await self._request_votes()
137
138    async def _request_votes(self):
139        """Request votes from all peers."""
140        await self.bus.broadcast(
141            self.node_id,
142            {
143                "type": "vote_request",
144                "term": self.election.current_term,
145                "candidate_id": self.node_id
146            }
147        )
148
149    async def _handle_message(self, message: PeerMessage):
150        """Handle incoming election message."""
151        content = message.content
152        msg_type = content.get("type")
153
154        if msg_type == "heartbeat":
155            self.election.receive_heartbeat(
156                content["leader_id"],
157                content["term"]
158            )
159
160        elif msg_type == "vote_request":
161            granted = self.election.request_vote(
162                content["candidate_id"],
163                content["term"]
164            )
165            await self.bus.send(PeerMessage.create(
166                sender=self.node_id,
167                recipient=message.sender,
168                msg_type=MessageType.RESPONSE,
169                content={"type": "vote_response", "granted": granted}
170            ))
171
172        elif msg_type == "vote_response":
173            self.election.receive_vote(message.sender, content["granted"])

Collaborative Patterns

Peers can collaborate in various patterns depending on the nature of the task:

🐍python
1from dataclasses import dataclass, field
2from typing import Any, Callable, Awaitable
3import asyncio
4
5
6@dataclass
7class CollaborativeAgent:
8    """Agent that participates in peer collaboration."""
9
10    agent_id: str
11    communicator: PeerCommunicator
12    expertise: list[str] = field(default_factory=list)
13
14    async def contribute(self, topic: str, context: dict) -> dict:
15        """Provide contribution based on expertise."""
16        # Override in specific implementations
17        return {"contribution": "base contribution"}
18
19    async def review(self, content: dict) -> dict:
20        """Review another peer's work."""
21        return {"approved": True, "feedback": ""}
22
23    async def refine(self, content: dict, feedback: list[dict]) -> dict:
24        """Refine content based on feedback."""
25        return content
26
27
28@dataclass
29class RoundRobinCollaboration:
30    """Each peer contributes in sequence."""
31
32    agents: list[CollaborativeAgent] = field(default_factory=list)
33    current_index: int = 0
34
35    async def execute(self, task: dict) -> dict:
36        """Execute task through round-robin contributions."""
37        context = {"task": task, "contributions": []}
38
39        for agent in self.agents:
40            contribution = await agent.contribute(
41                task.get("topic", ""),
42                context
43            )
44            context["contributions"].append({
45                "agent": agent.agent_id,
46                "contribution": contribution
47            })
48
49        return context
50
51
52@dataclass
53class ParallelContribution:
54    """All peers contribute simultaneously."""
55
56    agents: list[CollaborativeAgent] = field(default_factory=list)
57
58    async def execute(self, task: dict) -> dict:
59        """All agents contribute in parallel."""
60        contributions = await asyncio.gather(*[
61            agent.contribute(task.get("topic", ""), {"task": task})
62            for agent in self.agents
63        ])
64
65        return {
66            "task": task,
67            "contributions": [
68                {"agent": agent.agent_id, "contribution": c}
69                for agent, c in zip(self.agents, contributions)
70            ]
71        }
72
73
74@dataclass
75class ReviewAndRefine:
76    """Author-reviewer pattern for quality improvement."""
77
78    author: CollaborativeAgent
79    reviewers: list[CollaborativeAgent] = field(default_factory=list)
80    max_iterations: int = 3
81
82    async def execute(self, task: dict) -> dict:
83        """Author creates, reviewers refine iteratively."""
84        # Initial contribution
85        content = await self.author.contribute(
86            task.get("topic", ""),
87            {"task": task}
88        )
89
90        iterations = []
91
92        for i in range(self.max_iterations):
93            # Parallel review
94            reviews = await asyncio.gather(*[
95                reviewer.review(content)
96                for reviewer in self.reviewers
97            ])
98
99            # Check if all approved
100            all_approved = all(r.get("approved", False) for r in reviews)
101
102            iterations.append({
103                "iteration": i + 1,
104                "content": content,
105                "reviews": reviews,
106                "approved": all_approved
107            })
108
109            if all_approved:
110                break
111
112            # Refine based on feedback
113            content = await self.author.refine(content, reviews)
114
115        return {
116            "final_content": content,
117            "iterations": iterations
118        }
119
120
121@dataclass
122class DebateAndResolve:
123    """Agents debate opposing viewpoints."""
124
125    proponent: CollaborativeAgent
126    opponent: CollaborativeAgent
127    judge: CollaborativeAgent
128    max_rounds: int = 3
129
130    async def execute(self, topic: str) -> dict:
131        """Conduct structured debate."""
132        context = {"topic": topic, "rounds": []}
133
134        for round_num in range(self.max_rounds):
135            # Proponent argues
136            pro_argument = await self.proponent.contribute(
137                topic,
138                {"role": "proponent", "context": context}
139            )
140
141            # Opponent responds
142            con_argument = await self.opponent.contribute(
143                topic,
144                {"role": "opponent", "context": context, "counter_to": pro_argument}
145            )
146
147            context["rounds"].append({
148                "round": round_num + 1,
149                "proponent": pro_argument,
150                "opponent": con_argument
151            })
152
153        # Judge renders decision
154        verdict = await self.judge.contribute(
155            topic,
156            {"role": "judge", "debate": context}
157        )
158
159        return {
160            "topic": topic,
161            "debate": context,
162            "verdict": verdict
163        }
164
165
166@dataclass
167class ExpertPanel:
168    """Multiple experts provide specialized perspectives."""
169
170    experts: dict[str, CollaborativeAgent] = field(default_factory=dict)
171    synthesizer: Optional[CollaborativeAgent] = None
172
173    async def execute(self, query: dict) -> dict:
174        """Gather expert opinions and synthesize."""
175        # Get expert perspectives in parallel
176        expert_opinions = {}
177
178        async def get_opinion(expertise: str, expert: CollaborativeAgent):
179            opinion = await expert.contribute(
180                query.get("topic", ""),
181                {"query": query, "expertise": expertise}
182            )
183            return expertise, opinion
184
185        results = await asyncio.gather(*[
186            get_opinion(expertise, expert)
187            for expertise, expert in self.experts.items()
188        ])
189
190        expert_opinions = dict(results)
191
192        # Synthesize if synthesizer available
193        synthesis = None
194        if self.synthesizer:
195            synthesis = await self.synthesizer.contribute(
196                query.get("topic", ""),
197                {"query": query, "expert_opinions": expert_opinions}
198            )
199
200        return {
201            "query": query,
202            "expert_opinions": expert_opinions,
203            "synthesis": synthesis
204        }
PatternUse CaseAgent Roles
Round RobinSequential refinementEqual contributors
Parallel ContributionIndependent perspectivesIndependent experts
Review and RefineQuality improvementAuthor + reviewers
Debate and ResolveExploring trade-offsProponent, opponent, judge
Expert PanelMulti-domain problemsDomain specialists + synthesizer

Conflict Resolution

When peers disagree, structured conflict resolution ensures progress:

🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional
3from enum import Enum
4
5
6class ConflictType(Enum):
7    """Types of conflicts between peers."""
8    RESOURCE = "resource"      # Competing for same resource
9    OPINION = "opinion"        # Different views on approach
10    PRIORITY = "priority"      # Disagreement on task ordering
11    DATA = "data"              # Conflicting data/facts
12
13
14@dataclass
15class Conflict:
16    """Represents a conflict between peers."""
17    id: str
18    conflict_type: ConflictType
19    parties: list[str]
20    positions: dict[str, dict]  # party_id -> their position
21    resolution: Optional[dict] = None
22
23
24@dataclass
25class ConflictResolver:
26    """Base conflict resolver."""
27
28    async def resolve(self, conflict: Conflict) -> dict:
29        """Resolve the conflict."""
30        raise NotImplementedError
31
32
33@dataclass
34class PriorityBasedResolver(ConflictResolver):
35    """Resolve based on agent priority."""
36
37    priorities: dict[str, int] = field(default_factory=dict)  # agent_id -> priority
38
39    async def resolve(self, conflict: Conflict) -> dict:
40        """Highest priority agent wins."""
41        highest_priority = -1
42        winner = None
43
44        for party in conflict.parties:
45            priority = self.priorities.get(party, 0)
46            if priority > highest_priority:
47                highest_priority = priority
48                winner = party
49
50        return {
51            "winner": winner,
52            "resolution": conflict.positions.get(winner, {}),
53            "method": "priority"
54        }
55
56
57@dataclass
58class VotingResolver(ConflictResolver):
59    """Resolve through peer voting."""
60
61    all_peers: list[str] = field(default_factory=list)
62    bus: Optional[MessageBus] = None
63
64    async def resolve(self, conflict: Conflict) -> dict:
65        """All peers vote on positions."""
66        # Broadcast conflict for voting
67        votes = {party: 0 for party in conflict.parties}
68
69        # In practice, this would use the message bus
70        # to collect votes from all peers
71        # Simplified here for illustration
72
73        # Mock voting result
74        import random
75        for party in conflict.parties:
76            votes[party] = random.randint(0, len(self.all_peers))
77
78        winner = max(votes, key=votes.get)
79
80        return {
81            "winner": winner,
82            "votes": votes,
83            "resolution": conflict.positions.get(winner, {}),
84            "method": "voting"
85        }
86
87
88@dataclass
89class MediatorResolver(ConflictResolver):
90    """Use a mediator agent to find compromise."""
91
92    mediator: Optional[CollaborativeAgent] = None
93
94    async def resolve(self, conflict: Conflict) -> dict:
95        """Mediator synthesizes a compromise."""
96        if not self.mediator:
97            return {"error": "No mediator available"}
98
99        # Mediator reviews all positions
100        mediated = await self.mediator.contribute(
101            "conflict_resolution",
102            {
103                "conflict_type": conflict.conflict_type.value,
104                "positions": conflict.positions,
105                "instruction": "Find a compromise that addresses concerns from all parties"
106            }
107        )
108
109        return {
110            "resolution": mediated,
111            "method": "mediation"
112        }
113
114
115@dataclass
116class EscalationResolver(ConflictResolver):
117    """Escalate to higher authority if resolution fails."""
118
119    primary_resolver: ConflictResolver
120    escalation_resolver: ConflictResolver
121    escalation_threshold: int = 3  # Failed attempts before escalation
122
123    async def resolve(self, conflict: Conflict) -> dict:
124        """Try primary resolver, escalate if needed."""
125        for attempt in range(self.escalation_threshold):
126            try:
127                resolution = await self.primary_resolver.resolve(conflict)
128                if resolution.get("resolution"):
129                    return resolution
130            except Exception:
131                continue
132
133        # Escalate
134        resolution = await self.escalation_resolver.resolve(conflict)
135        resolution["escalated"] = True
136        return resolution
137
138
139@dataclass
140class ConflictManager:
141    """Manages conflict detection and resolution."""
142
143    resolver: ConflictResolver
144    active_conflicts: dict[str, Conflict] = field(default_factory=dict)
145    resolved_conflicts: list[Conflict] = field(default_factory=list)
146
147    def detect_conflict(
148        self,
149        parties: list[str],
150        positions: dict[str, dict],
151        conflict_type: ConflictType
152    ) -> Conflict:
153        """Detect and register a conflict."""
154        conflict = Conflict(
155            id=str(uuid.uuid4()),
156            conflict_type=conflict_type,
157            parties=parties,
158            positions=positions
159        )
160        self.active_conflicts[conflict.id] = conflict
161        return conflict
162
163    async def resolve_conflict(self, conflict_id: str) -> dict:
164        """Resolve a conflict."""
165        if conflict_id not in self.active_conflicts:
166            return {"error": "Conflict not found"}
167
168        conflict = self.active_conflicts[conflict_id]
169        resolution = await self.resolver.resolve(conflict)
170
171        conflict.resolution = resolution
172        del self.active_conflicts[conflict_id]
173        self.resolved_conflicts.append(conflict)
174
175        return resolution

Key Takeaways

  • Message buses enable decoupled peer communication with support for direct messages and broadcasts.
  • Consensus mechanisms like voting and leader election allow peers to make collective decisions without central authority.
  • Collaboration patterns structure how peers work together - from parallel contributions to structured debates.
  • Conflict resolution strategies ensure progress when peers disagree, from priority-based to mediation approaches.
  • Fault tolerance is a natural benefit of P2P systems since there's no single point of failure.
Next Section Preview: We'll explore hierarchical orchestration, combining centralized control with distributed execution through layered agent structures.