Chapter 21

Production Architecture Patterns

Production Deployment

Introduction

Moving agentic AI systems from prototype to production requires careful architectural planning. Production systems must handle concurrent users, maintain reliability under load, and provide consistent performance. This section explores proven architecture patterns for deploying agent systems at scale.

Learning Objectives: By the end of this section, you will understand core architectural patterns for production agents, learn how to design services that scale horizontally, implement proper state management strategies, and choose appropriate communication patterns for your use case.

The transition from development to production often reveals challenges that weren't apparent during prototyping. Latency requirements become strict, resource utilization matters, and failure scenarios that seemed unlikely become inevitable. A well-designed architecture addresses these concerns upfront.


Architectural Foundations

Production agent architectures typically follow layered designs that separate concerns and enable independent scaling. Understanding these layers helps you make informed decisions about where to place functionality and how components interact.

Core Architecture Layers

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional, Generic, TypeVar
from enum import Enum
import asyncio
from datetime import datetime
import uuid


T = TypeVar('T')
R = TypeVar('R')


class ArchitectureLayer(Enum):
    """Layers in the agent architecture."""
    GATEWAY = "gateway"          # API and request routing
    ORCHESTRATION = "orchestration"  # Workflow coordination
    EXECUTION = "execution"      # Agent logic execution
    INTEGRATION = "integration"  # External service calls
    PERSISTENCE = "persistence"  # State and data storage


@dataclass
class RequestContext:
    """Context for a request flowing through the system."""
    request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    tenant_id: Optional[str] = None
    created_at: datetime = field(default_factory=datetime.utcnow)
    metadata: dict[str, Any] = field(default_factory=dict)
    trace_id: Optional[str] = None
    span_id: Optional[str] = None

    def child_context(self) -> "RequestContext":
        """Create a child context for nested operations."""
        return RequestContext(
            request_id=self.request_id,
            user_id=self.user_id,
            session_id=self.session_id,
            tenant_id=self.tenant_id,
            trace_id=self.trace_id,
            span_id=str(uuid.uuid4()),
            metadata=dict(self.metadata)
        )


class LayerComponent(ABC, Generic[T, R]):
    """Abstract component in an architecture layer."""

    def __init__(self, layer: ArchitectureLayer, name: str):
        self.layer = layer
        self.name = name

    @abstractmethod
    async def process(
        self,
        request: T,
        context: RequestContext
    ) -> R:
        """Process request and return result."""
        pass


@dataclass
class SystemConfig:
    """Production system configuration."""
    # Service configuration
    service_name: str = "agent-service"
    environment: str = "production"
    version: str = "1.0.0"

    # Resource limits
    max_concurrent_requests: int = 100
    request_timeout_seconds: float = 30.0
    max_retries: int = 3

    # Feature flags
    enable_tracing: bool = True
    enable_metrics: bool = True
    enable_circuit_breaker: bool = True

    # Rate limiting
    rate_limit_requests_per_second: float = 100.0
    rate_limit_burst: int = 200


class ProductionArchitecture:
    """Production architecture orchestrator."""

    def __init__(self, config: SystemConfig):
        self.config = config
        self.layers: dict[ArchitectureLayer, list[LayerComponent]] = {
            layer: [] for layer in ArchitectureLayer
        }
        self._semaphore = asyncio.Semaphore(config.max_concurrent_requests)

    def register_component(
        self,
        component: LayerComponent
    ) -> None:
        """Register a component in its layer."""
        self.layers[component.layer].append(component)

    async def handle_request(
        self,
        request: Any,
        context: Optional[RequestContext] = None
    ) -> Any:
        """Handle a request through all layers."""
        context = context or RequestContext()

        async with self._semaphore:
            try:
                result = request

                # Process through layers in order
                for layer in ArchitectureLayer:
                    for component in self.layers[layer]:
                        result = await asyncio.wait_for(
                            component.process(result, context),
                            timeout=self.config.request_timeout_seconds
                        )

                return result

            except asyncio.TimeoutError:
                raise TimeoutError(
                    f"Request {context.request_id} timed out"
                )

            except Exception as e:
                # Log and re-raise with context
                raise RuntimeError(
                    f"Request {context.request_id} failed: {e}"
                ) from e
```

The layered architecture separates gateway logic (authentication, routing), orchestration (workflow management), execution (agent logic), integration (external calls), and persistence (state storage). Each layer can scale independently based on its specific resource requirements.

| Layer | Responsibility | Scaling Pattern |
| --- | --- | --- |
| Gateway | Authentication, routing, rate limiting | Horizontal with load balancer |
| Orchestration | Workflow coordination, state machines | Horizontal with partitioning |
| Execution | Agent logic, LLM calls | Horizontal with queue-based distribution |
| Integration | External API calls, tools | Horizontal with connection pooling |
| Persistence | State storage, caching | Vertical + read replicas |
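
The back-pressure behavior of `handle_request` comes from combining two primitives: an `asyncio.Semaphore` caps the number of in-flight requests, and `asyncio.wait_for` bounds each component call. The mechanism can be seen in isolation in this standalone sketch (the cap of 2 and the `work` coroutine are illustrative, not from the chapter's code):

```python
import asyncio


async def limited_call(sem: asyncio.Semaphore, coro_fn, timeout: float):
    """Run coro_fn under a concurrency cap with a per-call timeout."""
    async with sem:
        return await asyncio.wait_for(coro_fn(), timeout=timeout)


async def main() -> tuple[int, int]:
    sem = asyncio.Semaphore(2)  # at most 2 concurrent calls
    active = peak = 0

    async def work() -> str:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)      # record observed concurrency
        await asyncio.sleep(0.05)     # simulate an LLM or tool call
        active -= 1
        return "ok"

    results = await asyncio.gather(
        *[limited_call(sem, work, timeout=1.0) for _ in range(6)]
    )
    return peak, len(results)


peak, n = asyncio.run(main())
```

Even with six requests submitted at once, the observed peak concurrency stays at the semaphore limit, while slow calls still fail fast via the timeout.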

Service Patterns

Several service patterns work well for agent systems. The choice depends on your scalability requirements, team structure, and operational capabilities.

Microservices Agent Architecture

```python
from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum
import httpx
import asyncio


class ServiceType(Enum):
    """Types of agent services."""
    GATEWAY = "gateway"
    ORCHESTRATOR = "orchestrator"
    EXECUTOR = "executor"
    TOOL_RUNNER = "tool_runner"
    STATE_MANAGER = "state_manager"


@dataclass
class ServiceEndpoint:
    """Service endpoint configuration."""
    service_type: ServiceType
    host: str
    port: int
    health_path: str = "/health"
    timeout_seconds: float = 30.0

    @property
    def base_url(self) -> str:
        return f"http://{self.host}:{self.port}"


class ServiceRegistry:
    """Registry for service discovery."""

    def __init__(self):
        self._services: dict[ServiceType, list[ServiceEndpoint]] = {}
        self._health_status: dict[str, bool] = {}

    def register(self, endpoint: ServiceEndpoint) -> None:
        """Register a service endpoint."""
        if endpoint.service_type not in self._services:
            self._services[endpoint.service_type] = []
        self._services[endpoint.service_type].append(endpoint)
        self._health_status[endpoint.base_url] = True

    def get_endpoint(
        self,
        service_type: ServiceType
    ) -> Optional[ServiceEndpoint]:
        """Get a healthy endpoint for a service type."""
        endpoints = self._services.get(service_type, [])
        healthy = [
            ep for ep in endpoints
            if self._health_status.get(ep.base_url, False)
        ]

        if not healthy:
            return None

        # Simple round-robin (in production use proper load balancing)
        return healthy[hash(str(asyncio.current_task())) % len(healthy)]

    async def health_check(self) -> dict[str, bool]:
        """Check health of all registered services."""
        async with httpx.AsyncClient() as client:
            for service_type, endpoints in self._services.items():
                for endpoint in endpoints:
                    try:
                        response = await client.get(
                            f"{endpoint.base_url}{endpoint.health_path}",
                            timeout=5.0
                        )
                        self._health_status[endpoint.base_url] = (
                            response.status_code == 200
                        )
                    except Exception:
                        self._health_status[endpoint.base_url] = False

        return dict(self._health_status)


class ServiceClient:
    """Client for calling other services."""

    def __init__(
        self,
        registry: ServiceRegistry,
        timeout: float = 30.0
    ):
        self.registry = registry
        self.timeout = timeout
        self._client = httpx.AsyncClient(timeout=timeout)

    async def call(
        self,
        service_type: ServiceType,
        path: str,
        method: str = "POST",
        data: Optional[dict[str, Any]] = None,
        context: Optional[RequestContext] = None
    ) -> dict[str, Any]:
        """Call a service endpoint."""
        endpoint = self.registry.get_endpoint(service_type)

        if not endpoint:
            raise RuntimeError(f"No healthy {service_type.value} service")

        headers = {}
        if context:
            headers["X-Request-ID"] = context.request_id
            if context.trace_id:
                headers["X-Trace-ID"] = context.trace_id
            if context.span_id:
                headers["X-Span-ID"] = context.span_id

        url = f"{endpoint.base_url}{path}"

        response = await self._client.request(
            method=method,
            url=url,
            json=data,
            headers=headers
        )
        response.raise_for_status()

        return response.json()

    async def close(self) -> None:
        """Close the HTTP client."""
        await self._client.aclose()


# Service implementations
class GatewayService:
    """API Gateway service."""

    def __init__(
        self,
        service_client: ServiceClient,
        rate_limiter: "RateLimiter"
    ):
        self.client = service_client
        self.rate_limiter = rate_limiter

    async def handle_request(
        self,
        request: dict[str, Any],
        context: RequestContext
    ) -> dict[str, Any]:
        """Handle incoming request."""
        # Check rate limit
        if not await self.rate_limiter.allow(context.user_id or "anonymous"):
            raise RuntimeError("Rate limit exceeded")

        # Forward to orchestrator
        return await self.client.call(
            ServiceType.ORCHESTRATOR,
            "/process",
            data=request,
            context=context
        )


class OrchestratorService:
    """Workflow orchestration service."""

    def __init__(self, service_client: ServiceClient):
        self.client = service_client

    async def process(
        self,
        request: dict[str, Any],
        context: RequestContext
    ) -> dict[str, Any]:
        """Process a workflow request."""
        workflow_type = request.get("workflow_type", "simple")

        if workflow_type == "simple":
            return await self._simple_workflow(request, context)
        elif workflow_type == "multi_step":
            return await self._multi_step_workflow(request, context)
        else:
            raise ValueError(f"Unknown workflow: {workflow_type}")

    async def _simple_workflow(
        self,
        request: dict[str, Any],
        context: RequestContext
    ) -> dict[str, Any]:
        """Execute simple single-step workflow."""
        return await self.client.call(
            ServiceType.EXECUTOR,
            "/execute",
            data={"task": request.get("task")},
            context=context
        )

    async def _multi_step_workflow(
        self,
        request: dict[str, Any],
        context: RequestContext
    ) -> dict[str, Any]:
        """Execute multi-step workflow."""
        steps = request.get("steps", [])
        results = []

        for step in steps:
            # Get current state
            state = await self.client.call(
                ServiceType.STATE_MANAGER,
                "/state/get",
                data={"session_id": context.session_id},
                context=context
            )

            # Execute step
            result = await self.client.call(
                ServiceType.EXECUTOR,
                "/execute",
                data={"task": step, "state": state},
                context=context
            )

            # Update state
            await self.client.call(
                ServiceType.STATE_MANAGER,
                "/state/update",
                data={
                    "session_id": context.session_id,
                    "update": result
                },
                context=context
            )

            results.append(result)

        return {"steps_completed": len(results), "results": results}
```

The microservices pattern separates concerns into independently deployable services. This enables teams to work on different components simultaneously and scale each service based on its specific load characteristics.
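
GatewayService above depends on a `RateLimiter` that the listing never defines; all it assumes is an `await rate_limiter.allow(key)` coroutine returning a bool. One common shape is a per-key token bucket, matching the rate/burst knobs in SystemConfig. This is a minimal sketch under those assumptions, not the chapter's canonical implementation:

```python
import asyncio
import time


class RateLimiter:
    """Per-key token bucket: `rate` tokens/second, up to `burst` stored."""

    def __init__(self, rate: float = 100.0, burst: int = 200):
        self.rate = rate
        self.burst = burst
        # key -> (available tokens, last refill timestamp)
        self._buckets: dict[str, tuple[float, float]] = {}
        self._lock = asyncio.Lock()

    async def allow(self, key: str) -> bool:
        async with self._lock:
            now = time.monotonic()
            tokens, last = self._buckets.get(key, (float(self.burst), now))
            # Refill proportionally to elapsed time, capped at burst
            tokens = min(float(self.burst), tokens + (now - last) * self.rate)
            if tokens >= 1.0:
                self._buckets[key] = (tokens - 1.0, now)
                return True
            self._buckets[key] = (tokens, now)
            return False


async def demo() -> tuple[int, bool]:
    limiter = RateLimiter(rate=10.0, burst=3)
    # Five back-to-back requests from one user: only the burst gets through
    allowed = sum([await limiter.allow("user-1") for _ in range(5)])
    # A different user has an independent bucket
    other_ok = await limiter.allow("user-2")
    return allowed, other_ok


allowed, other_ok = asyncio.run(demo())
```

The lock keeps refill-and-consume atomic within the process; in a multi-instance deployment the bucket state would live in shared storage such as Redis instead.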

Modular Monolith Pattern

```python
from dataclasses import dataclass
from typing import Any, Optional, Protocol, runtime_checkable
import asyncio


@runtime_checkable
class Module(Protocol):
    """Protocol for architecture modules."""

    @property
    def name(self) -> str:
        ...

    async def initialize(self) -> None:
        ...

    async def shutdown(self) -> None:
        ...


@dataclass
class ModuleConfig:
    """Configuration for a module."""
    enabled: bool = True
    max_concurrent: int = 10
    timeout_seconds: float = 30.0


class ModularMonolith:
    """Modular monolith architecture."""

    def __init__(self):
        self._modules: dict[str, Module] = {}
        self._configs: dict[str, ModuleConfig] = {}
        self._initialized = False

    def register_module(
        self,
        module: Module,
        config: Optional[ModuleConfig] = None
    ) -> None:
        """Register a module."""
        self._modules[module.name] = module
        self._configs[module.name] = config or ModuleConfig()

    async def initialize(self) -> None:
        """Initialize all modules."""
        for name, module in self._modules.items():
            config = self._configs[name]
            if config.enabled:
                await module.initialize()
        self._initialized = True

    async def shutdown(self) -> None:
        """Shutdown all modules."""
        for module in self._modules.values():
            await module.shutdown()
        self._initialized = False

    def get_module(self, name: str) -> Optional[Module]:
        """Get a module by name."""
        return self._modules.get(name)


class GatewayModule:
    """Gateway module for request handling."""

    name = "gateway"

    def __init__(self, orchestrator: "OrchestratorModule"):
        self.orchestrator = orchestrator
        self._rate_limiters: dict[str, float] = {}

    async def initialize(self) -> None:
        pass

    async def shutdown(self) -> None:
        pass

    async def handle(
        self,
        request: dict[str, Any],
        context: RequestContext
    ) -> dict[str, Any]:
        """Handle incoming request."""
        # Rate limiting check
        user = context.user_id or "anonymous"
        last_request = self._rate_limiters.get(user, 0.0)
        now = asyncio.get_running_loop().time()

        if now - last_request < 0.1:  # 10 requests per second
            raise RuntimeError("Rate limit exceeded")

        self._rate_limiters[user] = now

        return await self.orchestrator.process(request, context)


class OrchestratorModule:
    """Orchestrator module for workflow management."""

    name = "orchestrator"

    def __init__(self, executor: "ExecutorModule"):
        self.executor = executor

    async def initialize(self) -> None:
        pass

    async def shutdown(self) -> None:
        pass

    async def process(
        self,
        request: dict[str, Any],
        context: RequestContext
    ) -> dict[str, Any]:
        """Process workflow request."""
        task = request.get("task", "")
        return await self.executor.execute(task, context)


class ExecutorModule:
    """Executor module for agent logic."""

    name = "executor"

    def __init__(self, llm_client: Any):
        self.llm_client = llm_client

    async def initialize(self) -> None:
        pass

    async def shutdown(self) -> None:
        pass

    async def execute(
        self,
        task: str,
        context: RequestContext
    ) -> dict[str, Any]:
        """Execute agent task."""
        # Call LLM
        response = await self.llm_client.complete(task)

        return {
            "task": task,
            "response": response,
            "request_id": context.request_id
        }


# Assembly
def create_modular_monolith(llm_client: Any) -> ModularMonolith:
    """Create and wire up the modular monolith."""
    app = ModularMonolith()

    # Create modules with dependencies
    executor = ExecutorModule(llm_client)
    orchestrator = OrchestratorModule(executor)
    gateway = GatewayModule(orchestrator)

    # Register all modules
    app.register_module(executor)
    app.register_module(orchestrator)
    app.register_module(gateway)

    return app

The modular monolith provides the organizational benefits of microservices while maintaining the operational simplicity of a single deployment. Clear module boundaries enable future extraction if needed.
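
One lifecycle detail worth making explicit: `create_modular_monolith` registers dependencies before their dependents, so initialization order is safe, but a shutdown that iterates in the same order stops the executor while the gateway might still be routing to it. Tearing down in reverse registration order avoids that window. A standalone sketch of the idea (the `TrackedModule` class and module names here are illustrative):

```python
import asyncio


class TrackedModule:
    """Minimal module that records its lifecycle events in a shared log."""

    def __init__(self, name: str, log: list[str]):
        self.name = name
        self.log = log

    async def initialize(self) -> None:
        self.log.append(f"init:{self.name}")

    async def shutdown(self) -> None:
        self.log.append(f"stop:{self.name}")


async def run_lifecycle(modules: list[TrackedModule]) -> None:
    for m in modules:            # dependencies come up first
        await m.initialize()
    for m in reversed(modules):  # dependents are torn down first
        await m.shutdown()


log: list[str] = []
mods = [TrackedModule(n, log) for n in ("executor", "orchestrator", "gateway")]
asyncio.run(run_lifecycle(mods))
```

After the run, the log shows the executor initialized first and shut down last, so no module ever calls into an already-stopped dependency.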


State Management

Production agents require robust state management for conversation history, user preferences, and workflow progress. The choice of state management strategy affects scalability, consistency, and failure recovery.

Distributed State Management

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional, TypeVar, Generic
from datetime import datetime, timedelta
import json


T = TypeVar('T')


@dataclass
class StateEntry(Generic[T]):
    """Entry in the state store."""
    key: str
    value: T
    version: int
    created_at: datetime
    updated_at: datetime
    expires_at: Optional[datetime] = None
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def is_expired(self) -> bool:
        if self.expires_at is None:
            return False
        return datetime.utcnow() > self.expires_at


class StateStore(ABC):
    """Abstract state store interface."""

    @abstractmethod
    async def get(self, key: str) -> Optional[StateEntry]:
        """Get state entry by key."""
        pass

    @abstractmethod
    async def set(
        self,
        key: str,
        value: Any,
        ttl_seconds: Optional[int] = None
    ) -> StateEntry:
        """Set state entry."""
        pass

    @abstractmethod
    async def delete(self, key: str) -> bool:
        """Delete state entry."""
        pass

    @abstractmethod
    async def compare_and_set(
        self,
        key: str,
        expected_version: int,
        new_value: Any
    ) -> bool:
        """Atomic compare-and-set operation."""
        pass


class RedisStateStore(StateStore):
    """Redis-backed state store."""

    def __init__(self, redis_url: str):
        self.redis_url = redis_url
        self._pool = None

    async def _get_pool(self):
        if self._pool is None:
            import redis.asyncio as redis
            self._pool = redis.from_url(self.redis_url)
        return self._pool

    async def get(self, key: str) -> Optional[StateEntry]:
        pool = await self._get_pool()
        data = await pool.get(f"state:{key}")

        if data is None:
            return None

        entry_data = json.loads(data)
        entry = StateEntry(
            key=key,
            value=entry_data["value"],
            version=entry_data["version"],
            created_at=datetime.fromisoformat(entry_data["created_at"]),
            updated_at=datetime.fromisoformat(entry_data["updated_at"]),
            expires_at=datetime.fromisoformat(entry_data["expires_at"])
            if entry_data.get("expires_at") else None,
            metadata=entry_data.get("metadata", {})
        )

        if entry.is_expired:
            await self.delete(key)
            return None

        return entry

    async def set(
        self,
        key: str,
        value: Any,
        ttl_seconds: Optional[int] = None
    ) -> StateEntry:
        pool = await self._get_pool()
        now = datetime.utcnow()

        # Get current version
        existing = await self.get(key)
        version = (existing.version + 1) if existing else 1

        expires_at = None
        if ttl_seconds:
            expires_at = now + timedelta(seconds=ttl_seconds)

        entry = StateEntry(
            key=key,
            value=value,
            version=version,
            created_at=existing.created_at if existing else now,
            updated_at=now,
            expires_at=expires_at
        )

        entry_data = {
            "value": entry.value,
            "version": entry.version,
            "created_at": entry.created_at.isoformat(),
            "updated_at": entry.updated_at.isoformat(),
            "expires_at": entry.expires_at.isoformat() if entry.expires_at else None,
            "metadata": entry.metadata
        }

        if ttl_seconds:
            await pool.setex(
                f"state:{key}",
                ttl_seconds,
                json.dumps(entry_data)
            )
        else:
            await pool.set(f"state:{key}", json.dumps(entry_data))

        return entry

    async def delete(self, key: str) -> bool:
        pool = await self._get_pool()
        result = await pool.delete(f"state:{key}")
        return result > 0

    async def compare_and_set(
        self,
        key: str,
        expected_version: int,
        new_value: Any
    ) -> bool:
        pool = await self._get_pool()

        # Use Lua script for atomic operation
        script = """
        local data = redis.call('GET', KEYS[1])
        if not data then
            return 0
        end
        local entry = cjson.decode(data)
        if entry.version ~= tonumber(ARGV[1]) then
            return 0
        end
        entry.value = cjson.decode(ARGV[2])
        entry.version = entry.version + 1
        entry.updated_at = ARGV[3]
        redis.call('SET', KEYS[1], cjson.encode(entry))
        return 1
        """

        result = await pool.eval(
            script,
            1,
            f"state:{key}",
            str(expected_version),
            json.dumps(new_value),
            datetime.utcnow().isoformat()
        )

        return result == 1


class SessionStateManager:
    """Manages session state for agents."""

    def __init__(self, store: StateStore, session_ttl: int = 3600):
        self.store = store
        self.session_ttl = session_ttl

    def _session_key(self, session_id: str) -> str:
        return f"session:{session_id}"

    async def get_session(
        self,
        session_id: str
    ) -> Optional[dict[str, Any]]:
        """Get session data."""
        entry = await self.store.get(self._session_key(session_id))
        return entry.value if entry else None

    async def update_session(
        self,
        session_id: str,
        data: dict[str, Any]
    ) -> None:
        """Update session data."""
        await self.store.set(
            self._session_key(session_id),
            data,
            ttl_seconds=self.session_ttl
        )

    async def add_message(
        self,
        session_id: str,
        role: str,
        content: str
    ) -> None:
        """Add a message to the session history."""
        session = await self.get_session(session_id) or {
            "messages": [],
            "created_at": datetime.utcnow().isoformat()
        }

        session["messages"].append({
            "role": role,
            "content": content,
            "timestamp": datetime.utcnow().isoformat()
        })

        # Limit history size
        if len(session["messages"]) > 100:
            session["messages"] = session["messages"][-100:]

        await self.update_session(session_id, session)

    async def clear_session(self, session_id: str) -> None:
        """Clear a session."""
        await self.store.delete(self._session_key(session_id))
```
Distributed state management enables horizontal scaling while maintaining consistency. Redis provides fast access for session data, while the compare-and-set operation prevents race conditions in concurrent updates.
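
In practice, `compare_and_set` is used in a read-modify-retry loop: read the entry, compute the new value from it, and retry whenever another writer has bumped the version in the meantime. The following standalone sketch shows the loop against a dict-backed test double; `InMemoryStore` and `increment` are illustrative helpers, not part of the chapter's API:

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Entry:
    value: Any
    version: int


class InMemoryStore:
    """Dict-backed stand-in exposing get/set/compare_and_set."""

    def __init__(self):
        self._data: dict[str, Entry] = {}

    async def get(self, key: str) -> Optional[Entry]:
        return self._data.get(key)

    async def set(self, key: str, value: Any) -> Entry:
        prev = self._data.get(key)
        entry = Entry(value, (prev.version + 1) if prev else 1)
        self._data[key] = entry
        return entry

    async def compare_and_set(
        self, key: str, expected_version: int, new_value: Any
    ) -> bool:
        current = self._data.get(key)
        if current is None or current.version != expected_version:
            return False  # another writer got there first
        self._data[key] = Entry(new_value, expected_version + 1)
        return True


async def increment(store: InMemoryStore, key: str, attempts: int = 10) -> None:
    """Optimistic-concurrency increment: retry on version conflict."""
    for _ in range(attempts):
        entry = await store.get(key)
        if entry is None:
            await store.set(key, 1)
            return
        if await store.compare_and_set(key, entry.version, entry.value + 1):
            return
        await asyncio.sleep(0)  # yield before retrying
    raise RuntimeError("too many write conflicts")


async def demo() -> int:
    store = InMemoryStore()
    await store.set("counter", 0)
    await asyncio.gather(*[increment(store, "counter") for _ in range(20)])
    return (await store.get("counter")).value


final = asyncio.run(demo())
```

With a real Redis backend the same loop works unchanged, because the Lua script above makes the version check and write atomic on the server.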


Communication Patterns

Agent systems use various communication patterns depending on latency requirements, reliability needs, and system complexity. Choosing the right pattern affects both performance and operational overhead.

Synchronous and Asynchronous Patterns

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional, Callable, Awaitable
from enum import Enum
import asyncio


class CommunicationPattern(Enum):
    """Communication patterns for agent systems."""
    REQUEST_RESPONSE = "request_response"
    ASYNC_MESSAGE = "async_message"
    EVENT_DRIVEN = "event_driven"
    STREAMING = "streaming"


@dataclass
class Message:
    """Message for inter-service communication."""
    id: str
    type: str
    payload: dict[str, Any]
    correlation_id: Optional[str] = None
    reply_to: Optional[str] = None
    timestamp: Optional[str] = None


class MessageBroker(ABC):
    """Abstract message broker interface."""

    @abstractmethod
    async def publish(
        self,
        topic: str,
        message: Message
    ) -> None:
        """Publish a message to a topic."""
        pass

    @abstractmethod
    async def subscribe(
        self,
        topic: str,
        handler: Callable[[Message], Awaitable[None]]
    ) -> None:
        """Subscribe to a topic."""
        pass


class InMemoryBroker(MessageBroker):
    """In-memory message broker for development."""

    def __init__(self):
        self._handlers: dict[str, list[Callable]] = {}
        self._queue: asyncio.Queue = asyncio.Queue()
        self._running = False

    async def publish(
        self,
        topic: str,
        message: Message
    ) -> None:
        await self._queue.put((topic, message))

    async def subscribe(
        self,
        topic: str,
        handler: Callable[[Message], Awaitable[None]]
    ) -> None:
        if topic not in self._handlers:
            self._handlers[topic] = []
        self._handlers[topic].append(handler)

    async def start(self) -> None:
        """Start processing messages."""
        self._running = True
        while self._running:
            try:
                topic, message = await asyncio.wait_for(
                    self._queue.get(),
                    timeout=1.0
                )
                handlers = self._handlers.get(topic, [])
                for handler in handlers:
                    asyncio.create_task(handler(message))
            except asyncio.TimeoutError:
                continue

    async def stop(self) -> None:
        self._running = False


class RequestResponsePattern:
    """Request-response communication pattern."""

    def __init__(self, timeout: float = 30.0):
        self.timeout = timeout
        self._pending: dict[str, asyncio.Future] = {}
        self._broker: Optional[MessageBroker] = None

    async def setup(self, broker: MessageBroker) -> None:
        """Setup the pattern with a broker."""
        self._broker = broker
        await broker.subscribe("responses", self._handle_response)

    async def _handle_response(self, message: Message) -> None:
        """Handle incoming response."""
        correlation_id = message.correlation_id
        if correlation_id and correlation_id in self._pending:
            future = self._pending.pop(correlation_id)
            future.set_result(message.payload)

    async def request(
        self,
        topic: str,
        payload: dict[str, Any]
    ) -> dict[str, Any]:
        """Send request and wait for response."""
        import uuid

        correlation_id = str(uuid.uuid4())
        message = Message(
            id=str(uuid.uuid4()),
            type="request",
            payload=payload,
            correlation_id=correlation_id,
            reply_to="responses"
        )

        future: asyncio.Future = asyncio.Future()
        self._pending[correlation_id] = future

        await self._broker.publish(topic, message)

        try:
            return await asyncio.wait_for(future, timeout=self.timeout)
        except asyncio.TimeoutError:
            self._pending.pop(correlation_id, None)
            raise TimeoutError(f"Request {correlation_id} timed out")


class EventDrivenPattern:
    """Event-driven communication pattern."""

    def __init__(self):
        self._broker: Optional[MessageBroker] = None
        self._event_handlers: dict[str, list[Callable]] = {}

    async def setup(self, broker: MessageBroker) -> None:
        self._broker = broker

    def on_event(
        self,
        event_type: str
    ) -> Callable:
        """Decorator to register event handler."""
        def decorator(func: Callable) -> Callable:
            if event_type not in self._event_handlers:
                self._event_handlers[event_type] = []
            self._event_handlers[event_type].append(func)
            return func
        return decorator

    async def emit(
        self,
        event_type: str,
        data: dict[str, Any]
    ) -> None:
        """Emit an event."""
        import uuid

        message = Message(
            id=str(uuid.uuid4()),
            type=event_type,
            payload=data
        )

        await self._broker.publish(f"events.{event_type}", message)

    async def process_event(self, message: Message) -> None:
        """Process an incoming event."""
        handlers = self._event_handlers.get(message.type, [])
        for handler in handlers:
            await handler(message.payload)


class StreamingPattern:
    """Streaming communication pattern for real-time updates."""

    def __init__(self):
        self._streams: dict[str, list[asyncio.Queue]] = {}

    def create_stream(self, stream_id: str) -> asyncio.Queue:
        """Create a new stream."""
        queue: asyncio.Queue = asyncio.Queue()
        if stream_id not in self._streams:
            self._streams[stream_id] = []
        self._streams[stream_id].append(queue)
        return queue

    async def publish_to_stream(
        self,
        stream_id: str,
        data: Any
    ) -> None:
        """Publish data to a stream."""
        queues = self._streams.get(stream_id, [])
        for queue in queues:
            await queue.put(data)

    async def close_stream(self, stream_id: str) -> None:
        """Close a stream."""
        queues = self._streams.pop(stream_id, [])
        for queue in queues:
            await queue.put(None)  # Signal end of stream

    async def stream_generator(
        self,
        stream_id: str
    ) -> AsyncIterator[Any]:
        """Consume a stream, yielding items until it is closed."""
        queue = self.create_stream(stream_id)
        try:
            while True:
                item = await queue.get()
                if item is None:  # end-of-stream sentinel
                    break
                yield item
        finally:
            # Remove this consumer's queue if the stream is still open
            if stream_id in self._streams and queue in self._streams[stream_id]:
                self._streams[stream_id].remove(queue)
```
220    ):
221        """Async generator for consuming stream."""
222        queue = self.create_stream(stream_id)
223
224        try:
225            while True:
226                data = await queue.get()
227                if data is None:
228                    break
229                yield data
230        finally:
231            # Cleanup
232            if stream_id in self._streams:
233                self._streams[stream_id].remove(queue)

Different patterns suit different use cases: request-response for simple synchronous calls, event-driven for decoupled processing, and streaming for real-time token delivery. Production systems often combine multiple patterns.
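
To see the correlation-ID mechanics in action, the request-response pattern can be wired to a broker in a few lines. The sketch below condenses the RequestResponsePattern above and pairs it with a minimal InMemoryBroker — the broker, the topic names, and the echoing agent handler are assumptions for illustration only; production systems would use a real broker such as Redis Streams, RabbitMQ, or Kafka:

```python
import asyncio
import uuid
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Message:
    id: str
    type: str
    payload: dict
    correlation_id: Optional[str] = None
    reply_to: Optional[str] = None


class InMemoryBroker:
    """Toy broker: maps a topic to a list of async subscriber callbacks."""

    def __init__(self):
        self._subs: dict[str, list[Callable]] = {}

    async def subscribe(self, topic: str, handler: Callable) -> None:
        self._subs.setdefault(topic, []).append(handler)

    async def publish(self, topic: str, message: Message) -> None:
        for handler in self._subs.get(topic, []):
            await handler(message)


class RequestResponsePattern:
    """Condensed copy of the pattern above."""

    def __init__(self, timeout: float = 5.0):
        self.timeout = timeout
        self._broker: Optional[InMemoryBroker] = None
        self._pending: dict[str, asyncio.Future] = {}

    async def setup(self, broker: InMemoryBroker) -> None:
        self._broker = broker
        await broker.subscribe("responses", self._handle_response)

    async def _handle_response(self, message: Message) -> None:
        future = self._pending.pop(message.correlation_id, None)
        if future is not None:
            future.set_result(message.payload)

    async def request(self, topic: str, payload: dict) -> dict:
        correlation_id = str(uuid.uuid4())
        message = Message(
            id=str(uuid.uuid4()), type="request", payload=payload,
            correlation_id=correlation_id, reply_to="responses",
        )
        future: asyncio.Future = asyncio.Future()
        self._pending[correlation_id] = future
        await self._broker.publish(topic, message)
        return await asyncio.wait_for(future, timeout=self.timeout)


async def main() -> dict:
    broker = InMemoryBroker()
    rr = RequestResponsePattern()
    await rr.setup(broker)

    # A stand-in "agent" service that replies on the message's reply_to topic
    async def agent(message: Message) -> None:
        reply = Message(
            id=str(uuid.uuid4()), type="response",
            payload={"answer": message.payload["question"].upper()},
            correlation_id=message.correlation_id,
        )
        await broker.publish(message.reply_to, reply)

    await broker.subscribe("agent.requests", agent)
    return await rr.request("agent.requests", {"question": "ping"})


result = asyncio.run(main())
print(result)  # {'answer': 'PING'}
```

The correlation ID is what lets one shared "responses" topic serve many concurrent callers: each pending request parks a future keyed by its ID, and the response handler resolves exactly the right one.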

| Pattern | Use Case | Latency | Coupling |
|---|---|---|---|
| Request-Response | Simple queries, tool calls | Low | High |
| Async Message | Background processing | Variable | Low |
| Event-Driven | Notifications, updates | Variable | Very Low |
| Streaming | Token streaming, progress | Very Low | Medium |
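
The streaming pattern can be exercised end to end with a producer and a consuming async generator. This condensed, self-contained copy of the StreamingPattern class above streams tokens to a consumer task (the stream ID and token values are illustrative):

```python
import asyncio


class StreamingPattern:
    """Condensed copy of the StreamingPattern class above."""

    def __init__(self):
        self._streams: dict[str, list[asyncio.Queue]] = {}

    def create_stream(self, stream_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._streams.setdefault(stream_id, []).append(queue)
        return queue

    async def publish_to_stream(self, stream_id: str, data) -> None:
        for queue in self._streams.get(stream_id, []):
            await queue.put(data)

    async def close_stream(self, stream_id: str) -> None:
        for queue in self._streams.pop(stream_id, []):
            await queue.put(None)  # end-of-stream sentinel

    async def stream_generator(self, stream_id: str):
        queue = self.create_stream(stream_id)
        try:
            while True:
                data = await queue.get()
                if data is None:
                    break
                yield data
        finally:
            if stream_id in self._streams:
                self._streams[stream_id].remove(queue)


async def main() -> str:
    pattern = StreamingPattern()
    received: list[str] = []

    async def consume() -> None:
        async for token in pattern.stream_generator("run-42"):
            received.append(token)

    consumer = asyncio.create_task(consume())
    await asyncio.sleep(0)  # let the consumer register its queue first
    for token in ["stream", "ing", " works"]:
        await pattern.publish_to_stream("run-42", token)
    await pattern.close_stream("run-42")
    await consumer
    return "".join(received)


output = asyncio.run(main())
print(output)  # streaming works
```

Note the `asyncio.sleep(0)` before publishing: the generator only registers its queue when iteration begins, so tokens published before the consumer starts would be lost. A production implementation might buffer or require explicit stream creation to avoid this race.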

Infrastructure Components

Production architectures require supporting infrastructure for load balancing, caching, rate limiting, and health monitoring. These components ensure reliable operation under varying conditions.

Essential Infrastructure

🐍python
1from dataclasses import dataclass
2from typing import Any, Optional
3import asyncio
4import time
5from collections import defaultdict
6
7
8@dataclass
9class RateLimitConfig:
10    """Rate limit configuration."""
11    requests_per_second: float
12    burst_size: int
13
14
15class TokenBucketRateLimiter:
16    """Token bucket rate limiter."""
17
18    def __init__(self, config: RateLimitConfig):
19        self.rate = config.requests_per_second
20        self.burst = config.burst_size
21        self._buckets: dict[str, tuple[float, float]] = {}  # tokens, last_update
22
23    async def allow(self, key: str) -> bool:
24        """Check if request is allowed."""
25        now = time.monotonic()
26
27        if key not in self._buckets:
28            self._buckets[key] = (self.burst - 1, now)
29            return True
30
31        tokens, last_update = self._buckets[key]
32
33        # Add tokens based on time passed
34        elapsed = now - last_update
35        tokens = min(self.burst, tokens + elapsed * self.rate)
36
37        if tokens >= 1:
38            self._buckets[key] = (tokens - 1, now)
39            return True
40
41        return False
42
43    async def wait_for_token(self, key: str) -> None:
44        """Wait until a token is available."""
45        while not await self.allow(key):
46            await asyncio.sleep(1 / self.rate)
47
48
49class CircuitBreaker:
50    """Circuit breaker for fault tolerance."""
51
52    def __init__(
53        self,
54        failure_threshold: int = 5,
55        recovery_timeout: float = 30.0,
56        half_open_requests: int = 3
57    ):
58        self.failure_threshold = failure_threshold
59        self.recovery_timeout = recovery_timeout
60        self.half_open_requests = half_open_requests
61
62        self._failures: dict[str, int] = defaultdict(int)
63        self._last_failure: dict[str, float] = {}
64        self._state: dict[str, str] = defaultdict(lambda: "closed")
65        self._half_open_count: dict[str, int] = defaultdict(int)
66
67    def is_open(self, key: str) -> bool:
68        """Check if circuit is open."""
69        state = self._state[key]
70
71        if state == "closed":
72            return False
73
74        if state == "open":
75            # Check if recovery timeout passed
76            last = self._last_failure.get(key, 0)
77            if time.monotonic() - last > self.recovery_timeout:
78                self._state[key] = "half_open"
79                self._half_open_count[key] = 0
80                return False
81            return True
82
83        # Half open - allow limited requests
84        if self._half_open_count[key] < self.half_open_requests:
85            return False
86        return True
87
88    def record_success(self, key: str) -> None:
89        """Record a successful call."""
90        if self._state[key] == "half_open":
91            self._half_open_count[key] += 1
92            if self._half_open_count[key] >= self.half_open_requests:
93                # Recovered
94                self._state[key] = "closed"
95                self._failures[key] = 0
96
97    def record_failure(self, key: str) -> None:
98        """Record a failed call."""
99        self._failures[key] += 1
100        self._last_failure[key] = time.monotonic()
101
102        if self._state[key] == "half_open":
103            # Back to open
104            self._state[key] = "open"
105        elif self._failures[key] >= self.failure_threshold:
106            self._state[key] = "open"
107
108
109class CacheLayer:
110    """Caching layer for agent responses."""
111
112    def __init__(
113        self,
114        max_size: int = 1000,
115        default_ttl: int = 300
116    ):
117        self.max_size = max_size
118        self.default_ttl = default_ttl
119        self._cache: dict[str, tuple[Any, float]] = {}
120
121    def _cache_key(self, request: dict[str, Any]) -> str:
122        """Generate cache key from request."""
123        import hashlib
124        import json
125
126        content = json.dumps(request, sort_keys=True)
127        return hashlib.sha256(content.encode()).hexdigest()
128
129    async def get(self, request: dict[str, Any]) -> Optional[Any]:
130        """Get cached response."""
131        key = self._cache_key(request)
132
133        if key not in self._cache:
134            return None
135
136        value, expires_at = self._cache[key]
137
138        if time.monotonic() > expires_at:
139            del self._cache[key]
140            return None
141
142        return value
143
144    async def set(
145        self,
146        request: dict[str, Any],
147        response: Any,
148        ttl: Optional[int] = None
149    ) -> None:
150        """Cache a response."""
151        # Evict if at capacity
152        if len(self._cache) >= self.max_size:
153            # Evict the quarter of entries expiring soonest
154            sorted_items = sorted(
155                self._cache.items(),
156                key=lambda x: x[1][1]
157            )
158            for key, _ in sorted_items[:len(sorted_items) // 4]:
159                del self._cache[key]
160
161        key = self._cache_key(request)
162        ttl = ttl or self.default_ttl
163        expires_at = time.monotonic() + ttl
164        self._cache[key] = (response, expires_at)
165
166
167class HealthChecker:
168    """Health checker for service components."""
169
170    def __init__(self):
171        self._checks: dict[str, callable] = {}
172
173    def register_check(
174        self,
175        name: str,
176        check: callable
177    ) -> None:
178        """Register a health check."""
179        self._checks[name] = check
180
181    async def check_all(self) -> dict[str, Any]:
182        """Run all health checks."""
183        results = {}
184
185        for name, check in self._checks.items():
186            try:
187                if asyncio.iscoroutinefunction(check):
188                    result = await check()
189                else:
190                    result = check()
191                results[name] = {"status": "healthy", "details": result}
192            except Exception as e:
193                results[name] = {"status": "unhealthy", "error": str(e)}
194
195        overall = "healthy" if all(
196            r["status"] == "healthy" for r in results.values()
197        ) else "unhealthy"
198
199        return {
200            "status": overall,
201            "checks": results,
202            "timestamp": time.time()
203        }
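
The token bucket's burst-then-refill behavior can be verified directly. This condensed copy of the limiter above allows a burst of three requests for one key, then denies further calls until tokens refill at the configured rate (here one per second; the key name is illustrative):

```python
import asyncio
import time
from dataclasses import dataclass


@dataclass
class RateLimitConfig:
    requests_per_second: float
    burst_size: int


class TokenBucketRateLimiter:
    """Condensed copy of the rate limiter above."""

    def __init__(self, config: RateLimitConfig):
        self.rate = config.requests_per_second
        self.burst = config.burst_size
        self._buckets: dict[str, tuple[float, float]] = {}

    async def allow(self, key: str) -> bool:
        now = time.monotonic()
        if key not in self._buckets:
            self._buckets[key] = (self.burst - 1, now)
            return True
        tokens, last_update = self._buckets[key]
        # Refill proportionally to elapsed time, capped at the burst size
        tokens = min(self.burst, tokens + (now - last_update) * self.rate)
        if tokens >= 1:
            self._buckets[key] = (tokens - 1, now)
            return True
        return False


async def main() -> list[bool]:
    # Burst of 3, refilling one token per second
    limiter = TokenBucketRateLimiter(RateLimitConfig(1.0, 3))
    return [await limiter.allow("tenant-a") for _ in range(5)]


results = asyncio.run(main())
print(results)  # [True, True, True, False, False]
```

The first three calls drain the burst; the last two are denied because at one token per second essentially nothing refills between back-to-back calls.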

These infrastructure components work together to provide reliable service: rate limiters prevent overload, circuit breakers contain failures, caches reduce latency, and health checks enable monitoring and automated recovery.
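
The breaker's closed → open → half-open → closed cycle can be traced with a short script. This condensed copy of the CircuitBreaker above uses deliberately small thresholds and timeouts (chosen only for the demo) to walk through each transition:

```python
import time
from collections import defaultdict


class CircuitBreaker:
    """Condensed copy of the CircuitBreaker above."""

    def __init__(self, failure_threshold=5, recovery_timeout=30.0,
                 half_open_requests=3):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        self._failures: dict[str, int] = defaultdict(int)
        self._last_failure: dict[str, float] = {}
        self._state: dict[str, str] = defaultdict(lambda: "closed")
        self._half_open_count: dict[str, int] = defaultdict(int)

    def is_open(self, key: str) -> bool:
        state = self._state[key]
        if state == "closed":
            return False
        if state == "open":
            # Transition to half-open once the recovery timeout has passed
            if time.monotonic() - self._last_failure.get(key, 0) > self.recovery_timeout:
                self._state[key] = "half_open"
                self._half_open_count[key] = 0
                return False
            return True
        # Half-open: allow a limited number of probe requests
        return self._half_open_count[key] >= self.half_open_requests

    def record_success(self, key: str) -> None:
        if self._state[key] == "half_open":
            self._half_open_count[key] += 1
            if self._half_open_count[key] >= self.half_open_requests:
                self._state[key] = "closed"
                self._failures[key] = 0

    def record_failure(self, key: str) -> None:
        self._failures[key] += 1
        self._last_failure[key] = time.monotonic()
        if self._state[key] == "half_open" or \
                self._failures[key] >= self.failure_threshold:
            self._state[key] = "open"


breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05,
                         half_open_requests=1)

states = []
states.append(breaker.is_open("llm-api"))  # closed: calls pass
breaker.record_failure("llm-api")
breaker.record_failure("llm-api")          # threshold hit -> open
states.append(breaker.is_open("llm-api"))  # open: calls blocked
time.sleep(0.06)                           # wait out the recovery timeout
states.append(breaker.is_open("llm-api"))  # half-open: probe allowed
breaker.record_success("llm-api")          # probe succeeded -> closed
states.append(breaker.is_open("llm-api"))
print(states)  # [False, True, False, False]
```

In production the timeout would be tens of seconds and the key would identify a downstream dependency (a model endpoint, a tool API) so one failing service cannot exhaust resources across the whole system.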


Summary

Production architecture for agent systems requires careful consideration of layered designs, service patterns, state management, and communication strategies. The patterns introduced in this section provide a foundation for building scalable, reliable agent deployments.

Key Takeaways

  • Layered Architecture - Separate gateway, orchestration, execution, integration, and persistence layers for independent scaling
  • Service Patterns - Choose between microservices for flexibility or modular monolith for operational simplicity
  • State Management - Use distributed state stores with proper TTL and atomic operations for consistency
  • Communication Patterns - Match patterns to requirements: request-response for synchronous, events for decoupled, streaming for real-time
  • Infrastructure Components - Rate limiting, circuit breakers, caching, and health checks are essential for production

Next Steps: The next section covers scaling agent systems to handle increased load through horizontal scaling, load balancing, and resource optimization strategies.