Introduction
Scaling agent systems presents unique challenges compared to traditional web services. LLM calls are computationally expensive, responses are variable in length and latency, and long-running agent workflows maintain state across multiple interactions. This section covers strategies for scaling agent systems effectively while maintaining performance and reliability.
Learning Objectives: By the end of this section, you will understand horizontal scaling patterns for agent workloads, implement intelligent load balancing, configure auto-scaling based on agent-specific metrics, and optimize resource utilization across your infrastructure.
Agent systems often exhibit bursty traffic patterns with significant variation in request complexity. A simple query might complete in seconds, while a complex multi-step workflow could run for minutes. Effective scaling strategies must account for this variability.
Horizontal Scaling
Horizontal scaling adds capacity by running more instances of your agent services. The key challenge is ensuring work is distributed evenly and state is managed correctly across instances.
Stateless Service Design
```python
from dataclasses import dataclass
from typing import Any, Optional
import asyncio
import os
import time


@dataclass
class InstanceInfo:
    """Information about a service instance."""
    instance_id: str
    host: str
    port: int
    started_at: float
    capacity: int
    current_load: int

    @property
    def load_factor(self) -> float:
        return self.current_load / self.capacity if self.capacity > 0 else 1.0

    @property
    def available_capacity(self) -> int:
        return max(0, self.capacity - self.current_load)


class StatelessAgentService:
    """Stateless agent service designed for horizontal scaling."""

    def __init__(
        self,
        instance_id: Optional[str] = None,
        max_concurrent: int = 10
    ):
        self.instance_id = instance_id or os.getenv(
            "INSTANCE_ID",
            f"instance-{os.getpid()}"
        )
        self.max_concurrent = max_concurrent
        # Record the start time once, not on every info query
        self._started_at = time.time()
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._active_requests = 0
        self._state_store: Optional["StateStore"] = None

    def set_state_store(self, store: "StateStore") -> None:
        """Set external state store for session management."""
        self._state_store = store

    @property
    def instance_info(self) -> InstanceInfo:
        return InstanceInfo(
            instance_id=self.instance_id,
            host=os.getenv("HOST", "localhost"),
            port=int(os.getenv("PORT", "8000")),
            started_at=self._started_at,
            capacity=self.max_concurrent,
            current_load=self._active_requests
        )

    async def process_request(
        self,
        request: dict[str, Any],
        session_id: Optional[str] = None
    ) -> dict[str, Any]:
        """Process a request in a stateless manner."""
        async with self._semaphore:
            self._active_requests += 1
            try:
                # Load state from the external store if needed
                state = None
                if session_id and self._state_store:
                    state = await self._state_store.get(session_id)

                # Process the request
                result = await self._execute(request, state)

                # Save updated state
                if session_id and self._state_store and result.get("state"):
                    await self._state_store.set(session_id, result["state"])

                return result

            finally:
                self._active_requests -= 1

    async def _execute(
        self,
        request: dict[str, Any],
        state: Optional[dict[str, Any]]
    ) -> dict[str, Any]:
        """Execute the agent logic."""
        # Placeholder for actual agent logic
        task = request.get("task", "")

        # Include state in processing if available
        context = state.get("context", []) if state else []

        return {
            "response": f"Processed: {task}",
            "instance_id": self.instance_id,
            "state": {
                "context": context + [task],
                "last_processed": task
            }
        }


class InstanceRegistry:
    """Registry for tracking service instances."""

    def __init__(self):
        self._instances: dict[str, InstanceInfo] = {}
        self._lock = asyncio.Lock()

    async def register(self, info: InstanceInfo) -> None:
        """Register an instance."""
        async with self._lock:
            self._instances[info.instance_id] = info

    async def deregister(self, instance_id: str) -> None:
        """Deregister an instance."""
        async with self._lock:
            self._instances.pop(instance_id, None)

    async def update_load(
        self,
        instance_id: str,
        current_load: int
    ) -> None:
        """Update instance load."""
        async with self._lock:
            if instance_id in self._instances:
                info = self._instances[instance_id]
                self._instances[instance_id] = InstanceInfo(
                    instance_id=info.instance_id,
                    host=info.host,
                    port=info.port,
                    started_at=info.started_at,
                    capacity=info.capacity,
                    current_load=current_load
                )

    async def get_instances(self) -> list[InstanceInfo]:
        """Get all registered instances."""
        async with self._lock:
            return list(self._instances.values())

    async def get_available_instance(self) -> Optional[InstanceInfo]:
        """Get an instance with available capacity."""
        instances = await self.get_instances()
        available = [i for i in instances if i.available_capacity > 0]

        if not available:
            return None

        # Return the instance with the most available capacity
        return max(available, key=lambda i: i.available_capacity)
```

Stateless services externalize all state to shared stores, so any instance can handle any request. The instance registry tracks available capacity across instances to support intelligent routing.
Partitioning Strategies
```python
from enum import Enum
from typing import Any, Optional
import hashlib
import json


class PartitionStrategy(Enum):
    """Strategies for partitioning work across instances."""
    ROUND_ROBIN = "round_robin"
    HASH_BASED = "hash_based"
    CAPACITY_BASED = "capacity_based"
    AFFINITY = "affinity"


class WorkPartitioner:
    """Partitions work across service instances."""

    def __init__(
        self,
        registry: InstanceRegistry,
        strategy: PartitionStrategy = PartitionStrategy.CAPACITY_BASED
    ):
        self.registry = registry
        self.strategy = strategy
        self._round_robin_counter = 0
        self._affinity_map: dict[str, str] = {}

    async def get_instance(
        self,
        request: dict[str, Any],
        session_id: Optional[str] = None
    ) -> Optional[InstanceInfo]:
        """Get the instance that should handle this request."""
        instances = await self.registry.get_instances()
        available = [i for i in instances if i.available_capacity > 0]

        if not available:
            return None

        if self.strategy == PartitionStrategy.ROUND_ROBIN:
            return self._round_robin(available)

        elif self.strategy == PartitionStrategy.HASH_BASED:
            return self._hash_based(available, request)

        elif self.strategy == PartitionStrategy.CAPACITY_BASED:
            return self._capacity_based(available)

        elif self.strategy == PartitionStrategy.AFFINITY:
            return self._affinity_based(available, session_id)

        return available[0]

    def _round_robin(
        self,
        instances: list[InstanceInfo]
    ) -> InstanceInfo:
        """Round-robin selection."""
        idx = self._round_robin_counter % len(instances)
        self._round_robin_counter += 1
        return instances[idx]

    def _hash_based(
        self,
        instances: list[InstanceInfo],
        request: dict[str, Any]
    ) -> InstanceInfo:
        """Hash-based deterministic selection."""
        # Hash the request content so identical requests route identically
        content = json.dumps(request, sort_keys=True)
        hash_val = int(hashlib.sha256(content.encode()).hexdigest(), 16)
        idx = hash_val % len(instances)
        return instances[idx]

    def _capacity_based(
        self,
        instances: list[InstanceInfo]
    ) -> InstanceInfo:
        """Select the instance with the most available capacity."""
        return max(instances, key=lambda i: i.available_capacity)

    def _affinity_based(
        self,
        instances: list[InstanceInfo],
        session_id: Optional[str]
    ) -> InstanceInfo:
        """Session affinity: the same session goes to the same instance."""
        if session_id and session_id in self._affinity_map:
            target_id = self._affinity_map[session_id]
            for instance in instances:
                if instance.instance_id == target_id:
                    return instance

        # No affinity yet, or the instance is unavailable: fall back
        # to capacity-based selection and record the new affinity
        selected = self._capacity_based(instances)
        if session_id:
            self._affinity_map[session_id] = selected.instance_id
        return selected

    def clear_affinity(self, session_id: str) -> None:
        """Clear session affinity."""
        self._affinity_map.pop(session_id, None)


class ShardedPartitioner:
    """Partition work based on a sharding key."""

    def __init__(
        self,
        num_shards: int,
        registry: InstanceRegistry
    ):
        self.num_shards = num_shards
        self.registry = registry
        # Map shards to instances
        self._shard_assignment: dict[int, str] = {}

    async def assign_shards(self) -> None:
        """Assign shards to available instances."""
        instances = await self.registry.get_instances()

        if not instances:
            return

        # Distribute shards evenly
        for shard_id in range(self.num_shards):
            instance_idx = shard_id % len(instances)
            self._shard_assignment[shard_id] = instances[instance_idx].instance_id

    def get_shard(self, key: str) -> int:
        """Get the shard ID for a key."""
        hash_val = int(hashlib.sha256(key.encode()).hexdigest(), 16)
        return hash_val % self.num_shards

    async def get_instance_for_key(
        self,
        key: str
    ) -> Optional[InstanceInfo]:
        """Get the instance responsible for a sharding key."""
        shard_id = self.get_shard(key)
        instance_id = self._shard_assignment.get(shard_id)

        if not instance_id:
            return None

        instances = await self.registry.get_instances()
        for instance in instances:
            if instance.instance_id == instance_id:
                return instance

        # Instance no longer available: reassign the shard
        available = [i for i in instances if i.available_capacity > 0]
        if available:
            self._shard_assignment[shard_id] = available[0].instance_id
            return available[0]

        return None
```

Partitioning strategies distribute work across instances. Capacity-based partitioning evens out load, while affinity-based partitioning keeps a session on the same instance to preserve cache and context locality.
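The hash-based path can be exercised in isolation. This standalone sketch mirrors the selection logic of `_hash_based` (the `pick_instance` helper and instance names are illustrative, not part of the classes above). One caveat worth noting: plain modulo hashing remaps most keys whenever the instance count changes; if stable assignment across resizes matters, a consistent-hash ring is the usual upgrade.

```python
import hashlib
import json


def pick_instance(request: dict, instances: list[str]) -> str:
    """Hash-based selection: identical requests always map to the same instance."""
    content = json.dumps(request, sort_keys=True)
    hash_val = int(hashlib.sha256(content.encode()).hexdigest(), 16)
    return instances[hash_val % len(instances)]


instances = ["instance-a", "instance-b", "instance-c"]

# Deterministic: repeating the same request yields the same target
first = pick_instance({"task": "plan trip"}, instances)
second = pick_instance({"task": "plan trip"}, instances)
assert first == second

# Removing an instance reshuffles assignments for many keys --
# the limitation that consistent hashing addresses
shrunk = pick_instance({"task": "plan trip"}, instances[:2])
assert shrunk in instances[:2]
```

Determinism is what makes hash-based routing useful for cache locality: repeated identical requests hit the same instance's warm caches.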
Load Balancing Strategies
Load balancing for agent systems requires awareness of the unique characteristics of LLM workloads: variable request duration, high memory usage, and the importance of request context.
Intelligent Load Balancer
```python
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
import asyncio
import random


class LoadBalancingAlgorithm(Enum):
    """Load balancing algorithms."""
    ROUND_ROBIN = "round_robin"
    LEAST_CONNECTIONS = "least_connections"
    WEIGHTED_RESPONSE_TIME = "weighted_response_time"
    ADAPTIVE = "adaptive"


@dataclass
class BackendMetrics:
    """Metrics for a backend instance."""
    instance_id: str
    active_connections: int = 0
    total_requests: int = 0
    total_errors: int = 0
    avg_response_time_ms: float = 0.0
    last_response_times: list[float] = field(default_factory=list)
    weight: float = 1.0
    healthy: bool = True

    def record_request(self, response_time_ms: float, success: bool) -> None:
        """Record a completed request."""
        self.total_requests += 1
        if not success:
            self.total_errors += 1

        # Maintain a sliding window of response times
        self.last_response_times.append(response_time_ms)
        if len(self.last_response_times) > 100:
            self.last_response_times = self.last_response_times[-100:]

        self.avg_response_time_ms = (
            sum(self.last_response_times) / len(self.last_response_times)
        )

    @property
    def error_rate(self) -> float:
        if self.total_requests == 0:
            return 0.0
        return self.total_errors / self.total_requests

    @property
    def score(self) -> float:
        """Calculate the backend score for routing decisions (higher is better)."""
        if not self.healthy:
            return 0.0

        # Each penalty lowers the score
        connection_penalty = self.active_connections * 10
        latency_penalty = self.avg_response_time_ms / 100
        error_penalty = self.error_rate * 1000

        return max(0, self.weight * 100 - connection_penalty - latency_penalty - error_penalty)


class AgentLoadBalancer:
    """Load balancer optimized for agent workloads."""

    def __init__(
        self,
        algorithm: LoadBalancingAlgorithm = LoadBalancingAlgorithm.ADAPTIVE
    ):
        self.algorithm = algorithm
        self._backends: dict[str, BackendMetrics] = {}
        self._round_robin_idx = 0
        self._lock = asyncio.Lock()

    def add_backend(
        self,
        instance_id: str,
        weight: float = 1.0
    ) -> None:
        """Add a backend instance."""
        self._backends[instance_id] = BackendMetrics(
            instance_id=instance_id,
            weight=weight
        )

    def remove_backend(self, instance_id: str) -> None:
        """Remove a backend instance."""
        self._backends.pop(instance_id, None)

    def mark_unhealthy(self, instance_id: str) -> None:
        """Mark a backend as unhealthy."""
        if instance_id in self._backends:
            self._backends[instance_id].healthy = False

    def mark_healthy(self, instance_id: str) -> None:
        """Mark a backend as healthy."""
        if instance_id in self._backends:
            self._backends[instance_id].healthy = True

    async def get_backend(self) -> Optional[str]:
        """Get the next backend to use."""
        async with self._lock:
            healthy = {
                k: v for k, v in self._backends.items()
                if v.healthy
            }

            if not healthy:
                return None

            if self.algorithm == LoadBalancingAlgorithm.ROUND_ROBIN:
                return self._round_robin(list(healthy.keys()))

            elif self.algorithm == LoadBalancingAlgorithm.LEAST_CONNECTIONS:
                return self._least_connections(healthy)

            elif self.algorithm == LoadBalancingAlgorithm.WEIGHTED_RESPONSE_TIME:
                return self._weighted_response_time(healthy)

            elif self.algorithm == LoadBalancingAlgorithm.ADAPTIVE:
                return self._adaptive(healthy)

            return list(healthy.keys())[0]

    def _round_robin(self, backends: list[str]) -> str:
        """Simple round-robin selection."""
        idx = self._round_robin_idx % len(backends)
        self._round_robin_idx += 1
        return backends[idx]

    def _least_connections(
        self,
        backends: dict[str, BackendMetrics]
    ) -> str:
        """Select the backend with the fewest active connections."""
        return min(
            backends.keys(),
            key=lambda k: backends[k].active_connections
        )

    def _weighted_response_time(
        self,
        backends: dict[str, BackendMetrics]
    ) -> str:
        """Select based on weighted response time."""
        # Weights are inversely proportional to response time
        weights = {}
        for instance_id, metrics in backends.items():
            if metrics.avg_response_time_ms > 0:
                weights[instance_id] = 1.0 / metrics.avg_response_time_ms
            else:
                weights[instance_id] = 1.0

        total_weight = sum(weights.values())
        rand_val = random.random() * total_weight

        cumulative = 0.0
        for instance_id, weight in weights.items():
            cumulative += weight
            if rand_val <= cumulative:
                return instance_id

        return list(backends.keys())[0]

    def _adaptive(
        self,
        backends: dict[str, BackendMetrics]
    ) -> str:
        """Adaptive selection based on multiple factors."""
        # Score each backend
        scores = {
            instance_id: metrics.score
            for instance_id, metrics in backends.items()
        }

        # Select the highest-scoring backend
        return max(scores.keys(), key=lambda k: scores[k])

    async def record_request_start(self, instance_id: str) -> None:
        """Record that a request has started."""
        async with self._lock:
            if instance_id in self._backends:
                self._backends[instance_id].active_connections += 1

    async def record_request_end(
        self,
        instance_id: str,
        response_time_ms: float,
        success: bool
    ) -> None:
        """Record that a request has completed."""
        async with self._lock:
            if instance_id in self._backends:
                metrics = self._backends[instance_id]
                metrics.active_connections = max(0, metrics.active_connections - 1)
                metrics.record_request(response_time_ms, success)

    def get_metrics(self) -> dict[str, Any]:
        """Get load balancer metrics."""
        return {
            instance_id: {
                "active_connections": m.active_connections,
                "total_requests": m.total_requests,
                "error_rate": m.error_rate,
                "avg_response_time_ms": m.avg_response_time_ms,
                "healthy": m.healthy,
                "score": m.score
            }
            for instance_id, m in self._backends.items()
        }
```

The adaptive load balancer considers multiple factors: active connections, response time, error rate, and configured weights. This multi-dimensional approach handles the variable nature of agent workloads better than simple single-signal algorithms.
| Algorithm | Best For | Limitations |
|---|---|---|
| Round Robin | Uniform workloads | Ignores capacity differences |
| Least Connections | Variable request duration | Doesn't consider latency |
| Weighted Response Time | Latency-sensitive apps | Can overload fast backends |
| Adaptive | Agent workloads | More complex to tune |
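To see how the adaptive scorer trades these factors off, this sketch replicates the scoring formula from `BackendMetrics.score` in a standalone form (the `Backend` class here is a stripped-down stand-in, not the class above):

```python
from dataclasses import dataclass


@dataclass
class Backend:
    active_connections: int
    avg_response_time_ms: float
    error_rate: float
    weight: float = 1.0

    def score(self) -> float:
        # Same shape as BackendMetrics.score: start from the weighted
        # baseline and subtract penalties for load, latency, and errors
        return max(0.0, self.weight * 100
                   - self.active_connections * 10
                   - self.avg_response_time_ms / 100
                   - self.error_rate * 1000)


fast_but_busy = Backend(active_connections=8, avg_response_time_ms=200, error_rate=0.0)
slow_but_idle = Backend(active_connections=1, avg_response_time_ms=2000, error_rate=0.0)

# 100 - 80 - 2 = 18 versus 100 - 10 - 20 = 70: connection pressure
# dominates latency, so the idle backend wins
assert fast_but_busy.score() == 18.0
assert slow_but_idle.score() == 70.0
```

The penalty coefficients (10 per connection, 1 per 100 ms, 1000 per unit of error rate) are the tuning knobs the table's "more complex to tune" caveat refers to; a 1% error rate costs as much as one active connection.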
Auto-Scaling
Auto-scaling automatically adjusts capacity based on demand. For agent systems, traditional CPU-based scaling often falls short because LLM calls are I/O-bound. Custom metrics provide better scaling signals.
Agent-Aware Auto-Scaler
```python
from dataclasses import dataclass
from typing import Any, Optional, Callable, Awaitable
from enum import Enum
import asyncio
import time


class ScalingDirection(Enum):
    """Scaling direction."""
    UP = "up"
    DOWN = "down"
    NONE = "none"


@dataclass
class ScalingMetrics:
    """Metrics for scaling decisions."""
    queue_depth: int = 0
    avg_response_time_ms: float = 0.0
    active_requests: int = 0
    total_capacity: int = 0
    error_rate: float = 0.0
    pending_llm_calls: int = 0

    @property
    def utilization(self) -> float:
        if self.total_capacity == 0:
            return 1.0
        return self.active_requests / self.total_capacity

    @property
    def queue_pressure(self) -> float:
        """Queue pressure indicator (0-1)."""
        # Normalize queue depth against total capacity
        if self.total_capacity == 0:
            return 1.0 if self.queue_depth > 0 else 0.0
        return min(1.0, self.queue_depth / (self.total_capacity * 2))


@dataclass
class ScalingPolicy:
    """Policy for auto-scaling decisions."""
    # Scale-up thresholds
    scale_up_utilization: float = 0.8
    scale_up_queue_pressure: float = 0.5
    scale_up_latency_ms: float = 5000.0

    # Scale-down thresholds
    scale_down_utilization: float = 0.3
    scale_down_queue_pressure: float = 0.1

    # Scaling limits
    min_instances: int = 1
    max_instances: int = 10

    # Cooldown periods
    scale_up_cooldown_seconds: float = 60.0
    scale_down_cooldown_seconds: float = 300.0

    # Scaling step sizes
    scale_up_step: int = 2
    scale_down_step: int = 1


class AutoScaler:
    """Auto-scaler for agent services."""

    def __init__(
        self,
        policy: ScalingPolicy,
        scale_up_callback: Callable[[int], Awaitable[None]],
        scale_down_callback: Callable[[int], Awaitable[None]],
        metrics_collector: Callable[[], Awaitable[ScalingMetrics]]
    ):
        self.policy = policy
        self._scale_up = scale_up_callback
        self._scale_down = scale_down_callback
        self._get_metrics = metrics_collector

        self._current_instances = policy.min_instances
        self._last_scale_up = 0.0
        self._last_scale_down = 0.0
        self._running = False
        self._scaling_history: list[dict[str, Any]] = []

    async def start(self, interval_seconds: float = 30.0) -> None:
        """Start the auto-scaler loop."""
        self._running = True
        while self._running:
            try:
                await self._evaluate_scaling()
            except Exception as e:
                # Replace with structured logging in production
                print(f"Scaling evaluation error: {e}")

            await asyncio.sleep(interval_seconds)

    async def stop(self) -> None:
        """Stop the auto-scaler."""
        self._running = False

    async def _evaluate_scaling(self) -> None:
        """Evaluate and execute scaling decisions."""
        metrics = await self._get_metrics()
        direction = self._decide_scaling(metrics)

        now = time.time()

        if direction == ScalingDirection.UP:
            if now - self._last_scale_up < self.policy.scale_up_cooldown_seconds:
                return  # In cooldown

            new_count = min(
                self._current_instances + self.policy.scale_up_step,
                self.policy.max_instances
            )

            if new_count > self._current_instances:
                await self._scale_up(new_count - self._current_instances)
                self._record_scaling("up", self._current_instances, new_count, metrics)
                self._current_instances = new_count
                self._last_scale_up = now

        elif direction == ScalingDirection.DOWN:
            if now - self._last_scale_down < self.policy.scale_down_cooldown_seconds:
                return  # In cooldown

            new_count = max(
                self._current_instances - self.policy.scale_down_step,
                self.policy.min_instances
            )

            if new_count < self._current_instances:
                await self._scale_down(self._current_instances - new_count)
                self._record_scaling("down", self._current_instances, new_count, metrics)
                self._current_instances = new_count
                self._last_scale_down = now

    def _decide_scaling(self, metrics: ScalingMetrics) -> ScalingDirection:
        """Decide scaling direction based on metrics."""
        # Scale-up conditions: any single hot signal is enough
        if (
            metrics.utilization > self.policy.scale_up_utilization or
            metrics.queue_pressure > self.policy.scale_up_queue_pressure or
            metrics.avg_response_time_ms > self.policy.scale_up_latency_ms
        ):
            if self._current_instances < self.policy.max_instances:
                return ScalingDirection.UP

        # Scale-down conditions: every signal must look calm
        if (
            metrics.utilization < self.policy.scale_down_utilization and
            metrics.queue_pressure < self.policy.scale_down_queue_pressure and
            metrics.error_rate < 0.01
        ):
            if self._current_instances > self.policy.min_instances:
                return ScalingDirection.DOWN

        return ScalingDirection.NONE

    def _record_scaling(
        self,
        direction: str,
        old_count: int,
        new_count: int,
        metrics: ScalingMetrics
    ) -> None:
        """Record a scaling event."""
        self._scaling_history.append({
            "timestamp": time.time(),
            "direction": direction,
            "old_count": old_count,
            "new_count": new_count,
            "metrics": {
                "utilization": metrics.utilization,
                "queue_pressure": metrics.queue_pressure,
                "avg_latency_ms": metrics.avg_response_time_ms
            }
        })

        # Keep the last 100 events
        if len(self._scaling_history) > 100:
            self._scaling_history = self._scaling_history[-100:]

    def get_history(self) -> list[dict[str, Any]]:
        """Get scaling history."""
        return list(self._scaling_history)


class PredictiveScaler:
    """Predictive auto-scaler using historical patterns."""

    def __init__(
        self,
        base_scaler: AutoScaler,
        prediction_window_minutes: int = 15
    ):
        self.base_scaler = base_scaler
        self.prediction_window = prediction_window_minutes
        self._historical_load: list[tuple[float, float]] = []  # (timestamp, load)

    def record_load(self, load: float) -> None:
        """Record current load for prediction."""
        self._historical_load.append((time.time(), load))

        # Keep 24 hours of data
        cutoff = time.time() - 86400
        self._historical_load = [
            (t, load_val) for t, load_val in self._historical_load if t > cutoff
        ]

    def predict_load(self) -> Optional[float]:
        """Predict load for the next window."""
        if len(self._historical_load) < 10:
            return None

        # Simple time-of-day average; in production, use proper
        # time series forecasting
        now = time.time()
        target_time = now + (self.prediction_window * 60)

        # Find loads recorded at a similar time of day
        similar_loads = []
        for timestamp, load in self._historical_load:
            time_diff = abs((timestamp % 86400) - (target_time % 86400))
            if time_diff < 3600:  # Within one hour
                similar_loads.append(load)

        if similar_loads:
            return sum(similar_loads) / len(similar_loads)

        # Fall back to the recent average
        recent = self._historical_load[-10:]
        return sum(load for _, load in recent) / len(recent)

    async def proactive_scale(self) -> None:
        """Proactively scale based on predictions."""
        predicted = self.predict_load()

        if predicted is None:
            return

        policy = self.base_scaler.policy
        current = self.base_scaler._current_instances

        # Capacity needed to keep utilization below the scale-up threshold
        needed = int(predicted / (1 - policy.scale_up_utilization)) + 1
        needed = max(policy.min_instances, min(policy.max_instances, needed))

        if needed > current:
            # Scale up proactively, keeping the base scaler's count in sync
            await self.base_scaler._scale_up(needed - current)
            self.base_scaler._current_instances = needed
            self.base_scaler._last_scale_up = time.time()
```

The auto-scaler combines queue depth, latency, and utilization signals when making scaling decisions. The predictive scaler extends this with historical patterns to scale proactively before load arrives.
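The decision logic can be exercised without the full scaler. This sketch mirrors `_decide_scaling` as a pure function using the default `ScalingPolicy` thresholds (the `decide` helper is illustrative, not part of the classes above): scale-up triggers on any single hot signal, while scale-down requires every signal, including error rate, to be calm.

```python
def decide(
    utilization: float,
    queue_pressure: float,
    latency_ms: float,
    current: int,
    error_rate: float = 0.0,
    min_instances: int = 1,
    max_instances: int = 10,
) -> str:
    """Mirrors AutoScaler._decide_scaling with the default policy thresholds."""
    # Any single hot signal justifies scaling up
    if utilization > 0.8 or queue_pressure > 0.5 or latency_ms > 5000.0:
        if current < max_instances:
            return "up"
    # Scaling down requires every signal to look calm
    if utilization < 0.3 and queue_pressure < 0.1 and error_rate < 0.01:
        if current > min_instances:
            return "down"
    return "none"


assert decide(0.9, 0.0, 1000.0, current=2) == "up"      # hot on utilization alone
assert decide(0.5, 0.2, 1200.0, current=2) == "none"    # healthy middle band
assert decide(0.1, 0.0, 200.0, current=3) == "down"     # sustained idle
assert decide(0.1, 0.0, 200.0, current=1) == "none"     # already at the floor
assert decide(0.1, 0.0, 200.0, current=3, error_rate=0.05) == "none"  # errors block down
```

The asymmetry is deliberate: OR for scaling up and AND for scaling down biases the system toward over-provisioning, which is the safer failure mode for latency-sensitive agent traffic.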
Resource Optimization
Agent workloads can be optimized through request batching, connection pooling, and intelligent caching. These techniques reduce per-request overhead and improve overall throughput.
Request Batching and Pooling
```python
from dataclasses import dataclass
from typing import Any, Optional, Callable, Awaitable
import asyncio
import time


@dataclass
class BatchConfig:
    """Configuration for request batching."""
    max_batch_size: int = 10
    max_wait_ms: float = 100.0
    enable_dynamic_sizing: bool = True


class RequestBatcher:
    """Batches requests for efficient processing."""

    def __init__(
        self,
        config: BatchConfig,
        batch_processor: Callable[[list[dict[str, Any]]], Awaitable[list[Any]]]
    ):
        self.config = config
        self._process_batch = batch_processor
        self._pending: list[tuple[dict[str, Any], asyncio.Future]] = []
        self._lock = asyncio.Lock()
        self._batch_task: Optional[asyncio.Task] = None

    async def submit(self, request: dict[str, Any]) -> Any:
        """Submit a request for batched processing."""
        future: asyncio.Future = asyncio.get_running_loop().create_future()

        async with self._lock:
            self._pending.append((request, future))

            # Start the batch timer on the first request
            if len(self._pending) == 1:
                self._batch_task = asyncio.create_task(
                    self._batch_timeout()
                )

            # Process immediately if the batch is full
            if len(self._pending) >= self.config.max_batch_size:
                await self._process_pending()

        return await future

    async def _batch_timeout(self) -> None:
        """Wait for the batch timeout, then process."""
        await asyncio.sleep(self.config.max_wait_ms / 1000)

        async with self._lock:
            if self._pending:
                await self._process_pending()

    async def _process_pending(self) -> None:
        """Process all pending requests. The caller must hold the lock."""
        if not self._pending:
            return

        batch = self._pending
        self._pending = []

        # Cancel the timer unless we *are* the timer task --
        # cancelling the current task would abort the batch mid-flight
        if self._batch_task and self._batch_task is not asyncio.current_task():
            self._batch_task.cancel()
        self._batch_task = None

        try:
            requests = [req for req, _ in batch]
            results = await self._process_batch(requests)

            for (_, future), result in zip(batch, results):
                future.set_result(result)

        except Exception as e:
            for _, future in batch:
                future.set_exception(e)


class ConnectionPool:
    """Connection pool for external services."""

    def __init__(
        self,
        create_connection: Callable[[], Awaitable[Any]],
        max_size: int = 10,
        min_size: int = 2,
        max_idle_seconds: float = 300.0
    ):
        self._create = create_connection
        self.max_size = max_size
        self.min_size = min_size
        self.max_idle = max_idle_seconds

        self._available: asyncio.Queue = asyncio.Queue()
        self._in_use: set = set()
        self._created = 0
        self._lock = asyncio.Lock()

    async def initialize(self) -> None:
        """Initialize the minimum number of connections."""
        for _ in range(self.min_size):
            conn = await self._create()
            await self._available.put((conn, time.time()))
            self._created += 1

    async def acquire(self) -> Any:
        """Acquire a connection from the pool."""
        async with self._lock:
            # Try to reuse an idle connection
            while not self._available.empty():
                conn, last_used = await self._available.get()

                # Discard stale connections
                if time.time() - last_used > self.max_idle:
                    await self._close_connection(conn)
                    self._created -= 1
                    continue

                self._in_use.add(id(conn))
                return conn

            # Create a new connection if under the limit
            if self._created < self.max_size:
                conn = await self._create()
                self._created += 1
                self._in_use.add(id(conn))
                return conn

        # Pool exhausted: wait outside the lock so release() can proceed
        conn, _ = await self._available.get()
        async with self._lock:
            self._in_use.add(id(conn))
        return conn

    async def release(self, conn: Any) -> None:
        """Release a connection back to the pool."""
        async with self._lock:
            self._in_use.discard(id(conn))
            await self._available.put((conn, time.time()))

    async def _close_connection(self, conn: Any) -> None:
        """Close a connection."""
        if hasattr(conn, 'close'):
            await conn.close()

    @property
    def stats(self) -> dict[str, int]:
        """Get pool statistics."""
        return {
            "created": self._created,
            "in_use": len(self._in_use),
            "available": self._available.qsize()
        }


class ResourceOptimizer:
    """Coordinates resource optimization strategies."""

    def __init__(self):
        self._batchers: dict[str, RequestBatcher] = {}
        self._pools: dict[str, ConnectionPool] = {}
        self._cache: dict[str, tuple[Any, float]] = {}
        self._cache_ttl = 300.0

    def register_batcher(
        self,
        name: str,
        batcher: RequestBatcher
    ) -> None:
        """Register a request batcher."""
        self._batchers[name] = batcher

    def register_pool(
        self,
        name: str,
        pool: ConnectionPool
    ) -> None:
        """Register a connection pool."""
        self._pools[name] = pool

    async def batch_request(
        self,
        batcher_name: str,
        request: dict[str, Any]
    ) -> Any:
        """Submit a request to a batcher."""
        batcher = self._batchers.get(batcher_name)
        if not batcher:
            raise ValueError(f"Unknown batcher: {batcher_name}")
        return await batcher.submit(request)

    async def with_pooled_connection(
        self,
        pool_name: str,
        operation: Callable[[Any], Awaitable[Any]]
    ) -> Any:
        """Execute an operation with a pooled connection."""
        pool = self._pools.get(pool_name)
        if not pool:
            raise ValueError(f"Unknown pool: {pool_name}")

        conn = await pool.acquire()
        try:
            return await operation(conn)
        finally:
            await pool.release(conn)

    def cache_get(self, key: str) -> Optional[Any]:
        """Get a cached value."""
        if key in self._cache:
            value, expires_at = self._cache[key]
            if time.time() < expires_at:
                return value
            del self._cache[key]
        return None

    def cache_set(
        self,
        key: str,
        value: Any,
        ttl: Optional[float] = None
    ) -> None:
        """Set a cached value."""
        ttl = ttl or self._cache_ttl
        self._cache[key] = (value, time.time() + ttl)

    def get_stats(self) -> dict[str, Any]:
        """Get optimization statistics."""
        return {
            "pools": {
                name: pool.stats
                for name, pool in self._pools.items()
            },
            "cache_size": len(self._cache)
        }
```

Request batching amortizes per-request overhead for LLM calls, while connection pooling efficiently manages connections to external services. Together, these optimizations significantly improve throughput.
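The amortization argument can be made concrete with a toy benchmark. Here `process_batch` is a hypothetical stand-in for a batched LLM call with a fixed 50 ms per-call overhead (the names and timing are illustrative, not measurements of any real API):

```python
import asyncio
import time


async def process_batch(requests: list[str]) -> list[str]:
    # Stand-in for one batched LLM call: the fixed per-call
    # overhead is paid once for the whole batch
    await asyncio.sleep(0.05)
    return [f"result:{r}" for r in requests]


async def main() -> None:
    requests = [f"req-{i}" for i in range(8)]

    # Unbatched: eight calls, eight overhead payments
    start = time.perf_counter()
    for r in requests:
        await process_batch([r])
    unbatched = time.perf_counter() - start

    # Batched: one call amortizes the overhead
    start = time.perf_counter()
    results = await process_batch(requests)
    batched = time.perf_counter() - start

    assert len(results) == len(requests)
    assert batched < unbatched


asyncio.run(main())
```

The `RequestBatcher` above automates exactly this trade: `max_wait_ms` bounds the extra latency any single request pays in exchange for the amortized overhead.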
Bottleneck Identification
Identifying bottlenecks is essential for effective scaling. Agent systems have multiple potential bottlenecks: LLM API limits, memory for context, database connections, and compute for tool execution.
Performance Profiler
from dataclasses import dataclass, field
from typing import Any, Optional
from contextlib import asynccontextmanager
import asyncio
import time
from collections import defaultdict


@dataclass
class ProfileData:
    """Profiling data for an operation."""
    name: str
    duration_ms: float
    start_time: float
    end_time: float
    metadata: dict[str, Any] = field(default_factory=dict)
    children: list["ProfileData"] = field(default_factory=list)


class OperationProfiler:
    """Profiles operations to identify bottlenecks."""

    def __init__(self):
        self._profiles: list[ProfileData] = []
        self._current_stack: list[ProfileData] = []
        self._stats: dict[str, list[float]] = defaultdict(list)

    @asynccontextmanager
    async def profile(
        self,
        name: str,
        metadata: Optional[dict[str, Any]] = None
    ):
        """Profile an async operation."""
        start = time.perf_counter()

        profile = ProfileData(
            name=name,
            duration_ms=0,
            start_time=start,
            end_time=0,
            metadata=metadata or {}
        )

        # Track parent-child relationships
        if self._current_stack:
            self._current_stack[-1].children.append(profile)
        else:
            self._profiles.append(profile)

        self._current_stack.append(profile)

        try:
            yield profile
        finally:
            end = time.perf_counter()
            profile.end_time = end
            profile.duration_ms = (end - start) * 1000
            self._stats[name].append(profile.duration_ms)
            self._current_stack.pop()

    def get_bottlenecks(
        self,
        threshold_percentile: float = 0.9
    ) -> list[dict[str, Any]]:
        """Identify operations that are bottlenecks."""
        bottlenecks = []

        for name, durations in self._stats.items():
            if not durations:
                continue

            sorted_durations = sorted(durations)
            idx = int(len(sorted_durations) * threshold_percentile)
            p90 = sorted_durations[min(idx, len(sorted_durations) - 1)]
            avg = sum(durations) / len(durations)
            max_duration = max(durations)

            # Flag as bottleneck if P90 is significantly higher than average
            if p90 > avg * 2:
                bottlenecks.append({
                    "operation": name,
                    "count": len(durations),
                    "avg_ms": avg,
                    "p90_ms": p90,
                    "max_ms": max_duration,
                    "severity": "high" if p90 > 1000 else "medium"
                })

        return sorted(bottlenecks, key=lambda x: x["p90_ms"], reverse=True)

    def get_summary(self) -> dict[str, Any]:
        """Get profiling summary."""
        summary = {}

        for name, durations in self._stats.items():
            if durations:
                sorted_d = sorted(durations)
                summary[name] = {
                    "count": len(durations),
                    "avg_ms": sum(durations) / len(durations),
                    "min_ms": min(durations),
                    "max_ms": max(durations),
                    "p50_ms": sorted_d[len(sorted_d) // 2],
                    "p95_ms": sorted_d[int(len(sorted_d) * 0.95)]
                }

        return summary

    def clear(self) -> None:
        """Clear profiling data."""
        self._profiles.clear()
        self._stats.clear()


class BottleneckAnalyzer:
    """Analyzes system bottlenecks across components."""

    def __init__(self):
        self._profiler = OperationProfiler()
        self._resource_usage: dict[str, list[float]] = defaultdict(list)

    @asynccontextmanager
    async def trace(self, operation: str):
        """Trace an operation."""
        async with self._profiler.profile(operation):
            yield

    def record_resource_usage(
        self,
        resource: str,
        usage: float
    ) -> None:
        """Record resource usage (0-1 scale)."""
        self._resource_usage[resource].append(usage)
        # Keep last 1000 readings
        if len(self._resource_usage[resource]) > 1000:
            self._resource_usage[resource] = self._resource_usage[resource][-1000:]

    def analyze(self) -> dict[str, Any]:
        """Analyze bottlenecks and provide recommendations."""
        analysis = {
            "operation_bottlenecks": self._profiler.get_bottlenecks(),
            "resource_bottlenecks": [],
            "recommendations": []
        }

        # Analyze resource usage
        for resource, usage in self._resource_usage.items():
            if not usage:
                continue

            avg_usage = sum(usage) / len(usage)
            max_usage = max(usage)

            if avg_usage > 0.8:
                analysis["resource_bottlenecks"].append({
                    "resource": resource,
                    "avg_usage": avg_usage,
                    "max_usage": max_usage,
                    "severity": "high"
                })
                analysis["recommendations"].append(
                    f"Scale {resource} - consistently above 80% utilization"
                )
            elif max_usage > 0.95:
                analysis["resource_bottlenecks"].append({
                    "resource": resource,
                    "avg_usage": avg_usage,
                    "max_usage": max_usage,
                    "severity": "medium"
                })
                analysis["recommendations"].append(
                    f"Monitor {resource} - occasional spikes above 95%"
                )

        # Generate operation recommendations
        for bottleneck in analysis["operation_bottlenecks"][:3]:
            op = bottleneck["operation"]
            if "llm" in op.lower():
                analysis["recommendations"].append(
                    f"Consider caching or batching for {op}"
                )
            elif "database" in op.lower() or "db" in op.lower():
                analysis["recommendations"].append(
                    f"Add connection pooling or read replicas for {op}"
                )
            else:
                analysis["recommendations"].append(
                    f"Profile and optimize {op}"
                )

        return analysis

The profiler tracks operation timing across the system, identifying which operations contribute most to latency. Combined with resource monitoring, this provides a complete picture of system bottlenecks.
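The flagging rule in `get_bottlenecks` (P90 more than twice the mean) can be isolated into a small standalone function to see why it separates steady operations from tail-heavy ones. `find_bottlenecks` and the sample latencies below are illustrative, not part of the profiler's API:

```python
def find_bottlenecks(stats: dict[str, list[float]], pct: float = 0.9) -> list[dict]:
    """Flag operations whose tail latency dwarfs their average (p90 > 2x avg)."""
    flagged = []
    for name, durations in stats.items():
        if not durations:
            continue
        s = sorted(durations)
        idx = min(int(len(s) * pct), len(s) - 1)  # clamp percentile index
        p90, avg = s[idx], sum(s) / len(s)
        if p90 > avg * 2:
            flagged.append({
                "operation": name,
                "avg_ms": avg,
                "p90_ms": p90,
                "severity": "high" if p90 > 1000 else "medium",
            })
    # Worst tail latency first
    return sorted(flagged, key=lambda b: b["p90_ms"], reverse=True)


stats = {
    "db_query": [10.0] * 10,             # consistent -> not a bottleneck
    "llm_call": [100.0] * 9 + [2500.0],  # heavy tail -> flagged
}
print([b["operation"] for b in find_bottlenecks(stats)])  # ['llm_call']
```

The uniformly slow `db_query` is not flagged because its P90 equals its mean; the point of the heuristic is to surface operations with unpredictable tails, which are the ones that break latency SLOs under load.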
Summary
Scaling agent systems requires a multi-faceted approach that addresses the unique characteristics of LLM workloads. Effective scaling combines horizontal scaling, intelligent load balancing, auto-scaling, resource optimization, and continuous bottleneck analysis.
Key Takeaways
- Stateless Design - Externalize state to enable any instance to handle any request
- Intelligent Load Balancing - Use adaptive algorithms that consider latency, connections, and error rates
- Custom Scaling Metrics - Queue depth and latency are better signals than CPU for agent workloads
- Request Batching - Batch LLM requests to reduce per-request overhead
- Proactive Scaling - Use historical patterns to scale before load arrives
Next Steps: The next section covers queue-based agent processing for handling long-running workflows and managing backpressure in production systems.