Introduction
Peer-to-peer (P2P) multi-agent systems enable agents to collaborate as equals without centralized control. This pattern excels when tasks benefit from diverse perspectives, creative problem-solving, or fault tolerance. This section explores communication protocols, consensus mechanisms, and collaborative patterns for P2P agent systems.
Section Overview: We'll build peer communication infrastructure, implement consensus protocols, and explore patterns for agents to collaboratively solve problems.
Peer Communication
Effective peer-to-peer systems require robust communication infrastructure for direct agent-to-agent messaging:
🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional, Callable, Awaitable
3from enum import Enum
4import asyncio
5from datetime import datetime
6import uuid
7
8
9class MessageType(Enum):
10 """Types of peer messages."""
11 REQUEST = "request"
12 RESPONSE = "response"
13 BROADCAST = "broadcast"
14 PROPOSAL = "proposal"
15 VOTE = "vote"
16 ACK = "acknowledgement"
17
18
19@dataclass
20class PeerMessage:
21 """Message passed between peers."""
22 id: str
23 sender: str
24 recipient: Optional[str] # None for broadcast
25 message_type: MessageType
26 content: dict
27 timestamp: datetime = field(default_factory=datetime.now)
28 reply_to: Optional[str] = None # ID of message this replies to
29
30 @classmethod
31 def create(
32 cls,
33 sender: str,
34 recipient: Optional[str],
35 msg_type: MessageType,
36 content: dict,
37 reply_to: Optional[str] = None
38 ) -> 'PeerMessage':
39 return cls(
40 id=str(uuid.uuid4()),
41 sender=sender,
42 recipient=recipient,
43 message_type=msg_type,
44 content=content,
45 reply_to=reply_to
46 )
47
48
49@dataclass
50class MessageBus:
51 """Central message bus for peer communication."""
52
53 subscribers: dict[str, asyncio.Queue] = field(default_factory=dict)
54 message_handlers: dict[str, Callable[[PeerMessage], Awaitable[None]]] = field(
55 default_factory=dict
56 )
57 message_history: list[PeerMessage] = field(default_factory=list)
58
59 def subscribe(self, peer_id: str) -> asyncio.Queue:
60 """Subscribe peer to message bus."""
61 queue = asyncio.Queue()
62 self.subscribers[peer_id] = queue
63 return queue
64
65 def unsubscribe(self, peer_id: str):
66 """Unsubscribe peer from message bus."""
67 if peer_id in self.subscribers:
68 del self.subscribers[peer_id]
69
70 async def send(self, message: PeerMessage):
71 """Send message to specific recipient or broadcast."""
72 self.message_history.append(message)
73
74 if message.recipient:
75 # Direct message
76 if message.recipient in self.subscribers:
77 await self.subscribers[message.recipient].put(message)
78 else:
79 # Broadcast to all except sender
80 for peer_id, queue in self.subscribers.items():
81 if peer_id != message.sender:
82 await queue.put(message)
83
84 async def broadcast(self, sender: str, content: dict):
85 """Convenience method for broadcast."""
86 message = PeerMessage.create(
87 sender=sender,
88 recipient=None,
89 msg_type=MessageType.BROADCAST,
90 content=content
91 )
92 await self.send(message)
93
94
95@dataclass
96class PeerCommunicator:
97 """Communication interface for a peer agent."""
98
99 peer_id: str
100 bus: MessageBus
101 inbox: asyncio.Queue = field(default_factory=asyncio.Queue)
102 pending_responses: dict[str, asyncio.Future] = field(default_factory=dict)
103
104 def __post_init__(self):
105 # Subscribe to bus
106 self.inbox = self.bus.subscribe(self.peer_id)
107
108 async def send_request(
109 self,
110 recipient: str,
111 content: dict,
112 timeout: float = 30.0
113 ) -> Optional[PeerMessage]:
114 """Send request and wait for response."""
115 message = PeerMessage.create(
116 sender=self.peer_id,
117 recipient=recipient,
118 msg_type=MessageType.REQUEST,
119 content=content
120 )
121
122 # Create future for response
123 future = asyncio.Future()
124 self.pending_responses[message.id] = future
125
126 # Send message
127 await self.bus.send(message)
128
129 try:
130 # Wait for response
131 response = await asyncio.wait_for(future, timeout=timeout)
132 return response
133 except asyncio.TimeoutError:
134 return None
135 finally:
136 del self.pending_responses[message.id]
137
138 async def send_response(
139 self,
140 request_id: str,
141 recipient: str,
142 content: dict
143 ):
144 """Send response to a request."""
145 message = PeerMessage.create(
146 sender=self.peer_id,
147 recipient=recipient,
148 msg_type=MessageType.RESPONSE,
149 content=content,
150 reply_to=request_id
151 )
152 await self.bus.send(message)
153
154 async def broadcast(self, content: dict):
155 """Broadcast to all peers."""
156 await self.bus.broadcast(self.peer_id, content)
157
158 async def receive(self, timeout: float = None) -> Optional[PeerMessage]:
159 """Receive next message."""
160 try:
161 if timeout:
162 message = await asyncio.wait_for(
163 self.inbox.get(),
164 timeout=timeout
165 )
166 else:
167 message = await self.inbox.get()
168
169 # Check if this is a response to a pending request
170 if (message.message_type == MessageType.RESPONSE and
171 message.reply_to in self.pending_responses):
172 self.pending_responses[message.reply_to].set_result(message)
173 return None # Don't return responses, they're handled by futures
174
175 return message
176
177 except asyncio.TimeoutError:
178 return NoneConsensus Mechanisms
Without central authority, peers must reach consensus through voting, negotiation, or other agreement protocols:
🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional
3from enum import Enum
4import asyncio
5
6
7class VoteType(Enum):
8 """Types of votes."""
9 APPROVE = "approve"
10 REJECT = "reject"
11 ABSTAIN = "abstain"
12
13
14@dataclass
15class Proposal:
16 """A proposal for consensus."""
17 id: str
18 proposer: str
19 content: dict
20 votes: dict[str, VoteType] = field(default_factory=dict)
21 status: str = "pending" # pending, accepted, rejected
22
23 def add_vote(self, voter: str, vote: VoteType):
24 self.votes[voter] = vote
25
26 def tally(self, total_voters: int) -> dict:
27 approvals = sum(1 for v in self.votes.values() if v == VoteType.APPROVE)
28 rejections = sum(1 for v in self.votes.values() if v == VoteType.REJECT)
29 abstentions = sum(1 for v in self.votes.values() if v == VoteType.ABSTAIN)
30
31 return {
32 "approvals": approvals,
33 "rejections": rejections,
34 "abstentions": abstentions,
35 "pending": total_voters - len(self.votes)
36 }
37
38
39@dataclass
40class ConsensusProtocol:
41 """Base consensus protocol."""
42
43 peers: list[str] = field(default_factory=list)
44 proposals: dict[str, Proposal] = field(default_factory=dict)
45 quorum: float = 0.5 # Minimum participation required
46 threshold: float = 0.5 # Approval threshold
47
48 def create_proposal(self, proposer: str, content: dict) -> str:
49 """Create a new proposal."""
50 proposal_id = str(uuid.uuid4())
51 proposal = Proposal(id=proposal_id, proposer=proposer, content=content)
52 self.proposals[proposal_id] = proposal
53 return proposal_id
54
55 def vote(self, proposal_id: str, voter: str, vote: VoteType):
56 """Cast a vote on a proposal."""
57 if proposal_id in self.proposals:
58 self.proposals[proposal_id].add_vote(voter, vote)
59 self._check_consensus(proposal_id)
60
61 def _check_consensus(self, proposal_id: str):
62 """Check if consensus has been reached."""
63 proposal = self.proposals[proposal_id]
64 tally = proposal.tally(len(self.peers))
65
66 # Check quorum
67 participated = len(proposal.votes)
68 if participated / len(self.peers) < self.quorum:
69 return # Not enough participation yet
70
71 # Check threshold
72 approvals = tally["approvals"]
73 if approvals / participated >= self.threshold:
74 proposal.status = "accepted"
75 elif tally["rejections"] / participated > (1 - self.threshold):
76 proposal.status = "rejected"
77
78
79@dataclass
80class MajorityVoting(ConsensusProtocol):
81 """Simple majority voting consensus."""
82
83 threshold: float = 0.5
84
85 def is_accepted(self, proposal_id: str) -> bool:
86 return self.proposals[proposal_id].status == "accepted"
87
88
89@dataclass
90class SuperMajorityVoting(ConsensusProtocol):
91 """Requires 2/3 majority for acceptance."""
92
93 threshold: float = 0.67
94
95
96@dataclass
97class UnanimousConsensus(ConsensusProtocol):
98 """Requires all peers to approve."""
99
100 threshold: float = 1.0
101
102
103@dataclass
104class ConsensusCoordinator:
105 """Coordinates consensus process among peers."""
106
107 protocol: ConsensusProtocol
108 bus: MessageBus
109 peer_id: str
110
111 async def propose(self, content: dict) -> str:
112 """Propose something for consensus."""
113 proposal_id = self.protocol.create_proposal(self.peer_id, content)
114
115 # Broadcast proposal
116 await self.bus.broadcast(
117 self.peer_id,
118 {
119 "type": "proposal",
120 "proposal_id": proposal_id,
121 "content": content
122 }
123 )
124
125 return proposal_id
126
127 async def cast_vote(self, proposal_id: str, vote: VoteType):
128 """Cast vote on a proposal."""
129 self.protocol.vote(proposal_id, self.peer_id, vote)
130
131 # Broadcast vote
132 await self.bus.broadcast(
133 self.peer_id,
134 {
135 "type": "vote",
136 "proposal_id": proposal_id,
137 "vote": vote.value
138 }
139 )
140
141 async def wait_for_consensus(
142 self,
143 proposal_id: str,
144 timeout: float = 60.0
145 ) -> str:
146 """Wait for consensus on a proposal."""
147 start = asyncio.get_event_loop().time()
148
149 while asyncio.get_event_loop().time() - start < timeout:
150 proposal = self.protocol.proposals.get(proposal_id)
151 if proposal and proposal.status != "pending":
152 return proposal.status
153 await asyncio.sleep(0.5)
154
155 return "timeout"Raft-Inspired Leader Election
🐍python
1from dataclasses import dataclass, field
2from typing import Optional
3import asyncio
4import random
5
6
7class NodeState(Enum):
8 """State of a node in leader election."""
9 FOLLOWER = "follower"
10 CANDIDATE = "candidate"
11 LEADER = "leader"
12
13
14@dataclass
15class LeaderElection:
16 """Raft-inspired leader election for peer agents."""
17
18 node_id: str
19 peers: list[str] = field(default_factory=list)
20 state: NodeState = NodeState.FOLLOWER
21 current_term: int = 0
22 voted_for: Optional[str] = None
23 current_leader: Optional[str] = None
24
25 # Timing
26 election_timeout_min: float = 1.5
27 election_timeout_max: float = 3.0
28 heartbeat_interval: float = 0.5
29
30 # Votes received
31 votes_received: set = field(default_factory=set)
32
33 def _random_timeout(self) -> float:
34 """Random election timeout to avoid split votes."""
35 return random.uniform(
36 self.election_timeout_min,
37 self.election_timeout_max
38 )
39
40 async def start_election(self):
41 """Start a new election as candidate."""
42 self.state = NodeState.CANDIDATE
43 self.current_term += 1
44 self.voted_for = self.node_id
45 self.votes_received = {self.node_id}
46
47 print(f"[self.node_id] starting election for term [self.current_term]")
48
49 def request_vote(self, candidate_id: str, term: int) -> bool:
50 """Handle vote request from a candidate."""
51 if term < self.current_term:
52 return False
53
54 if term > self.current_term:
55 self.current_term = term
56 self.state = NodeState.FOLLOWER
57 self.voted_for = None
58
59 if self.voted_for is None or self.voted_for == candidate_id:
60 self.voted_for = candidate_id
61 return True
62
63 return False
64
65 def receive_vote(self, voter: str, granted: bool):
66 """Receive vote response."""
67 if granted:
68 self.votes_received.add(voter)
69
70 # Check if we have majority
71 if len(self.votes_received) > (len(self.peers) + 1) / 2:
72 self.become_leader()
73
74 def become_leader(self):
75 """Transition to leader state."""
76 self.state = NodeState.LEADER
77 self.current_leader = self.node_id
78 print(f"[self.node_id] became leader for term [self.current_term]")
79
80 def receive_heartbeat(self, leader_id: str, term: int):
81 """Receive heartbeat from leader."""
82 if term >= self.current_term:
83 self.current_term = term
84 self.state = NodeState.FOLLOWER
85 self.current_leader = leader_id
86 self.voted_for = None
87
88
89@dataclass
90class DistributedLeaderElection:
91 """Full distributed leader election with message passing."""
92
93 node_id: str
94 bus: MessageBus
95 election: LeaderElection = field(default_factory=lambda: LeaderElection(node_id=""))
96 running: bool = False
97
98 def __post_init__(self):
99 self.election = LeaderElection(node_id=self.node_id)
100
101 async def run(self):
102 """Run the leader election loop."""
103 self.running = True
104
105 while self.running:
106 if self.election.state == NodeState.LEADER:
107 await self._leader_loop()
108 else:
109 await self._follower_loop()
110
111 async def _leader_loop(self):
112 """Send heartbeats as leader."""
113 await self.bus.broadcast(
114 self.node_id,
115 {
116 "type": "heartbeat",
117 "term": self.election.current_term,
118 "leader_id": self.node_id
119 }
120 )
121 await asyncio.sleep(self.election.heartbeat_interval)
122
123 async def _follower_loop(self):
124 """Wait for heartbeat or start election."""
125 timeout = self.election._random_timeout()
126
127 try:
128 # Wait for message
129 queue = self.bus.subscribers.get(self.node_id)
130 if queue:
131 message = await asyncio.wait_for(queue.get(), timeout=timeout)
132 await self._handle_message(message)
133 except asyncio.TimeoutError:
134 # No heartbeat received, start election
135 await self.election.start_election()
136 await self._request_votes()
137
138 async def _request_votes(self):
139 """Request votes from all peers."""
140 await self.bus.broadcast(
141 self.node_id,
142 {
143 "type": "vote_request",
144 "term": self.election.current_term,
145 "candidate_id": self.node_id
146 }
147 )
148
149 async def _handle_message(self, message: PeerMessage):
150 """Handle incoming election message."""
151 content = message.content
152 msg_type = content.get("type")
153
154 if msg_type == "heartbeat":
155 self.election.receive_heartbeat(
156 content["leader_id"],
157 content["term"]
158 )
159
160 elif msg_type == "vote_request":
161 granted = self.election.request_vote(
162 content["candidate_id"],
163 content["term"]
164 )
165 await self.bus.send(PeerMessage.create(
166 sender=self.node_id,
167 recipient=message.sender,
168 msg_type=MessageType.RESPONSE,
169 content={"type": "vote_response", "granted": granted}
170 ))
171
172 elif msg_type == "vote_response":
173 self.election.receive_vote(message.sender, content["granted"])Collaborative Patterns
Peers can collaborate in various patterns depending on the nature of the task:
🐍python
1from dataclasses import dataclass, field
2from typing import Any, Callable, Awaitable
3import asyncio
4
5
6@dataclass
7class CollaborativeAgent:
8 """Agent that participates in peer collaboration."""
9
10 agent_id: str
11 communicator: PeerCommunicator
12 expertise: list[str] = field(default_factory=list)
13
14 async def contribute(self, topic: str, context: dict) -> dict:
15 """Provide contribution based on expertise."""
16 # Override in specific implementations
17 return {"contribution": "base contribution"}
18
19 async def review(self, content: dict) -> dict:
20 """Review another peer's work."""
21 return {"approved": True, "feedback": ""}
22
23 async def refine(self, content: dict, feedback: list[dict]) -> dict:
24 """Refine content based on feedback."""
25 return content
26
27
28@dataclass
29class RoundRobinCollaboration:
30 """Each peer contributes in sequence."""
31
32 agents: list[CollaborativeAgent] = field(default_factory=list)
33 current_index: int = 0
34
35 async def execute(self, task: dict) -> dict:
36 """Execute task through round-robin contributions."""
37 context = {"task": task, "contributions": []}
38
39 for agent in self.agents:
40 contribution = await agent.contribute(
41 task.get("topic", ""),
42 context
43 )
44 context["contributions"].append({
45 "agent": agent.agent_id,
46 "contribution": contribution
47 })
48
49 return context
50
51
52@dataclass
53class ParallelContribution:
54 """All peers contribute simultaneously."""
55
56 agents: list[CollaborativeAgent] = field(default_factory=list)
57
58 async def execute(self, task: dict) -> dict:
59 """All agents contribute in parallel."""
60 contributions = await asyncio.gather(*[
61 agent.contribute(task.get("topic", ""), {"task": task})
62 for agent in self.agents
63 ])
64
65 return {
66 "task": task,
67 "contributions": [
68 {"agent": agent.agent_id, "contribution": c}
69 for agent, c in zip(self.agents, contributions)
70 ]
71 }
72
73
74@dataclass
75class ReviewAndRefine:
76 """Author-reviewer pattern for quality improvement."""
77
78 author: CollaborativeAgent
79 reviewers: list[CollaborativeAgent] = field(default_factory=list)
80 max_iterations: int = 3
81
82 async def execute(self, task: dict) -> dict:
83 """Author creates, reviewers refine iteratively."""
84 # Initial contribution
85 content = await self.author.contribute(
86 task.get("topic", ""),
87 {"task": task}
88 )
89
90 iterations = []
91
92 for i in range(self.max_iterations):
93 # Parallel review
94 reviews = await asyncio.gather(*[
95 reviewer.review(content)
96 for reviewer in self.reviewers
97 ])
98
99 # Check if all approved
100 all_approved = all(r.get("approved", False) for r in reviews)
101
102 iterations.append({
103 "iteration": i + 1,
104 "content": content,
105 "reviews": reviews,
106 "approved": all_approved
107 })
108
109 if all_approved:
110 break
111
112 # Refine based on feedback
113 content = await self.author.refine(content, reviews)
114
115 return {
116 "final_content": content,
117 "iterations": iterations
118 }
119
120
121@dataclass
122class DebateAndResolve:
123 """Agents debate opposing viewpoints."""
124
125 proponent: CollaborativeAgent
126 opponent: CollaborativeAgent
127 judge: CollaborativeAgent
128 max_rounds: int = 3
129
130 async def execute(self, topic: str) -> dict:
131 """Conduct structured debate."""
132 context = {"topic": topic, "rounds": []}
133
134 for round_num in range(self.max_rounds):
135 # Proponent argues
136 pro_argument = await self.proponent.contribute(
137 topic,
138 {"role": "proponent", "context": context}
139 )
140
141 # Opponent responds
142 con_argument = await self.opponent.contribute(
143 topic,
144 {"role": "opponent", "context": context, "counter_to": pro_argument}
145 )
146
147 context["rounds"].append({
148 "round": round_num + 1,
149 "proponent": pro_argument,
150 "opponent": con_argument
151 })
152
153 # Judge renders decision
154 verdict = await self.judge.contribute(
155 topic,
156 {"role": "judge", "debate": context}
157 )
158
159 return {
160 "topic": topic,
161 "debate": context,
162 "verdict": verdict
163 }
164
165
166@dataclass
167class ExpertPanel:
168 """Multiple experts provide specialized perspectives."""
169
170 experts: dict[str, CollaborativeAgent] = field(default_factory=dict)
171 synthesizer: Optional[CollaborativeAgent] = None
172
173 async def execute(self, query: dict) -> dict:
174 """Gather expert opinions and synthesize."""
175 # Get expert perspectives in parallel
176 expert_opinions = {}
177
178 async def get_opinion(expertise: str, expert: CollaborativeAgent):
179 opinion = await expert.contribute(
180 query.get("topic", ""),
181 {"query": query, "expertise": expertise}
182 )
183 return expertise, opinion
184
185 results = await asyncio.gather(*[
186 get_opinion(expertise, expert)
187 for expertise, expert in self.experts.items()
188 ])
189
190 expert_opinions = dict(results)
191
192 # Synthesize if synthesizer available
193 synthesis = None
194 if self.synthesizer:
195 synthesis = await self.synthesizer.contribute(
196 query.get("topic", ""),
197 {"query": query, "expert_opinions": expert_opinions}
198 )
199
200 return {
201 "query": query,
202 "expert_opinions": expert_opinions,
203 "synthesis": synthesis
204 }| Pattern | Use Case | Agent Roles |
|---|---|---|
| Round Robin | Sequential refinement | Equal contributors |
| Parallel Contribution | Independent perspectives | Independent experts |
| Review and Refine | Quality improvement | Author + reviewers |
| Debate and Resolve | Exploring trade-offs | Proponent, opponent, judge |
| Expert Panel | Multi-domain problems | Domain specialists + synthesizer |
Conflict Resolution
When peers disagree, structured conflict resolution ensures progress:
🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional
3from enum import Enum
4
5
6class ConflictType(Enum):
7 """Types of conflicts between peers."""
8 RESOURCE = "resource" # Competing for same resource
9 OPINION = "opinion" # Different views on approach
10 PRIORITY = "priority" # Disagreement on task ordering
11 DATA = "data" # Conflicting data/facts
12
13
14@dataclass
15class Conflict:
16 """Represents a conflict between peers."""
17 id: str
18 conflict_type: ConflictType
19 parties: list[str]
20 positions: dict[str, dict] # party_id -> their position
21 resolution: Optional[dict] = None
22
23
24@dataclass
25class ConflictResolver:
26 """Base conflict resolver."""
27
28 async def resolve(self, conflict: Conflict) -> dict:
29 """Resolve the conflict."""
30 raise NotImplementedError
31
32
33@dataclass
34class PriorityBasedResolver(ConflictResolver):
35 """Resolve based on agent priority."""
36
37 priorities: dict[str, int] = field(default_factory=dict) # agent_id -> priority
38
39 async def resolve(self, conflict: Conflict) -> dict:
40 """Highest priority agent wins."""
41 highest_priority = -1
42 winner = None
43
44 for party in conflict.parties:
45 priority = self.priorities.get(party, 0)
46 if priority > highest_priority:
47 highest_priority = priority
48 winner = party
49
50 return {
51 "winner": winner,
52 "resolution": conflict.positions.get(winner, {}),
53 "method": "priority"
54 }
55
56
57@dataclass
58class VotingResolver(ConflictResolver):
59 """Resolve through peer voting."""
60
61 all_peers: list[str] = field(default_factory=list)
62 bus: Optional[MessageBus] = None
63
64 async def resolve(self, conflict: Conflict) -> dict:
65 """All peers vote on positions."""
66 # Broadcast conflict for voting
67 votes = {party: 0 for party in conflict.parties}
68
69 # In practice, this would use the message bus
70 # to collect votes from all peers
71 # Simplified here for illustration
72
73 # Mock voting result
74 import random
75 for party in conflict.parties:
76 votes[party] = random.randint(0, len(self.all_peers))
77
78 winner = max(votes, key=votes.get)
79
80 return {
81 "winner": winner,
82 "votes": votes,
83 "resolution": conflict.positions.get(winner, {}),
84 "method": "voting"
85 }
86
87
88@dataclass
89class MediatorResolver(ConflictResolver):
90 """Use a mediator agent to find compromise."""
91
92 mediator: Optional[CollaborativeAgent] = None
93
94 async def resolve(self, conflict: Conflict) -> dict:
95 """Mediator synthesizes a compromise."""
96 if not self.mediator:
97 return {"error": "No mediator available"}
98
99 # Mediator reviews all positions
100 mediated = await self.mediator.contribute(
101 "conflict_resolution",
102 {
103 "conflict_type": conflict.conflict_type.value,
104 "positions": conflict.positions,
105 "instruction": "Find a compromise that addresses concerns from all parties"
106 }
107 )
108
109 return {
110 "resolution": mediated,
111 "method": "mediation"
112 }
113
114
115@dataclass
116class EscalationResolver(ConflictResolver):
117 """Escalate to higher authority if resolution fails."""
118
119 primary_resolver: ConflictResolver
120 escalation_resolver: ConflictResolver
121 escalation_threshold: int = 3 # Failed attempts before escalation
122
123 async def resolve(self, conflict: Conflict) -> dict:
124 """Try primary resolver, escalate if needed."""
125 for attempt in range(self.escalation_threshold):
126 try:
127 resolution = await self.primary_resolver.resolve(conflict)
128 if resolution.get("resolution"):
129 return resolution
130 except Exception:
131 continue
132
133 # Escalate
134 resolution = await self.escalation_resolver.resolve(conflict)
135 resolution["escalated"] = True
136 return resolution
137
138
139@dataclass
140class ConflictManager:
141 """Manages conflict detection and resolution."""
142
143 resolver: ConflictResolver
144 active_conflicts: dict[str, Conflict] = field(default_factory=dict)
145 resolved_conflicts: list[Conflict] = field(default_factory=list)
146
147 def detect_conflict(
148 self,
149 parties: list[str],
150 positions: dict[str, dict],
151 conflict_type: ConflictType
152 ) -> Conflict:
153 """Detect and register a conflict."""
154 conflict = Conflict(
155 id=str(uuid.uuid4()),
156 conflict_type=conflict_type,
157 parties=parties,
158 positions=positions
159 )
160 self.active_conflicts[conflict.id] = conflict
161 return conflict
162
163 async def resolve_conflict(self, conflict_id: str) -> dict:
164 """Resolve a conflict."""
165 if conflict_id not in self.active_conflicts:
166 return {"error": "Conflict not found"}
167
168 conflict = self.active_conflicts[conflict_id]
169 resolution = await self.resolver.resolve(conflict)
170
171 conflict.resolution = resolution
172 del self.active_conflicts[conflict_id]
173 self.resolved_conflicts.append(conflict)
174
175 return resolutionKey Takeaways
- Message buses enable decoupled peer communication with support for direct messages and broadcasts.
- Consensus mechanisms like voting and leader election allow peers to make collective decisions without central authority.
- Collaboration patterns structure how peers work together - from parallel contributions to structured debates.
- Conflict resolution strategies ensure progress when peers disagree, from priority-based to mediation approaches.
- Fault tolerance is a natural benefit of P2P systems since there's no single point of failure.
Next Section Preview: We'll explore hierarchical orchestration, combining centralized control with distributed execution through layered agent structures.