Introduction
Moving agentic AI systems from prototype to production requires careful architectural planning. Production systems must handle concurrent users, maintain reliability under load, and provide consistent performance. This section explores proven architecture patterns for deploying agent systems at scale.
Learning Objectives: By the end of this section, you will understand the core architectural patterns for production agents, be able to design services that scale horizontally, implement robust state management strategies, and choose communication patterns appropriate to your use case.
The transition from development to production often reveals challenges that weren't apparent during prototyping. Latency requirements become strict, resource utilization matters, and failure scenarios that seemed unlikely become inevitable. A well-designed architecture addresses these concerns upfront.
Architectural Foundations
Production agent architectures typically follow layered designs that separate concerns and enable independent scaling. Understanding these layers helps you make informed decisions about where to place functionality and how components interact.
Core Architecture Layers
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional, Generic, TypeVar
from enum import Enum
import asyncio
from datetime import datetime
import uuid


T = TypeVar('T')
R = TypeVar('R')


class ArchitectureLayer(Enum):
    """Layers in the agent architecture."""
    GATEWAY = "gateway"              # API and request routing
    ORCHESTRATION = "orchestration"  # Workflow coordination
    EXECUTION = "execution"          # Agent logic execution
    INTEGRATION = "integration"      # External service calls
    PERSISTENCE = "persistence"      # State and data storage


@dataclass
class RequestContext:
    """Context for a request flowing through the system."""
    request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    tenant_id: Optional[str] = None
    created_at: datetime = field(default_factory=datetime.utcnow)
    metadata: dict[str, Any] = field(default_factory=dict)
    trace_id: Optional[str] = None
    span_id: Optional[str] = None

    def child_context(self) -> "RequestContext":
        """Create a child context for nested operations."""
        return RequestContext(
            request_id=self.request_id,
            user_id=self.user_id,
            session_id=self.session_id,
            tenant_id=self.tenant_id,
            trace_id=self.trace_id,
            span_id=str(uuid.uuid4()),
            metadata=dict(self.metadata)
        )


class LayerComponent(ABC, Generic[T, R]):
    """Abstract component in an architecture layer."""

    def __init__(self, layer: ArchitectureLayer, name: str):
        self.layer = layer
        self.name = name

    @abstractmethod
    async def process(
        self,
        request: T,
        context: RequestContext
    ) -> R:
        """Process request and return result."""
        pass


@dataclass
class SystemConfig:
    """Production system configuration."""
    # Service configuration
    service_name: str = "agent-service"
    environment: str = "production"
    version: str = "1.0.0"

    # Resource limits
    max_concurrent_requests: int = 100
    request_timeout_seconds: float = 30.0
    max_retries: int = 3

    # Feature flags
    enable_tracing: bool = True
    enable_metrics: bool = True
    enable_circuit_breaker: bool = True

    # Rate limiting
    rate_limit_requests_per_second: float = 100.0
    rate_limit_burst: int = 200


class ProductionArchitecture:
    """Production architecture orchestrator."""

    def __init__(self, config: SystemConfig):
        self.config = config
        self.layers: dict[ArchitectureLayer, list[LayerComponent]] = {
            layer: [] for layer in ArchitectureLayer
        }
        self._semaphore = asyncio.Semaphore(config.max_concurrent_requests)

    def register_component(
        self,
        component: LayerComponent
    ) -> None:
        """Register a component in its layer."""
        self.layers[component.layer].append(component)

    async def handle_request(
        self,
        request: Any,
        context: Optional[RequestContext] = None
    ) -> Any:
        """Handle a request through all layers."""
        context = context or RequestContext()

        async with self._semaphore:
            try:
                result = request

                # Process through layers in order
                for layer in ArchitectureLayer:
                    for component in self.layers[layer]:
                        result = await asyncio.wait_for(
                            component.process(result, context),
                            timeout=self.config.request_timeout_seconds
                        )

                return result

            except asyncio.TimeoutError:
                raise TimeoutError(
                    f"Request {context.request_id} timed out"
                )

            except Exception as e:
                # Log and re-raise with context
                raise RuntimeError(
                    f"Request {context.request_id} failed: {e}"
                ) from e
```

The layered architecture separates gateway logic (authentication, routing), orchestration (workflow management), execution (agent logic), integration (external calls), and persistence (state storage). Each layer can scale independently based on its specific resource requirements.
| Layer | Responsibility | Scaling Pattern |
|---|---|---|
| Gateway | Authentication, routing, rate limiting | Horizontal with load balancer |
| Orchestration | Workflow coordination, state machines | Horizontal with partitioning |
| Execution | Agent logic, LLM calls | Horizontal with queue-based distribution |
| Integration | External API calls, tools | Horizontal with connection pooling |
| Persistence | State storage, caching | Vertical + read replicas |
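The flow through these layers can be exercised end to end. The sketch below is a deliberately minimal, illustrative version of the layered-pipeline idea, not the `ProductionArchitecture` class itself: three hypothetical components (`authenticate`, `plan`, `execute`) transform a request dict as it passes through the layers in order.

```python
import asyncio

# Minimal layered pipeline: each layer's components transform the request
# in order. Layer and component names here are illustrative.
LAYERS = ["gateway", "orchestration", "execution"]

async def authenticate(request: dict) -> dict:
    # Gateway: attach an auth decision
    return {**request, "authenticated": True}

async def plan(request: dict) -> dict:
    # Orchestration: decide which steps to run
    return {**request, "steps": ["answer"]}

async def execute(request: dict) -> dict:
    # Execution: produce the final result
    return {**request, "result": f"handled: {request['task']}"}

PIPELINE = {
    "gateway": [authenticate],
    "orchestration": [plan],
    "execution": [execute],
}

async def handle(request: dict) -> dict:
    result = request
    for layer in LAYERS:
        for component in PIPELINE[layer]:
            result = await component(result)
    return result

response = asyncio.run(handle({"task": "summarize report"}))
print(response["result"])  # handled: summarize report
```

Each stage only sees the output of the previous layer, which is what makes the layers independently replaceable and scalable.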
Service Patterns
Several service patterns work well for agent systems. The choice depends on your scalability requirements, team structure, and operational capabilities.
Microservices Agent Architecture
```python
from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum
import asyncio

import httpx


class ServiceType(Enum):
    """Types of agent services."""
    GATEWAY = "gateway"
    ORCHESTRATOR = "orchestrator"
    EXECUTOR = "executor"
    TOOL_RUNNER = "tool_runner"
    STATE_MANAGER = "state_manager"


@dataclass
class ServiceEndpoint:
    """Service endpoint configuration."""
    service_type: ServiceType
    host: str
    port: int
    health_path: str = "/health"
    timeout_seconds: float = 30.0

    @property
    def base_url(self) -> str:
        return f"http://{self.host}:{self.port}"


class ServiceRegistry:
    """Registry for service discovery."""

    def __init__(self):
        self._services: dict[ServiceType, list[ServiceEndpoint]] = {}
        self._health_status: dict[str, bool] = {}

    def register(self, endpoint: ServiceEndpoint) -> None:
        """Register a service endpoint."""
        if endpoint.service_type not in self._services:
            self._services[endpoint.service_type] = []
        self._services[endpoint.service_type].append(endpoint)
        self._health_status[endpoint.base_url] = True

    def get_endpoint(
        self,
        service_type: ServiceType
    ) -> Optional[ServiceEndpoint]:
        """Get a healthy endpoint for a service type."""
        endpoints = self._services.get(service_type, [])
        healthy = [
            ep for ep in endpoints
            if self._health_status.get(ep.base_url, False)
        ]

        if not healthy:
            return None

        # Simple round-robin (in production use proper load balancing)
        return healthy[hash(str(asyncio.current_task())) % len(healthy)]

    async def health_check(self) -> dict[str, bool]:
        """Check health of all registered services."""
        async with httpx.AsyncClient() as client:
            for service_type, endpoints in self._services.items():
                for endpoint in endpoints:
                    try:
                        response = await client.get(
                            f"{endpoint.base_url}{endpoint.health_path}",
                            timeout=5.0
                        )
                        self._health_status[endpoint.base_url] = (
                            response.status_code == 200
                        )
                    except Exception:
                        self._health_status[endpoint.base_url] = False

        return dict(self._health_status)


class ServiceClient:
    """Client for calling other services."""

    def __init__(
        self,
        registry: ServiceRegistry,
        timeout: float = 30.0
    ):
        self.registry = registry
        self.timeout = timeout
        self._client = httpx.AsyncClient(timeout=timeout)

    async def call(
        self,
        service_type: ServiceType,
        path: str,
        method: str = "POST",
        data: Optional[dict[str, Any]] = None,
        context: Optional[RequestContext] = None
    ) -> dict[str, Any]:
        """Call a service endpoint."""
        endpoint = self.registry.get_endpoint(service_type)

        if not endpoint:
            raise RuntimeError(f"No healthy {service_type.value} service")

        headers = {}
        if context:
            headers["X-Request-ID"] = context.request_id
            if context.trace_id:
                headers["X-Trace-ID"] = context.trace_id
            if context.span_id:
                headers["X-Span-ID"] = context.span_id

        url = f"{endpoint.base_url}{path}"

        response = await self._client.request(
            method=method,
            url=url,
            json=data,
            headers=headers
        )
        response.raise_for_status()

        return response.json()

    async def close(self) -> None:
        """Close the HTTP client."""
        await self._client.aclose()


# Service implementations
class GatewayService:
    """API Gateway service."""

    def __init__(
        self,
        service_client: ServiceClient,
        rate_limiter: "RateLimiter"
    ):
        self.client = service_client
        self.rate_limiter = rate_limiter

    async def handle_request(
        self,
        request: dict[str, Any],
        context: RequestContext
    ) -> dict[str, Any]:
        """Handle incoming request."""
        # Check rate limit
        if not await self.rate_limiter.allow(context.user_id or "anonymous"):
            raise RuntimeError("Rate limit exceeded")

        # Forward to orchestrator
        return await self.client.call(
            ServiceType.ORCHESTRATOR,
            "/process",
            data=request,
            context=context
        )


class OrchestratorService:
    """Workflow orchestration service."""

    def __init__(self, service_client: ServiceClient):
        self.client = service_client

    async def process(
        self,
        request: dict[str, Any],
        context: RequestContext
    ) -> dict[str, Any]:
        """Process a workflow request."""
        workflow_type = request.get("workflow_type", "simple")

        if workflow_type == "simple":
            return await self._simple_workflow(request, context)
        elif workflow_type == "multi_step":
            return await self._multi_step_workflow(request, context)
        else:
            raise ValueError(f"Unknown workflow: {workflow_type}")

    async def _simple_workflow(
        self,
        request: dict[str, Any],
        context: RequestContext
    ) -> dict[str, Any]:
        """Execute simple single-step workflow."""
        return await self.client.call(
            ServiceType.EXECUTOR,
            "/execute",
            data={"task": request.get("task")},
            context=context
        )

    async def _multi_step_workflow(
        self,
        request: dict[str, Any],
        context: RequestContext
    ) -> dict[str, Any]:
        """Execute multi-step workflow."""
        steps = request.get("steps", [])
        results = []

        for step in steps:
            # Get current state
            state = await self.client.call(
                ServiceType.STATE_MANAGER,
                "/state/get",
                data={"session_id": context.session_id},
                context=context
            )

            # Execute step
            result = await self.client.call(
                ServiceType.EXECUTOR,
                "/execute",
                data={"task": step, "state": state},
                context=context
            )

            # Update state
            await self.client.call(
                ServiceType.STATE_MANAGER,
                "/state/update",
                data={
                    "session_id": context.session_id,
                    "update": result
                },
                context=context
            )

            results.append(result)

        return {"steps_completed": len(results), "results": results}
```

The microservices pattern separates concerns into independently deployable services. This enables teams to work on different components simultaneously and scale each service based on its specific load characteristics.
Modular Monolith Pattern
```python
from dataclasses import dataclass
from typing import Any, Optional, Protocol, runtime_checkable
import asyncio


@runtime_checkable
class Module(Protocol):
    """Protocol for architecture modules."""

    @property
    def name(self) -> str:
        ...

    async def initialize(self) -> None:
        ...

    async def shutdown(self) -> None:
        ...


@dataclass
class ModuleConfig:
    """Configuration for a module."""
    enabled: bool = True
    max_concurrent: int = 10
    timeout_seconds: float = 30.0


class ModularMonolith:
    """Modular monolith architecture."""

    def __init__(self):
        self._modules: dict[str, Module] = {}
        self._configs: dict[str, ModuleConfig] = {}
        self._initialized = False

    def register_module(
        self,
        module: Module,
        config: Optional[ModuleConfig] = None
    ) -> None:
        """Register a module."""
        self._modules[module.name] = module
        self._configs[module.name] = config or ModuleConfig()

    async def initialize(self) -> None:
        """Initialize all modules."""
        for name, module in self._modules.items():
            config = self._configs[name]
            if config.enabled:
                await module.initialize()
        self._initialized = True

    async def shutdown(self) -> None:
        """Shutdown all modules."""
        for module in self._modules.values():
            await module.shutdown()
        self._initialized = False

    def get_module(self, name: str) -> Optional[Module]:
        """Get a module by name."""
        return self._modules.get(name)


class GatewayModule:
    """Gateway module for request handling."""

    name = "gateway"

    def __init__(self, orchestrator: "OrchestratorModule"):
        self.orchestrator = orchestrator
        self._rate_limiters: dict[str, float] = {}

    async def initialize(self) -> None:
        pass

    async def shutdown(self) -> None:
        pass

    async def handle(
        self,
        request: dict[str, Any],
        context: RequestContext
    ) -> dict[str, Any]:
        """Handle incoming request."""
        # Rate limiting check
        user = context.user_id or "anonymous"
        last_request = self._rate_limiters.get(user, 0)
        now = asyncio.get_running_loop().time()

        if now - last_request < 0.1:  # 10 requests per second
            raise RuntimeError("Rate limit exceeded")

        self._rate_limiters[user] = now

        return await self.orchestrator.process(request, context)


class OrchestratorModule:
    """Orchestrator module for workflow management."""

    name = "orchestrator"

    def __init__(self, executor: "ExecutorModule"):
        self.executor = executor

    async def initialize(self) -> None:
        pass

    async def shutdown(self) -> None:
        pass

    async def process(
        self,
        request: dict[str, Any],
        context: RequestContext
    ) -> dict[str, Any]:
        """Process workflow request."""
        task = request.get("task", "")
        return await self.executor.execute(task, context)


class ExecutorModule:
    """Executor module for agent logic."""

    name = "executor"

    def __init__(self, llm_client: Any):
        self.llm_client = llm_client

    async def initialize(self) -> None:
        pass

    async def shutdown(self) -> None:
        pass

    async def execute(
        self,
        task: str,
        context: RequestContext
    ) -> dict[str, Any]:
        """Execute agent task."""
        # Call LLM
        response = await self.llm_client.complete(task)

        return {
            "task": task,
            "response": response,
            "request_id": context.request_id
        }


# Assembly
def create_modular_monolith(llm_client: Any) -> ModularMonolith:
    """Create and wire up the modular monolith."""
    app = ModularMonolith()

    # Create modules with dependencies
    executor = ExecutorModule(llm_client)
    orchestrator = OrchestratorModule(executor)
    gateway = GatewayModule(orchestrator)

    # Register all modules
    app.register_module(executor)
    app.register_module(orchestrator)
    app.register_module(gateway)

    return app
```

The modular monolith provides the organizational benefits of microservices while maintaining the operational simplicity of a single deployment. Clear module boundaries enable future extraction if needed.
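The wiring can be exercised end to end with a stubbed LLM client. The sketch below compresses the same gateway-to-orchestrator-to-executor chain into a few lines; `FakeLLM` and these compressed module classes are illustrative stand-ins, not the classes defined above.

```python
import asyncio

# Compressed sketch of the modular-monolith wiring with a stubbed LLM.
class FakeLLM:
    async def complete(self, task: str) -> str:
        return f"echo: {task}"

class Executor:
    def __init__(self, llm):
        self.llm = llm

    async def execute(self, task: str) -> dict:
        return {"response": await self.llm.complete(task)}

class Orchestrator:
    def __init__(self, executor):
        self.executor = executor

    async def process(self, request: dict) -> dict:
        return await self.executor.execute(request.get("task", ""))

class Gateway:
    def __init__(self, orchestrator):
        self.orchestrator = orchestrator

    async def handle(self, request: dict) -> dict:
        return await self.orchestrator.process(request)

# Wire dependencies inward: gateway -> orchestrator -> executor -> LLM.
gateway = Gateway(Orchestrator(Executor(FakeLLM())))
result = asyncio.run(gateway.handle({"task": "hello"}))
print(result)  # {'response': 'echo: hello'}
```

Because the modules only know their immediate dependency, swapping the in-process `Executor` for a remote service later means changing one constructor argument, not the call sites.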
State Management
Production agents require robust state management for conversation history, user preferences, and workflow progress. The choice of state management strategy affects scalability, consistency, and failure recovery.
Distributed State Management
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional, TypeVar, Generic
from datetime import datetime, timedelta
import json


T = TypeVar('T')


@dataclass
class StateEntry(Generic[T]):
    """Entry in the state store."""
    key: str
    value: T
    version: int
    created_at: datetime
    updated_at: datetime
    expires_at: Optional[datetime] = None
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def is_expired(self) -> bool:
        if self.expires_at is None:
            return False
        return datetime.utcnow() > self.expires_at


class StateStore(ABC):
    """Abstract state store interface."""

    @abstractmethod
    async def get(self, key: str) -> Optional[StateEntry]:
        """Get state entry by key."""
        pass

    @abstractmethod
    async def set(
        self,
        key: str,
        value: Any,
        ttl_seconds: Optional[int] = None
    ) -> StateEntry:
        """Set state entry."""
        pass

    @abstractmethod
    async def delete(self, key: str) -> bool:
        """Delete state entry."""
        pass

    @abstractmethod
    async def compare_and_set(
        self,
        key: str,
        expected_version: int,
        new_value: Any
    ) -> bool:
        """Atomic compare-and-set operation."""
        pass


class RedisStateStore(StateStore):
    """Redis-backed state store."""

    def __init__(self, redis_url: str):
        self.redis_url = redis_url
        self._pool = None

    async def _get_pool(self):
        if self._pool is None:
            import redis.asyncio as redis
            self._pool = redis.from_url(self.redis_url)
        return self._pool

    async def get(self, key: str) -> Optional[StateEntry]:
        pool = await self._get_pool()
        data = await pool.get(f"state:{key}")

        if data is None:
            return None

        entry_data = json.loads(data)
        entry = StateEntry(
            key=key,
            value=entry_data["value"],
            version=entry_data["version"],
            created_at=datetime.fromisoformat(entry_data["created_at"]),
            updated_at=datetime.fromisoformat(entry_data["updated_at"]),
            expires_at=datetime.fromisoformat(entry_data["expires_at"])
            if entry_data.get("expires_at") else None,
            metadata=entry_data.get("metadata", {})
        )

        if entry.is_expired:
            await self.delete(key)
            return None

        return entry

    async def set(
        self,
        key: str,
        value: Any,
        ttl_seconds: Optional[int] = None
    ) -> StateEntry:
        pool = await self._get_pool()
        now = datetime.utcnow()

        # Get current version
        existing = await self.get(key)
        version = (existing.version + 1) if existing else 1

        expires_at = None
        if ttl_seconds:
            expires_at = now + timedelta(seconds=ttl_seconds)

        entry = StateEntry(
            key=key,
            value=value,
            version=version,
            created_at=existing.created_at if existing else now,
            updated_at=now,
            expires_at=expires_at
        )

        entry_data = {
            "value": entry.value,
            "version": entry.version,
            "created_at": entry.created_at.isoformat(),
            "updated_at": entry.updated_at.isoformat(),
            "expires_at": entry.expires_at.isoformat() if entry.expires_at else None,
            "metadata": entry.metadata
        }

        if ttl_seconds:
            await pool.setex(
                f"state:{key}",
                ttl_seconds,
                json.dumps(entry_data)
            )
        else:
            await pool.set(f"state:{key}", json.dumps(entry_data))

        return entry

    async def delete(self, key: str) -> bool:
        pool = await self._get_pool()
        result = await pool.delete(f"state:{key}")
        return result > 0

    async def compare_and_set(
        self,
        key: str,
        expected_version: int,
        new_value: Any
    ) -> bool:
        pool = await self._get_pool()

        # Use Lua script for atomic operation
        script = """
        local data = redis.call('GET', KEYS[1])
        if not data then
            return 0
        end
        local entry = cjson.decode(data)
        if entry.version ~= tonumber(ARGV[1]) then
            return 0
        end
        entry.value = cjson.decode(ARGV[2])
        entry.version = entry.version + 1
        entry.updated_at = ARGV[3]
        redis.call('SET', KEYS[1], cjson.encode(entry))
        return 1
        """

        result = await pool.eval(
            script,
            1,
            f"state:{key}",
            str(expected_version),
            json.dumps(new_value),
            datetime.utcnow().isoformat()
        )

        return result == 1


class SessionStateManager:
    """Manages session state for agents."""

    def __init__(self, store: StateStore, session_ttl: int = 3600):
        self.store = store
        self.session_ttl = session_ttl

    def _session_key(self, session_id: str) -> str:
        return f"session:{session_id}"

    async def get_session(
        self,
        session_id: str
    ) -> Optional[dict[str, Any]]:
        """Get session data."""
        entry = await self.store.get(self._session_key(session_id))
        return entry.value if entry else None

    async def update_session(
        self,
        session_id: str,
        data: dict[str, Any]
    ) -> None:
        """Update session data."""
        await self.store.set(
            self._session_key(session_id),
            data,
            ttl_seconds=self.session_ttl
        )

    async def add_message(
        self,
        session_id: str,
        role: str,
        content: str
    ) -> None:
        """Add a message to the session history."""
        session = await self.get_session(session_id) or {
            "messages": [],
            "created_at": datetime.utcnow().isoformat()
        }

        session["messages"].append({
            "role": role,
            "content": content,
            "timestamp": datetime.utcnow().isoformat()
        })

        # Limit history size
        if len(session["messages"]) > 100:
            session["messages"] = session["messages"][-100:]

        await self.update_session(session_id, session)

    async def clear_session(self, session_id: str) -> None:
        """Clear a session."""
        await self.store.delete(self._session_key(session_id))
```

Distributed state management enables horizontal scaling while maintaining consistency. Redis provides fast access for session data, while the compare-and-set operation prevents race conditions in concurrent updates.
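The essence of compare-and-set is easiest to see without Redis in the picture. The sketch below implements the same optimistic-concurrency check over a plain dict; the `cas_set` helper and the example key are illustrative, not part of the `StateStore` interface above.

```python
# Optimistic concurrency with compare-and-set over a plain dict.
# A writer holding a stale version number loses the race and must
# re-read before retrying.
store: dict[str, tuple[int, object]] = {}  # key -> (version, value)

def cas_set(key: str, expected_version: int, value: object) -> bool:
    version, _ = store.get(key, (0, None))
    if version != expected_version:
        return False  # someone else updated first; caller must retry
    store[key] = (version + 1, value)
    return True

assert cas_set("session:42", 0, {"messages": []})      # first write succeeds
assert cas_set("session:42", 1, {"messages": ["hi"]})  # in-order update succeeds
assert not cas_set("session:42", 1, {"messages": []})  # stale version is rejected
print(store["session:42"])  # (2, {'messages': ['hi']})
```

In the Redis version the check and the write happen inside a single Lua script, which is what makes the operation atomic across concurrent clients.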
Communication Patterns
Agent systems use various communication patterns depending on latency requirements, reliability needs, and system complexity. Choosing the right pattern affects both performance and operational overhead.
Synchronous and Asynchronous Patterns
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Optional, Callable, Awaitable
from enum import Enum
import asyncio
import uuid


class CommunicationPattern(Enum):
    """Communication patterns for agent systems."""
    REQUEST_RESPONSE = "request_response"
    ASYNC_MESSAGE = "async_message"
    EVENT_DRIVEN = "event_driven"
    STREAMING = "streaming"


@dataclass
class Message:
    """Message for inter-service communication."""
    id: str
    type: str
    payload: dict[str, Any]
    correlation_id: Optional[str] = None
    reply_to: Optional[str] = None
    timestamp: Optional[str] = None


class MessageBroker(ABC):
    """Abstract message broker interface."""

    @abstractmethod
    async def publish(
        self,
        topic: str,
        message: Message
    ) -> None:
        """Publish a message to a topic."""
        pass

    @abstractmethod
    async def subscribe(
        self,
        topic: str,
        handler: Callable[[Message], Awaitable[None]]
    ) -> None:
        """Subscribe to a topic."""
        pass


class InMemoryBroker(MessageBroker):
    """In-memory message broker for development."""

    def __init__(self):
        self._handlers: dict[str, list[Callable]] = {}
        self._queue: asyncio.Queue = asyncio.Queue()
        self._running = False

    async def publish(
        self,
        topic: str,
        message: Message
    ) -> None:
        await self._queue.put((topic, message))

    async def subscribe(
        self,
        topic: str,
        handler: Callable[[Message], Awaitable[None]]
    ) -> None:
        if topic not in self._handlers:
            self._handlers[topic] = []
        self._handlers[topic].append(handler)

    async def start(self) -> None:
        """Start processing messages."""
        self._running = True
        while self._running:
            try:
                topic, message = await asyncio.wait_for(
                    self._queue.get(),
                    timeout=1.0
                )
                handlers = self._handlers.get(topic, [])
                for handler in handlers:
                    asyncio.create_task(handler(message))
            except asyncio.TimeoutError:
                continue

    async def stop(self) -> None:
        self._running = False


class RequestResponsePattern:
    """Request-response communication pattern."""

    def __init__(self, timeout: float = 30.0):
        self.timeout = timeout
        self._pending: dict[str, asyncio.Future] = {}
        self._broker: Optional[MessageBroker] = None

    async def setup(self, broker: MessageBroker) -> None:
        """Setup the pattern with a broker."""
        self._broker = broker
        await broker.subscribe("responses", self._handle_response)

    async def _handle_response(self, message: Message) -> None:
        """Handle incoming response."""
        correlation_id = message.correlation_id
        if correlation_id and correlation_id in self._pending:
            future = self._pending.pop(correlation_id)
            future.set_result(message.payload)

    async def request(
        self,
        topic: str,
        payload: dict[str, Any]
    ) -> dict[str, Any]:
        """Send request and wait for response."""
        correlation_id = str(uuid.uuid4())
        message = Message(
            id=str(uuid.uuid4()),
            type="request",
            payload=payload,
            correlation_id=correlation_id,
            reply_to="responses"
        )

        future: asyncio.Future = asyncio.Future()
        self._pending[correlation_id] = future

        await self._broker.publish(topic, message)

        try:
            return await asyncio.wait_for(future, timeout=self.timeout)
        except asyncio.TimeoutError:
            self._pending.pop(correlation_id, None)
            raise TimeoutError(f"Request {correlation_id} timed out")


class EventDrivenPattern:
    """Event-driven communication pattern."""

    def __init__(self):
        self._broker: Optional[MessageBroker] = None
        self._event_handlers: dict[str, list[Callable]] = {}

    async def setup(self, broker: MessageBroker) -> None:
        self._broker = broker

    def on_event(
        self,
        event_type: str
    ) -> Callable:
        """Decorator to register event handler."""
        def decorator(func: Callable) -> Callable:
            if event_type not in self._event_handlers:
                self._event_handlers[event_type] = []
            self._event_handlers[event_type].append(func)
            return func
        return decorator

    async def emit(
        self,
        event_type: str,
        data: dict[str, Any]
    ) -> None:
        """Emit an event."""
        message = Message(
            id=str(uuid.uuid4()),
            type=event_type,
            payload=data
        )

        await self._broker.publish(f"events.{event_type}", message)

    async def process_event(self, message: Message) -> None:
        """Process an incoming event."""
        handlers = self._event_handlers.get(message.type, [])
        for handler in handlers:
            await handler(message.payload)


class StreamingPattern:
    """Streaming communication pattern for real-time updates."""

    def __init__(self):
        self._streams: dict[str, list[asyncio.Queue]] = {}

    def create_stream(self, stream_id: str) -> asyncio.Queue:
        """Create a new stream."""
        queue: asyncio.Queue = asyncio.Queue()
        if stream_id not in self._streams:
            self._streams[stream_id] = []
        self._streams[stream_id].append(queue)
        return queue

    async def publish_to_stream(
        self,
        stream_id: str,
        data: Any
    ) -> None:
        """Publish data to a stream."""
        queues = self._streams.get(stream_id, [])
        for queue in queues:
            await queue.put(data)

    async def close_stream(self, stream_id: str) -> None:
        """Close a stream."""
        queues = self._streams.pop(stream_id, [])
        for queue in queues:
            await queue.put(None)  # Signal end of stream

    async def stream_generator(
        self,
        stream_id: str
    ):
        """Async generator for consuming stream."""
        queue = self.create_stream(stream_id)

        try:
            while True:
                data = await queue.get()
                if data is None:
                    break
                yield data
        finally:
            # Cleanup
            if stream_id in self._streams:
                self._streams[stream_id].remove(queue)
```

Different patterns suit different use cases: request-response for simple synchronous calls, event-driven for decoupled processing, and streaming for real-time token delivery. Production systems often combine multiple patterns.
| Pattern | Use Case | Latency | Coupling |
|---|---|---|---|
| Request-Response | Simple queries, tool calls | Low | High |
| Async Message | Background processing | Variable | Low |
| Event-Driven | Notifications, updates | Variable | Very Low |
| Streaming | Token streaming, progress | Very Low | Medium |
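The streaming row of the table maps directly onto queues with an end-of-stream sentinel. The sketch below shows the core of the streaming idea with a single producer and consumer; the function names and the example tokens are illustrative.

```python
import asyncio

# Token streaming over an asyncio.Queue: the producer pushes tokens and a
# None sentinel; the consumer yields tokens until the sentinel arrives.
async def produce(queue: asyncio.Queue) -> None:
    for token in ["Hello", ", ", "world"]:
        await queue.put(token)
    await queue.put(None)  # end-of-stream marker

async def consume(queue: asyncio.Queue):
    while True:
        token = await queue.get()
        if token is None:
            break
        yield token

async def main() -> str:
    queue: asyncio.Queue = asyncio.Queue()
    asyncio.create_task(produce(queue))  # producer runs concurrently
    return "".join([tok async for tok in consume(queue)])

text = asyncio.run(main())
print(text)  # Hello, world
```

In a real deployment the producer is the LLM call emitting tokens and the consumer is a server-sent-events or WebSocket handler, but the sentinel-terminated queue is the same.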
Infrastructure Components
Production architectures require supporting infrastructure for load balancing, caching, rate limiting, and health monitoring. These components ensure reliable operation under varying conditions.
Essential Infrastructure
```python
from dataclasses import dataclass
from typing import Any, Optional
from collections import defaultdict
import asyncio
import hashlib
import json
import time


@dataclass
class RateLimitConfig:
    """Rate limit configuration."""
    requests_per_second: float
    burst_size: int


class TokenBucketRateLimiter:
    """Token bucket rate limiter."""

    def __init__(self, config: RateLimitConfig):
        self.rate = config.requests_per_second
        self.burst = config.burst_size
        self._buckets: dict[str, tuple[float, float]] = {}  # tokens, last_update

    async def allow(self, key: str) -> bool:
        """Check if request is allowed."""
        now = time.monotonic()

        if key not in self._buckets:
            self._buckets[key] = (self.burst - 1, now)
            return True

        tokens, last_update = self._buckets[key]

        # Add tokens based on time passed
        elapsed = now - last_update
        tokens = min(self.burst, tokens + elapsed * self.rate)

        if tokens >= 1:
            self._buckets[key] = (tokens - 1, now)
            return True

        return False

    async def wait_for_token(self, key: str) -> None:
        """Wait until a token is available."""
        while not await self.allow(key):
            await asyncio.sleep(1 / self.rate)


class CircuitBreaker:
    """Circuit breaker for fault tolerance."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_requests: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests

        self._failures: dict[str, int] = defaultdict(int)
        self._last_failure: dict[str, float] = {}
        self._state: dict[str, str] = defaultdict(lambda: "closed")
        self._half_open_count: dict[str, int] = defaultdict(int)

    def is_open(self, key: str) -> bool:
        """Check if circuit is open."""
        state = self._state[key]

        if state == "closed":
            return False

        if state == "open":
            # Check if recovery timeout passed
            last = self._last_failure.get(key, 0)
            if time.monotonic() - last > self.recovery_timeout:
                self._state[key] = "half_open"
                self._half_open_count[key] = 0
                return False
            return True

        # Half open - allow limited requests
        if self._half_open_count[key] < self.half_open_requests:
            return False
        return True

    def record_success(self, key: str) -> None:
        """Record a successful call."""
        if self._state[key] == "half_open":
            self._half_open_count[key] += 1
            if self._half_open_count[key] >= self.half_open_requests:
                # Recovered
                self._state[key] = "closed"
                self._failures[key] = 0

    def record_failure(self, key: str) -> None:
        """Record a failed call."""
        self._failures[key] += 1
        self._last_failure[key] = time.monotonic()

        if self._state[key] == "half_open":
            # Back to open
            self._state[key] = "open"
        elif self._failures[key] >= self.failure_threshold:
            self._state[key] = "open"


class CacheLayer:
    """Caching layer for agent responses."""

    def __init__(
        self,
        max_size: int = 1000,
        default_ttl: int = 300
    ):
        self.max_size = max_size
        self.default_ttl = default_ttl
        self._cache: dict[str, tuple[Any, float]] = {}

    def _cache_key(self, request: dict[str, Any]) -> str:
        """Generate cache key from request."""
        content = json.dumps(request, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    async def get(self, request: dict[str, Any]) -> Optional[Any]:
        """Get cached response."""
        key = self._cache_key(request)

        if key not in self._cache:
            return None

        value, expires_at = self._cache[key]

        if time.monotonic() > expires_at:
            del self._cache[key]
            return None

        return value

    async def set(
        self,
        request: dict[str, Any],
        response: Any,
        ttl: Optional[int] = None
    ) -> None:
        """Cache a response."""
        # Evict if at capacity
        if len(self._cache) >= self.max_size:
            # Remove oldest entries
            sorted_items = sorted(
                self._cache.items(),
                key=lambda x: x[1][1]
            )
            for key, _ in sorted_items[:len(sorted_items) // 4]:
                del self._cache[key]

        key = self._cache_key(request)
        ttl = ttl or self.default_ttl
        expires_at = time.monotonic() + ttl
        self._cache[key] = (response, expires_at)


class HealthChecker:
    """Health checker for service components."""
```
169
170 def __init__(self):
171 self._checks: dict[str, callable] = {}
172
173 def register_check(
174 self,
175 name: str,
176 check: callable
177 ) -> None:
178 """Register a health check."""
179 self._checks[name] = check
180
181 async def check_all(self) -> dict[str, Any]:
182 """Run all health checks."""
183 results = {}
184
185 for name, check in self._checks.items():
186 try:
187 if asyncio.iscoroutinefunction(check):
188 result = await check()
189 else:
190 result = check()
191 results[name] = {"status": "healthy", "details": result}
192 except Exception as e:
193 results[name] = {"status": "unhealthy", "error": str(e)}
194
195 overall = "healthy" if all(
196 r["status"] == "healthy" for r in results.values()
197 ) else "unhealthy"
198
199 return {
200 "status": overall,
201 "checks": results,
202 "timestamp": time.time()
203 }These infrastructure components work together to provide reliable service: rate limiters prevent overload, circuit breakers contain failures, caches reduce latency, and health checks enable monitoring and automated recovery.
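To see how these components compose in a request path, here is a minimal, self-contained sketch. `SimpleLimiter` and `handle_request` are hypothetical, trimmed stand-ins for the classes above (single key, no circuit breaker) that show the ordering: cheap checks first, the expensive agent call last.

```python
import asyncio
import time


class SimpleLimiter:
    """Single-key token bucket, trimmed from the rate limiter pattern above."""

    def __init__(self, rate: float, burst: int):
        self.rate = rate
        self.burst = burst
        self._tokens = float(burst)
        self._last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill based on elapsed time, then spend one token if available
        self._tokens = min(self.burst, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False


async def handle_request(limiter: SimpleLimiter, cache: dict, key: str, compute):
    """Cheap checks first: rate limit, then cache, then the expensive call."""
    if not limiter.allow():
        return {"status": 429, "cached": False}
    if key in cache:
        return {"status": 200, "cached": True, "body": cache[key]}
    body = await compute()
    cache[key] = body
    return {"status": 200, "cached": False, "body": body}


async def main():
    limiter = SimpleLimiter(rate=10.0, burst=2)
    cache: dict = {}

    async def compute():
        return "agent response"

    first = await handle_request(limiter, cache, "q1", compute)
    second = await handle_request(limiter, cache, "q1", compute)
    print(first["cached"], second["cached"])  # miss on the first call, hit on the second


asyncio.run(main())
```

The production versions above add what this sketch omits: per-key buckets, TTL-based expiry, and bounded cache size.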
Summary
Production architecture for agent systems requires careful consideration of layered designs, service patterns, state management, and communication strategies. The patterns introduced in this section provide a foundation for building scalable, reliable agent deployments.
Key Takeaways
- Layered Architecture - Separate gateway, orchestration, execution, integration, and persistence layers for independent scaling
- Service Patterns - Choose between microservices for flexibility or modular monolith for operational simplicity
- State Management - Use distributed state stores with proper TTL and atomic operations for consistency
- Communication Patterns - Match patterns to requirements: request-response for synchronous, events for decoupled, streaming for real-time
- Infrastructure Components - Rate limiting, circuit breakers, caching, and health checks are essential for production
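The circuit-breaker takeaway translates into a caller-side guard pattern. The sketch below is a self-contained, single-key simplification of the `CircuitBreaker` above; `MiniBreaker` and `call_with_breaker` are hypothetical names, not part of the section's API.

```python
import time


class MiniBreaker:
    """Trimmed single-key version of the circuit breaker pattern above."""

    def __init__(self, threshold: int = 3, recovery: float = 30.0):
        self.threshold = threshold
        self.recovery = recovery
        self.failures = 0
        self.state = "closed"
        self.last_failure = 0.0

    def is_open(self) -> bool:
        if self.state == "open" and time.monotonic() - self.last_failure > self.recovery:
            self.state = "half_open"
        return self.state == "open"

    def record(self, ok: bool) -> None:
        if ok:
            self.failures = 0
            self.state = "closed"
            return
        self.failures += 1
        self.last_failure = time.monotonic()
        if self.state == "half_open" or self.failures >= self.threshold:
            self.state = "open"


def call_with_breaker(breaker: MiniBreaker, fn, fallback="fallback"):
    """Fail fast while the breaker is open instead of hammering the dependency."""
    if breaker.is_open():
        return fallback
    try:
        result = fn()
        breaker.record(True)
        return result
    except Exception:
        breaker.record(False)
        return fallback


def flaky():
    raise RuntimeError("downstream unavailable")


breaker = MiniBreaker(threshold=3, recovery=60.0)
results = [call_with_breaker(breaker, flaky) for _ in range(5)]
print(breaker.state, results.count("fallback"))  # opens after 3 failures; all 5 fall back
```

The key design point is that calls four and five return the fallback without ever invoking the failing dependency, which is what gives the downstream service room to recover.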
Next Steps: The next section covers scaling agent systems to handle increased load through horizontal scaling, load balancing, and resource optimization strategies.