Chapter 21

Scaling Agent Systems

Production Deployment

Introduction

Scaling agent systems presents unique challenges compared to traditional web services. LLM calls are computationally expensive, responses are variable in length and latency, and long-running agent workflows maintain state across multiple interactions. This section covers strategies for scaling agent systems effectively while maintaining performance and reliability.

Learning Objectives: By the end of this section, you will understand horizontal scaling patterns for agent workloads and be able to implement intelligent load balancing, configure auto-scaling based on agent-specific metrics, and optimize resource utilization across your infrastructure.

Agent systems often exhibit bursty traffic patterns with significant variation in request complexity. A simple query might complete in seconds, while a complex multi-step workflow could run for minutes. Effective scaling strategies must account for this variability.


Horizontal Scaling

Horizontal scaling adds capacity by running more instances of your agent services. The key challenge is ensuring work is distributed evenly and state is managed correctly across instances.

Stateless Service Design

```python
from dataclasses import dataclass
from typing import Any, Optional
import asyncio
import os
import time


@dataclass
class InstanceInfo:
    """Information about a service instance."""
    instance_id: str
    host: str
    port: int
    started_at: float
    capacity: int
    current_load: int

    @property
    def load_factor(self) -> float:
        return self.current_load / self.capacity if self.capacity > 0 else 1.0

    @property
    def available_capacity(self) -> int:
        return max(0, self.capacity - self.current_load)


class StatelessAgentService:
    """Stateless agent service designed for horizontal scaling."""

    def __init__(
        self,
        instance_id: Optional[str] = None,
        max_concurrent: int = 10
    ):
        self.instance_id = instance_id or os.getenv(
            "INSTANCE_ID",
            f"instance-{os.getpid()}"
        )
        self.max_concurrent = max_concurrent
        # Record start time once; recomputing it per request would make
        # started_at meaningless for uptime tracking
        self._started_at = time.time()
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._active_requests = 0
        self._state_store: Optional["StateStore"] = None

    def set_state_store(self, store: "StateStore") -> None:
        """Set external state store for session management."""
        self._state_store = store

    @property
    def instance_info(self) -> InstanceInfo:
        return InstanceInfo(
            instance_id=self.instance_id,
            host=os.getenv("HOST", "localhost"),
            port=int(os.getenv("PORT", "8000")),
            started_at=self._started_at,
            capacity=self.max_concurrent,
            current_load=self._active_requests
        )

    async def process_request(
        self,
        request: dict[str, Any],
        session_id: Optional[str] = None
    ) -> dict[str, Any]:
        """Process a request in a stateless manner."""
        async with self._semaphore:
            self._active_requests += 1
            try:
                # Load state from external store if needed
                state = None
                if session_id and self._state_store:
                    state = await self._state_store.get(session_id)

                # Process the request
                result = await self._execute(request, state)

                # Save updated state
                if session_id and self._state_store and result.get("state"):
                    await self._state_store.set(session_id, result["state"])

                return result

            finally:
                self._active_requests -= 1

    async def _execute(
        self,
        request: dict[str, Any],
        state: Optional[dict[str, Any]]
    ) -> dict[str, Any]:
        """Execute the agent logic."""
        # Placeholder for actual agent logic
        task = request.get("task", "")

        # Include state in processing if available
        context = state.get("context", []) if state else []

        return {
            "response": f"Processed: {task}",
            "instance_id": self.instance_id,
            "state": {
                "context": context + [task],
                "last_processed": task
            }
        }


class InstanceRegistry:
    """Registry for tracking service instances."""

    def __init__(self):
        self._instances: dict[str, InstanceInfo] = {}
        self._lock = asyncio.Lock()

    async def register(self, info: InstanceInfo) -> None:
        """Register an instance."""
        async with self._lock:
            self._instances[info.instance_id] = info

    async def deregister(self, instance_id: str) -> None:
        """Deregister an instance."""
        async with self._lock:
            self._instances.pop(instance_id, None)

    async def update_load(
        self,
        instance_id: str,
        current_load: int
    ) -> None:
        """Update instance load."""
        async with self._lock:
            if instance_id in self._instances:
                info = self._instances[instance_id]
                self._instances[instance_id] = InstanceInfo(
                    instance_id=info.instance_id,
                    host=info.host,
                    port=info.port,
                    started_at=info.started_at,
                    capacity=info.capacity,
                    current_load=current_load
                )

    async def get_instances(self) -> list[InstanceInfo]:
        """Get all registered instances."""
        async with self._lock:
            return list(self._instances.values())

    async def get_available_instance(self) -> Optional[InstanceInfo]:
        """Get instance with available capacity."""
        instances = await self.get_instances()
        available = [i for i in instances if i.available_capacity > 0]

        if not available:
            return None

        # Return instance with most available capacity
        return max(available, key=lambda i: i.available_capacity)
```

Stateless services externalize all state to shared stores, enabling any instance to handle any request. The instance registry tracks available capacity across all instances for intelligent routing.
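The code above leaves `StateStore` undefined. A minimal sketch of what such a store could look like, assuming a simple async get/set interface keyed by session ID (the `Protocol` and the in-memory class are illustrative, not a specific library's API):

```python
import asyncio
from typing import Any, Optional, Protocol


class StateStore(Protocol):
    """Interface the stateless service expects from its external store."""
    async def get(self, session_id: str) -> Optional[dict[str, Any]]: ...
    async def set(self, session_id: str, state: dict[str, Any]) -> None: ...


class InMemoryStateStore:
    """In-memory stand-in for a shared store such as Redis or DynamoDB.

    Suitable for tests only: in production the store must be reachable by
    every instance, otherwise statelessness buys nothing.
    """

    def __init__(self) -> None:
        self._data: dict[str, dict[str, Any]] = {}
        self._lock = asyncio.Lock()

    async def get(self, session_id: str) -> Optional[dict[str, Any]]:
        async with self._lock:
            return self._data.get(session_id)

    async def set(self, session_id: str, state: dict[str, Any]) -> None:
        async with self._lock:
            self._data[session_id] = state
```

Swapping this for a networked store is a one-line change in the service thanks to `set_state_store`, which is the point of externalizing state behind a narrow interface.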

Partitioning Strategies

```python
from enum import Enum
from typing import Any, Optional
import hashlib
import json


class PartitionStrategy(Enum):
    """Strategies for partitioning work across instances."""
    ROUND_ROBIN = "round_robin"
    HASH_BASED = "hash_based"
    CAPACITY_BASED = "capacity_based"
    AFFINITY = "affinity"


class WorkPartitioner:
    """Partitions work across service instances."""

    def __init__(
        self,
        registry: InstanceRegistry,
        strategy: PartitionStrategy = PartitionStrategy.CAPACITY_BASED
    ):
        self.registry = registry
        self.strategy = strategy
        self._round_robin_counter = 0
        self._affinity_map: dict[str, str] = {}

    async def get_instance(
        self,
        request: dict[str, Any],
        session_id: Optional[str] = None
    ) -> Optional[InstanceInfo]:
        """Get the instance that should handle this request."""
        instances = await self.registry.get_instances()
        available = [i for i in instances if i.available_capacity > 0]

        if not available:
            return None

        if self.strategy == PartitionStrategy.ROUND_ROBIN:
            return self._round_robin(available)

        elif self.strategy == PartitionStrategy.HASH_BASED:
            return self._hash_based(available, request)

        elif self.strategy == PartitionStrategy.CAPACITY_BASED:
            return self._capacity_based(available)

        elif self.strategy == PartitionStrategy.AFFINITY:
            return self._affinity_based(available, session_id)

        return available[0]

    def _round_robin(
        self,
        instances: list[InstanceInfo]
    ) -> InstanceInfo:
        """Round-robin selection."""
        idx = self._round_robin_counter % len(instances)
        self._round_robin_counter += 1
        return instances[idx]

    def _hash_based(
        self,
        instances: list[InstanceInfo],
        request: dict[str, Any]
    ) -> InstanceInfo:
        """Hash-based consistent selection."""
        # Create hash from request content
        content = json.dumps(request, sort_keys=True)
        hash_val = int(hashlib.sha256(content.encode()).hexdigest(), 16)
        idx = hash_val % len(instances)
        return instances[idx]

    def _capacity_based(
        self,
        instances: list[InstanceInfo]
    ) -> InstanceInfo:
        """Select instance with most available capacity."""
        return max(instances, key=lambda i: i.available_capacity)

    def _affinity_based(
        self,
        instances: list[InstanceInfo],
        session_id: Optional[str]
    ) -> InstanceInfo:
        """Session affinity - same session goes to same instance."""
        if session_id and session_id in self._affinity_map:
            target_id = self._affinity_map[session_id]
            for instance in instances:
                if instance.instance_id == target_id:
                    return instance

        # No affinity or instance unavailable - use capacity-based
        selected = self._capacity_based(instances)
        if session_id:
            self._affinity_map[session_id] = selected.instance_id
        return selected

    def clear_affinity(self, session_id: str) -> None:
        """Clear session affinity."""
        self._affinity_map.pop(session_id, None)


class ShardedPartitioner:
    """Partition work based on sharding key."""

    def __init__(
        self,
        num_shards: int,
        registry: InstanceRegistry
    ):
        self.num_shards = num_shards
        self.registry = registry
        # Map shards to instances
        self._shard_assignment: dict[int, str] = {}

    async def assign_shards(self) -> None:
        """Assign shards to available instances."""
        instances = await self.registry.get_instances()

        if not instances:
            return

        # Distribute shards evenly
        for shard_id in range(self.num_shards):
            instance_idx = shard_id % len(instances)
            self._shard_assignment[shard_id] = instances[instance_idx].instance_id

    def get_shard(self, key: str) -> int:
        """Get shard ID for a key."""
        hash_val = int(hashlib.sha256(key.encode()).hexdigest(), 16)
        return hash_val % self.num_shards

    async def get_instance_for_key(
        self,
        key: str
    ) -> Optional[InstanceInfo]:
        """Get instance for a sharding key."""
        shard_id = self.get_shard(key)
        instance_id = self._shard_assignment.get(shard_id)

        if not instance_id:
            return None

        instances = await self.registry.get_instances()
        for instance in instances:
            if instance.instance_id == instance_id:
                return instance

        # Instance not available - reassign shard
        available = [i for i in instances if i.available_capacity > 0]
        if available:
            self._shard_assignment[shard_id] = available[0].instance_id
            return available[0]

        return None
```

Partitioning strategies distribute work across instances. Capacity-based partitioning steers new work toward the least-loaded instances, while affinity-based partitioning keeps a session on one instance so warm caches and local context can be reused.
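Both the hash-based and sharded strategies rest on the same property: a deterministic hash maps a key to the same shard no matter which process computes it, so instances need no coordination to agree on placement. A standalone restatement of the `get_shard` logic above (the helper name is ours):

```python
import hashlib


def shard_for(key: str, num_shards: int) -> int:
    """Map a key to a shard deterministically via SHA-256."""
    # Every instance computes the identical shard for a given key,
    # so placement requires no shared state or coordination.
    hash_val = int(hashlib.sha256(key.encode()).hexdigest(), 16)
    return hash_val % num_shards
```

One caveat worth knowing: changing `num_shards` remaps most keys, which is why systems that resize frequently often prefer consistent hashing over plain modulo.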


Load Balancing Strategies

Load balancing for agent systems requires awareness of the unique characteristics of LLM workloads: variable request duration, high memory usage, and the importance of request context.

Intelligent Load Balancer

```python
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
import asyncio
import random


class LoadBalancingAlgorithm(Enum):
    """Load balancing algorithms."""
    ROUND_ROBIN = "round_robin"
    LEAST_CONNECTIONS = "least_connections"
    WEIGHTED_RESPONSE_TIME = "weighted_response_time"
    ADAPTIVE = "adaptive"


@dataclass
class BackendMetrics:
    """Metrics for a backend instance."""
    instance_id: str
    active_connections: int = 0
    total_requests: int = 0
    total_errors: int = 0
    avg_response_time_ms: float = 0.0
    last_response_times: list[float] = field(default_factory=list)
    weight: float = 1.0
    healthy: bool = True

    def record_request(self, response_time_ms: float, success: bool) -> None:
        """Record a completed request."""
        self.total_requests += 1
        if not success:
            self.total_errors += 1

        # Maintain sliding window of response times
        self.last_response_times.append(response_time_ms)
        if len(self.last_response_times) > 100:
            self.last_response_times = self.last_response_times[-100:]

        self.avg_response_time_ms = (
            sum(self.last_response_times) / len(self.last_response_times)
        )

    @property
    def error_rate(self) -> float:
        if self.total_requests == 0:
            return 0.0
        return self.total_errors / self.total_requests

    @property
    def score(self) -> float:
        """Calculate backend score for routing decisions (higher is better)."""
        if not self.healthy:
            return 0.0

        # Each penalty grows with load, latency, or errors; the score
        # subtracts them from a weighted base, so higher scores win
        connection_penalty = self.active_connections * 10
        latency_penalty = self.avg_response_time_ms / 100
        error_penalty = self.error_rate * 1000

        return max(0, self.weight * 100 - connection_penalty - latency_penalty - error_penalty)


class AgentLoadBalancer:
    """Load balancer optimized for agent workloads."""

    def __init__(
        self,
        algorithm: LoadBalancingAlgorithm = LoadBalancingAlgorithm.ADAPTIVE
    ):
        self.algorithm = algorithm
        self._backends: dict[str, BackendMetrics] = {}
        self._round_robin_idx = 0
        self._lock = asyncio.Lock()

    def add_backend(
        self,
        instance_id: str,
        weight: float = 1.0
    ) -> None:
        """Add a backend instance."""
        self._backends[instance_id] = BackendMetrics(
            instance_id=instance_id,
            weight=weight
        )

    def remove_backend(self, instance_id: str) -> None:
        """Remove a backend instance."""
        self._backends.pop(instance_id, None)

    def mark_unhealthy(self, instance_id: str) -> None:
        """Mark a backend as unhealthy."""
        if instance_id in self._backends:
            self._backends[instance_id].healthy = False

    def mark_healthy(self, instance_id: str) -> None:
        """Mark a backend as healthy."""
        if instance_id in self._backends:
            self._backends[instance_id].healthy = True

    async def get_backend(self) -> Optional[str]:
        """Get the next backend to use."""
        async with self._lock:
            healthy = {
                k: v for k, v in self._backends.items()
                if v.healthy
            }

            if not healthy:
                return None

            if self.algorithm == LoadBalancingAlgorithm.ROUND_ROBIN:
                return self._round_robin(list(healthy.keys()))

            elif self.algorithm == LoadBalancingAlgorithm.LEAST_CONNECTIONS:
                return self._least_connections(healthy)

            elif self.algorithm == LoadBalancingAlgorithm.WEIGHTED_RESPONSE_TIME:
                return self._weighted_response_time(healthy)

            elif self.algorithm == LoadBalancingAlgorithm.ADAPTIVE:
                return self._adaptive(healthy)

            return list(healthy.keys())[0]

    def _round_robin(self, backends: list[str]) -> str:
        """Simple round-robin selection."""
        idx = self._round_robin_idx % len(backends)
        self._round_robin_idx += 1
        return backends[idx]

    def _least_connections(
        self,
        backends: dict[str, BackendMetrics]
    ) -> str:
        """Select backend with fewest active connections."""
        return min(
            backends.keys(),
            key=lambda k: backends[k].active_connections
        )

    def _weighted_response_time(
        self,
        backends: dict[str, BackendMetrics]
    ) -> str:
        """Select based on weighted response time."""
        # Calculate weights inversely proportional to response time
        weights = {}
        for instance_id, metrics in backends.items():
            if metrics.avg_response_time_ms > 0:
                weights[instance_id] = 1.0 / metrics.avg_response_time_ms
            else:
                weights[instance_id] = 1.0

        total_weight = sum(weights.values())
        rand_val = random.random() * total_weight

        cumulative = 0.0
        for instance_id, weight in weights.items():
            cumulative += weight
            if rand_val <= cumulative:
                return instance_id

        return list(backends.keys())[0]

    def _adaptive(
        self,
        backends: dict[str, BackendMetrics]
    ) -> str:
        """Adaptive selection based on multiple factors."""
        # Score each backend
        scores = {
            instance_id: metrics.score
            for instance_id, metrics in backends.items()
        }

        # Select highest scoring
        return max(scores.keys(), key=lambda k: scores[k])

    async def record_request_start(self, instance_id: str) -> None:
        """Record that a request has started."""
        async with self._lock:
            if instance_id in self._backends:
                self._backends[instance_id].active_connections += 1

    async def record_request_end(
        self,
        instance_id: str,
        response_time_ms: float,
        success: bool
    ) -> None:
        """Record that a request has completed."""
        async with self._lock:
            if instance_id in self._backends:
                metrics = self._backends[instance_id]
                metrics.active_connections = max(0, metrics.active_connections - 1)
                metrics.record_request(response_time_ms, success)

    def get_metrics(self) -> dict[str, Any]:
        """Get load balancer metrics."""
        return {
            instance_id: {
                "active_connections": m.active_connections,
                "total_requests": m.total_requests,
                "error_rate": m.error_rate,
                "avg_response_time_ms": m.avg_response_time_ms,
                "healthy": m.healthy,
                "score": m.score
            }
            for instance_id, m in self._backends.items()
        }
```

The adaptive load balancer considers multiple factors: active connections, response time, error rate, and configured weights. This multi-dimensional approach handles the variable nature of agent workloads better than simple algorithms.
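To see how the penalties trade off, the score computation can be isolated. With weight 1.0, two active connections, a 200 ms average latency, and a 1% error rate, the penalties are 20, 2, and 10, leaving a score of 68. A standalone restatement of the formula from `BackendMetrics.score` (the free function is ours):

```python
def backend_score(
    weight: float,
    active_connections: int,
    avg_response_time_ms: float,
    error_rate: float,
) -> float:
    """Mirror of BackendMetrics.score: weighted base minus penalties."""
    connection_penalty = active_connections * 10   # each in-flight request costs 10
    latency_penalty = avg_response_time_ms / 100   # 100 ms of latency costs 1
    error_penalty = error_rate * 1000              # a 1% error rate costs 10
    return max(0, weight * 100 - connection_penalty - latency_penalty - error_penalty)
```

The multipliers encode priorities: connection count dominates (queueing behind a busy backend is the worst outcome for long agent requests), errors come next, and latency is the gentlest signal. Tuning them is how you adapt the balancer to a particular workload.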

| Algorithm | Best For | Limitations |
| --- | --- | --- |
| Round Robin | Uniform workloads | Ignores capacity differences |
| Least Connections | Variable request duration | Doesn't consider latency |
| Weighted Response Time | Latency-sensitive apps | Can overload fast backends |
| Adaptive | Agent workloads | More complex to tune |

Auto-Scaling

Auto-scaling automatically adjusts capacity based on demand. For agent systems, traditional CPU-based scaling often falls short because LLM calls are I/O-bound. Custom metrics provide better scaling signals.

Agent-Aware Auto-Scaler

```python
from dataclasses import dataclass
from typing import Any, Optional, Callable, Awaitable
from enum import Enum
import asyncio
import time


class ScalingDirection(Enum):
    """Scaling direction."""
    UP = "up"
    DOWN = "down"
    NONE = "none"


@dataclass
class ScalingMetrics:
    """Metrics for scaling decisions."""
    queue_depth: int = 0
    avg_response_time_ms: float = 0.0
    active_requests: int = 0
    total_capacity: int = 0
    error_rate: float = 0.0
    pending_llm_calls: int = 0

    @property
    def utilization(self) -> float:
        if self.total_capacity == 0:
            return 1.0
        return self.active_requests / self.total_capacity

    @property
    def queue_pressure(self) -> float:
        """Queue pressure indicator (0-1)."""
        # Normalize queue depth
        if self.total_capacity == 0:
            return 1.0 if self.queue_depth > 0 else 0.0
        return min(1.0, self.queue_depth / (self.total_capacity * 2))


@dataclass
class ScalingPolicy:
    """Policy for auto-scaling decisions."""
    # Scale up thresholds
    scale_up_utilization: float = 0.8
    scale_up_queue_pressure: float = 0.5
    scale_up_latency_ms: float = 5000.0

    # Scale down thresholds
    scale_down_utilization: float = 0.3
    scale_down_queue_pressure: float = 0.1

    # Scaling limits
    min_instances: int = 1
    max_instances: int = 10

    # Cooldown periods
    scale_up_cooldown_seconds: float = 60.0
    scale_down_cooldown_seconds: float = 300.0

    # Scaling step size
    scale_up_step: int = 2
    scale_down_step: int = 1


class AutoScaler:
    """Auto-scaler for agent services."""

    def __init__(
        self,
        policy: ScalingPolicy,
        scale_up_callback: Callable[[int], Awaitable[None]],
        scale_down_callback: Callable[[int], Awaitable[None]],
        metrics_collector: Callable[[], Awaitable[ScalingMetrics]]
    ):
        self.policy = policy
        self._scale_up = scale_up_callback
        self._scale_down = scale_down_callback
        self._get_metrics = metrics_collector

        self._current_instances = policy.min_instances
        self._last_scale_up = 0.0
        self._last_scale_down = 0.0
        self._running = False
        self._scaling_history: list[dict[str, Any]] = []

    async def start(self, interval_seconds: float = 30.0) -> None:
        """Start the auto-scaler."""
        self._running = True
        while self._running:
            try:
                await self._evaluate_scaling()
            except Exception as e:
                print(f"Scaling evaluation error: {e}")

            await asyncio.sleep(interval_seconds)

    async def stop(self) -> None:
        """Stop the auto-scaler."""
        self._running = False

    async def _evaluate_scaling(self) -> None:
        """Evaluate and execute scaling decisions."""
        metrics = await self._get_metrics()
        direction = self._decide_scaling(metrics)

        now = time.time()

        if direction == ScalingDirection.UP:
            if now - self._last_scale_up < self.policy.scale_up_cooldown_seconds:
                return  # In cooldown

            new_count = min(
                self._current_instances + self.policy.scale_up_step,
                self.policy.max_instances
            )

            if new_count > self._current_instances:
                await self._scale_up(new_count - self._current_instances)
                self._record_scaling("up", self._current_instances, new_count, metrics)
                self._current_instances = new_count
                self._last_scale_up = now

        elif direction == ScalingDirection.DOWN:
            if now - self._last_scale_down < self.policy.scale_down_cooldown_seconds:
                return  # In cooldown

            new_count = max(
                self._current_instances - self.policy.scale_down_step,
                self.policy.min_instances
            )

            if new_count < self._current_instances:
                await self._scale_down(self._current_instances - new_count)
                self._record_scaling("down", self._current_instances, new_count, metrics)
                self._current_instances = new_count
                self._last_scale_down = now

    def _decide_scaling(self, metrics: ScalingMetrics) -> ScalingDirection:
        """Decide scaling direction based on metrics."""
        # Check for scale up conditions
        if (
            metrics.utilization > self.policy.scale_up_utilization or
            metrics.queue_pressure > self.policy.scale_up_queue_pressure or
            metrics.avg_response_time_ms > self.policy.scale_up_latency_ms
        ):
            if self._current_instances < self.policy.max_instances:
                return ScalingDirection.UP

        # Check for scale down conditions
        if (
            metrics.utilization < self.policy.scale_down_utilization and
            metrics.queue_pressure < self.policy.scale_down_queue_pressure and
            metrics.error_rate < 0.01
        ):
            if self._current_instances > self.policy.min_instances:
                return ScalingDirection.DOWN

        return ScalingDirection.NONE

    def _record_scaling(
        self,
        direction: str,
        old_count: int,
        new_count: int,
        metrics: ScalingMetrics
    ) -> None:
        """Record a scaling event."""
        self._scaling_history.append({
            "timestamp": time.time(),
            "direction": direction,
            "old_count": old_count,
            "new_count": new_count,
            "metrics": {
                "utilization": metrics.utilization,
                "queue_pressure": metrics.queue_pressure,
                "avg_latency_ms": metrics.avg_response_time_ms
            }
        })

        # Keep last 100 events
        if len(self._scaling_history) > 100:
            self._scaling_history = self._scaling_history[-100:]

    def get_history(self) -> list[dict[str, Any]]:
        """Get scaling history."""
        return list(self._scaling_history)


class PredictiveScaler:
    """Predictive auto-scaler using historical patterns."""

    def __init__(
        self,
        base_scaler: AutoScaler,
        prediction_window_minutes: int = 15
    ):
        self.base_scaler = base_scaler
        self.prediction_window = prediction_window_minutes
        self._historical_load: list[tuple[float, float]] = []  # (timestamp, load)

    def record_load(self, load: float) -> None:
        """Record current load for prediction."""
        self._historical_load.append((time.time(), load))

        # Keep 24 hours of data
        cutoff = time.time() - 86400
        self._historical_load = [
            (t, l) for t, l in self._historical_load if t > cutoff
        ]

    def predict_load(self) -> Optional[float]:
        """Predict load for the next window."""
        if len(self._historical_load) < 10:
            return None

        # Simple moving average prediction
        # In production, use more sophisticated time series forecasting
        now = time.time()
        target_time = now + (self.prediction_window * 60)

        # Find similar time periods from history
        similar_loads = []
        for timestamp, load in self._historical_load:
            # Same time yesterday, last week, etc.
            time_diff = abs((timestamp % 86400) - (target_time % 86400))
            time_diff = min(time_diff, 86400 - time_diff)  # handle midnight wrap
            if time_diff < 3600:  # Within 1 hour
                similar_loads.append(load)

        if similar_loads:
            return sum(similar_loads) / len(similar_loads)

        # Fallback to recent average
        recent = self._historical_load[-10:]
        return sum(l for _, l in recent) / len(recent)

    async def proactive_scale(self) -> None:
        """Proactively scale based on predictions."""
        predicted = self.predict_load()

        if predicted is None:
            return

        policy = self.base_scaler.policy
        current = self.base_scaler._current_instances

        # Calculate needed capacity
        needed = int(predicted / (1 - policy.scale_up_utilization)) + 1
        needed = max(policy.min_instances, min(policy.max_instances, needed))

        if needed > current:
            # Scale up proactively and keep the base scaler's count in sync,
            # so its next reactive decision starts from the right number
            await self.base_scaler._scale_up(needed - current)
            self.base_scaler._current_instances = needed
            self.base_scaler._last_scale_up = time.time()
```

The auto-scaler uses multiple signals including queue depth, latency, and utilization to make scaling decisions. The predictive scaler extends this with historical patterns to scale proactively before load arrives.
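The decision logic reduces to a few threshold comparisons, which can be checked in isolation. A standalone restatement of `_decide_scaling` with the default policy thresholds baked in (the free function is ours):

```python
def decide_scaling(
    utilization: float,
    queue_pressure: float,
    avg_latency_ms: float,
    error_rate: float,
    current: int,
    min_instances: int = 1,
    max_instances: int = 10,
) -> str:
    """Return 'up', 'down', or 'none' using the default policy thresholds."""
    # Any single overload signal is enough to trigger a scale-up (OR)...
    if utilization > 0.8 or queue_pressure > 0.5 or avg_latency_ms > 5000.0:
        if current < max_instances:
            return "up"
    # ...but scaling down requires every signal to be quiet (AND).
    if utilization < 0.3 and queue_pressure < 0.1 and error_rate < 0.01:
        if current > min_instances:
            return "down"
    return "none"
```

The asymmetry is deliberate: OR for up and AND for down biases the system toward spare capacity, because for agent workloads an under-provisioned minute is far more visible to users than an over-provisioned one.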


Resource Optimization

Agent workloads can be optimized through request batching, connection pooling, and intelligent caching. These techniques reduce per-request overhead and improve overall throughput.

Request Batching and Pooling

🐍python
1from dataclasses import dataclass
2from typing import Any, Optional, Callable, Awaitable
3import asyncio
4import time
5
6
7@dataclass
8class BatchConfig:
9    """Configuration for request batching."""
10    max_batch_size: int = 10
11    max_wait_ms: float = 100.0
12    enable_dynamic_sizing: bool = True
13
14
15class RequestBatcher:
16    """Batches requests for efficient processing."""
17
18    def __init__(
19        self,
20        config: BatchConfig,
21        batch_processor: Callable[[list[dict[str, Any]]], Awaitable[list[Any]]]
22    ):
23        self.config = config
24        self._process_batch = batch_processor
25        self._pending: list[tuple[dict[str, Any], asyncio.Future]] = []
26        self._lock = asyncio.Lock()
27        self._batch_task: Optional[asyncio.Task] = None
28
29    async def submit(self, request: dict[str, Any]) -> Any:
30        """Submit a request for batched processing."""
31        future: asyncio.Future = asyncio.Future()
32
33        async with self._lock:
34            self._pending.append((request, future))
35
36            # Start batch timer if this is the first request
37            if len(self._pending) == 1:
38                self._batch_task = asyncio.create_task(
39                    self._batch_timeout()
40                )
41
42            # Process immediately if batch is full
43            if len(self._pending) >= self.config.max_batch_size:
44                await self._process_pending()
45
46        return await future
47
48    async def _batch_timeout(self) -> None:
49        """Wait for batch timeout then process."""
50        await asyncio.sleep(self.config.max_wait_ms / 1000)
51
52        async with self._lock:
53            if self._pending:
54                await self._process_pending()
55
56    async def _process_pending(self) -> None:
57        """Process all pending requests."""
58        if not self._pending:
59            return
60
61        batch = self._pending
62        self._pending = []
63
64        if self._batch_task and self._batch_task is not asyncio.current_task():
65            self._batch_task.cancel()  # never cancel the task we are running inside
66        self._batch_task = None
67
68        try:
69            requests = [req for req, _ in batch]
70            results = await self._process_batch(requests)
71
72            for (_, future), result in zip(batch, results):
73                future.set_result(result)
74
75        except Exception as e:
76            for _, future in batch:
77                future.set_exception(e)
78
79
80class ConnectionPool:
81    """Connection pool for external services."""
82
83    def __init__(
84        self,
85        create_connection: Callable[[], Awaitable[Any]],
86        max_size: int = 10,
87        min_size: int = 2,
88        max_idle_seconds: float = 300.0
89    ):
90        self._create = create_connection
91        self.max_size = max_size
92        self.min_size = min_size
93        self.max_idle = max_idle_seconds
94
95        self._available: asyncio.Queue = asyncio.Queue()
96        self._in_use: set = set()
97        self._created = 0
98        self._lock = asyncio.Lock()
99
100    async def initialize(self) -> None:
101        """Initialize minimum connections."""
102        for _ in range(self.min_size):
103            conn = await self._create()
104            await self._available.put((conn, time.time()))
105            self._created += 1
106
107    async def acquire(self) -> Any:
108        """Acquire a connection from the pool."""
109        async with self._lock:
110            # Try to get from available
111            while not self._available.empty():
112                conn, last_used = await self._available.get()
113
114                # Check if connection is stale
115                if time.time() - last_used > self.max_idle:
116                    await self._close_connection(conn)
117                    self._created -= 1
118                    continue
119
120                self._in_use.add(id(conn))
121                return conn
122
123            # Create new connection if under limit
124            if self._created < self.max_size:
125                conn = await self._create()
126                self._created += 1
127                self._in_use.add(id(conn))
128                return conn
129
130        # Pool exhausted: block until another task releases a connection
131        conn, _ = await self._available.get()
132        async with self._lock:
133            self._in_use.add(id(conn))
134        return conn
135
136    async def release(self, conn: Any) -> None:
137        """Release a connection back to the pool."""
138        async with self._lock:
139            self._in_use.discard(id(conn))
140            await self._available.put((conn, time.time()))
141
142    async def _close_connection(self, conn: Any) -> None:
143        """Close a connection."""
144        if hasattr(conn, 'close'):
145            await conn.close()
146
147    @property
148    def stats(self) -> dict[str, int]:
149        """Get pool statistics."""
150        return {
151            "created": self._created,
152            "in_use": len(self._in_use),
153            "available": self._available.qsize()
154        }
155
156
157class ResourceOptimizer:
158    """Coordinates resource optimization strategies."""
159
160    def __init__(self):
161        self._batchers: dict[str, RequestBatcher] = {}
162        self._pools: dict[str, ConnectionPool] = {}
163        self._cache: dict[str, tuple[Any, float]] = {}  # unbounded; expired entries pruned lazily on reads
164        self._cache_ttl = 300.0
165
166    def register_batcher(
167        self,
168        name: str,
169        batcher: RequestBatcher
170    ) -> None:
171        """Register a request batcher."""
172        self._batchers[name] = batcher
173
174    def register_pool(
175        self,
176        name: str,
177        pool: ConnectionPool
178    ) -> None:
179        """Register a connection pool."""
180        self._pools[name] = pool
181
182    async def batch_request(
183        self,
184        batcher_name: str,
185        request: dict[str, Any]
186    ) -> Any:
187        """Submit request to a batcher."""
188        batcher = self._batchers.get(batcher_name)
189        if not batcher:
190            raise ValueError(f"Unknown batcher: {batcher_name}")
191        return await batcher.submit(request)
192
193    async def with_pooled_connection(
194        self,
195        pool_name: str,
196        operation: Callable[[Any], Awaitable[Any]]
197    ) -> Any:
198        """Execute operation with pooled connection."""
199        pool = self._pools.get(pool_name)
200        if not pool:
201            raise ValueError(f"Unknown pool: {pool_name}")
202
203        conn = await pool.acquire()
204        try:
205            return await operation(conn)
206        finally:
207            await pool.release(conn)
208
209    def cache_get(self, key: str) -> Optional[Any]:
210        """Get cached value."""
211        if key in self._cache:
212            value, expires_at = self._cache[key]
213            if time.time() < expires_at:
214                return value
215            del self._cache[key]
216        return None
217
218    def cache_set(
219        self,
220        key: str,
221        value: Any,
222        ttl: Optional[float] = None
223    ) -> None:
224        """Set cached value."""
225        ttl = ttl or self._cache_ttl
226        self._cache[key] = (value, time.time() + ttl)
227
228    def get_stats(self) -> dict[str, Any]:
229        """Get optimization statistics."""
230        return {
231            "pools": {
232                name: pool.stats
233                for name, pool in self._pools.items()
234            },
235            "cache_size": len(self._cache)
236        }

Request batching reduces per-request overhead for LLM calls, while connection pooling efficiently manages external service connections. Together, these optimizations significantly improve throughput.
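The flush-on-timeout pattern above can be reduced to a few lines. The sketch below is a minimal, self-contained illustration rather than the `RequestBatcher` class itself: `TinyBatcher` and `fake_llm_batch` are hypothetical stand-ins, with `fake_llm_batch` playing the role of a batched LLM endpoint. Five concurrent callers each submit one prompt, yet the backend is invoked only once.

```python
from typing import Optional
import asyncio

backend_calls = 0  # counts how many times the (fake) backend is hit


async def fake_llm_batch(prompts: list[str]) -> list[str]:
    """Stand-in for a batched LLM endpoint: one call serves many prompts."""
    global backend_calls
    backend_calls += 1
    await asyncio.sleep(0.01)  # simulate network latency
    return [p.upper() for p in prompts]


class TinyBatcher:
    """Collects submissions for a short window, then flushes them as one call."""

    def __init__(self, max_wait: float = 0.05):
        self.max_wait = max_wait
        self._pending: list[tuple[str, asyncio.Future]] = []
        self._flush_task: Optional[asyncio.Task] = None

    async def submit(self, prompt: str) -> str:
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending.append((prompt, fut))
        # First submission in a window starts the flush timer
        if self._flush_task is None:
            self._flush_task = asyncio.create_task(self._flush_later())
        return await fut

    async def _flush_later(self) -> None:
        await asyncio.sleep(self.max_wait)
        batch, self._pending = self._pending, []
        self._flush_task = None
        results = await fake_llm_batch([p for p, _ in batch])
        for (_, fut), res in zip(batch, results):
            fut.set_result(res)


async def main() -> list[str]:
    batcher = TinyBatcher()
    # Five concurrent callers, one backend call
    return list(await asyncio.gather(*(batcher.submit(f"q{i}") for i in range(5))))


results = asyncio.run(main())
print(results, "backend calls:", backend_calls)
```

Because every caller awaits its own future, batching stays invisible to call sites: code written against `submit()` does not change when the batch size or wait window is tuned.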


Bottleneck Identification

Identifying bottlenecks is essential for effective scaling. Agent systems have multiple potential bottlenecks: LLM API limits, memory for context, database connections, and compute for tool execution.

Performance Profiler

🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional
3from contextlib import asynccontextmanager
4import asyncio
5import time
6from collections import defaultdict
7
8
9@dataclass
10class ProfileData:
11    """Profiling data for an operation."""
12    name: str
13    duration_ms: float
14    start_time: float
15    end_time: float
16    metadata: dict[str, Any] = field(default_factory=dict)
17    children: list["ProfileData"] = field(default_factory=list)
18
19
20class OperationProfiler:
21    """Profiles operations to identify bottlenecks."""
22
23    def __init__(self):
24        self._profiles: list[ProfileData] = []
25        self._current_stack: list[ProfileData] = []
26        self._stats: dict[str, list[float]] = defaultdict(list)
27
28    @asynccontextmanager
29    async def profile(
30        self,
31        name: str,
32        metadata: Optional[dict[str, Any]] = None
33    ):
34        """Profile an async operation."""
35        start = time.perf_counter()
36
37        profile = ProfileData(
38            name=name,
39            duration_ms=0,
40            start_time=start,
41            end_time=0,
42            metadata=metadata or {}
43        )
44
45        # Track parent-child relationships
46        if self._current_stack:
47            self._current_stack[-1].children.append(profile)
48        else:
49            self._profiles.append(profile)
50
51        self._current_stack.append(profile)
52
53        try:
54            yield profile
55        finally:
56            end = time.perf_counter()
57            profile.end_time = end
58            profile.duration_ms = (end - start) * 1000
59            self._stats[name].append(profile.duration_ms)
60            self._current_stack.pop()
61
62    def get_bottlenecks(
63        self,
64        threshold_percentile: float = 0.9
65    ) -> list[dict[str, Any]]:
66        """Identify operations that are bottlenecks."""
67        bottlenecks = []
68
69        for name, durations in self._stats.items():
70            if not durations:
71                continue
72
73            sorted_durations = sorted(durations)
74            idx = int(len(sorted_durations) * threshold_percentile)
75            p90 = sorted_durations[min(idx, len(sorted_durations) - 1)]
76            avg = sum(durations) / len(durations)
77            max_duration = max(durations)
78
79            # Flag as bottleneck if P90 is significantly higher than average
80            if p90 > avg * 2:
81                bottlenecks.append({
82                    "operation": name,
83                    "count": len(durations),
84                    "avg_ms": avg,
85                    "p90_ms": p90,
86                    "max_ms": max_duration,
87                    "severity": "high" if p90 > 1000 else "medium"
88                })
89
90        return sorted(bottlenecks, key=lambda x: x["p90_ms"], reverse=True)
91
92    def get_summary(self) -> dict[str, Any]:
93        """Get profiling summary."""
94        summary = {}
95
96        for name, durations in self._stats.items():
97            if durations:
98                sorted_d = sorted(durations)
99                summary[name] = {
100                    "count": len(durations),
101                    "avg_ms": sum(durations) / len(durations),
102                    "min_ms": min(durations),
103                    "max_ms": max(durations),
104                    "p50_ms": sorted_d[len(sorted_d) // 2],
105                    "p95_ms": sorted_d[int(len(sorted_d) * 0.95)]
106                }
107
108        return summary
109
110    def clear(self) -> None:
111        """Clear profiling data."""
112        self._profiles.clear()
113        self._stats.clear()
114
115
116class BottleneckAnalyzer:
117    """Analyzes system bottlenecks across components."""
118
119    def __init__(self):
120        self._profiler = OperationProfiler()
121        self._resource_usage: dict[str, list[float]] = defaultdict(list)
122
123    @asynccontextmanager
124    async def trace(self, operation: str):
125        """Trace an operation."""
126        async with self._profiler.profile(operation):
127            yield
128
129    def record_resource_usage(
130        self,
131        resource: str,
132        usage: float
133    ) -> None:
134        """Record resource usage (0-1 scale)."""
135        self._resource_usage[resource].append(usage)
136        # Keep last 1000 readings
137        if len(self._resource_usage[resource]) > 1000:
138            self._resource_usage[resource] = self._resource_usage[resource][-1000:]
139
140    def analyze(self) -> dict[str, Any]:
141        """Analyze bottlenecks and provide recommendations."""
142        analysis = {
143            "operation_bottlenecks": self._profiler.get_bottlenecks(),
144            "resource_bottlenecks": [],
145            "recommendations": []
146        }
147
148        # Analyze resource usage
149        for resource, usage in self._resource_usage.items():
150            if not usage:
151                continue
152
153            avg_usage = sum(usage) / len(usage)
154            max_usage = max(usage)
155
156            if avg_usage > 0.8:
157                analysis["resource_bottlenecks"].append({
158                    "resource": resource,
159                    "avg_usage": avg_usage,
160                    "max_usage": max_usage,
161                    "severity": "high"
162                })
163                analysis["recommendations"].append(
164                    f"Scale {resource} - consistently above 80% utilization"
165                )
166            elif max_usage > 0.95:
167                analysis["resource_bottlenecks"].append({
168                    "resource": resource,
169                    "avg_usage": avg_usage,
170                    "max_usage": max_usage,
171                    "severity": "medium"
172                })
173                analysis["recommendations"].append(
174                    f"Monitor {resource} - occasional spikes above 95%"
175                )
176
177        # Generate operation recommendations
178        for bottleneck in analysis["operation_bottlenecks"][:3]:
179            op = bottleneck["operation"]
180            if "llm" in op.lower():
181                analysis["recommendations"].append(
182                    f"Consider caching or batching for {op}"
183                )
184            elif "database" in op.lower() or "db" in op.lower():
185                analysis["recommendations"].append(
186                    f"Add connection pooling or read replicas for {op}"
187                )
188            else:
189                analysis["recommendations"].append(
190                    f"Profile and optimize {op}"
191                )
192
193        return analysis

The profiler tracks operation timing across the system, identifying which operations contribute most to latency. Combined with resource monitoring, this provides a complete picture of system bottlenecks.
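The `get_bottlenecks` heuristic above, flagging an operation when its P90 latency exceeds twice its average, can be checked by hand. The snippet below replays that arithmetic on a hypothetical latency sample in which most calls are fast but a tail of slow calls dominates:

```python
# Durations in milliseconds for one operation; most calls are ~50ms,
# but two tail calls are an order of magnitude slower.
durations = [50, 55, 60, 52, 58, 48, 51, 950, 1200, 54]

# Same P90 computation as OperationProfiler.get_bottlenecks
sorted_d = sorted(durations)
idx = min(int(len(sorted_d) * 0.9), len(sorted_d) - 1)
p90 = sorted_d[idx]
avg = sum(durations) / len(durations)

# Flag as a bottleneck when the tail is more than 2x the average
is_bottleneck = p90 > avg * 2
severity = "high" if p90 > 1000 else "medium"
print(f"avg={avg:.1f}ms p90={p90}ms bottleneck={is_bottleneck} severity={severity}")
```

A high P90-to-average ratio like this typically points at retry storms, cold caches, or oversized contexts on a minority of requests, which per-request averages alone would hide.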


Summary

Scaling agent systems requires a multi-faceted approach that addresses the unique characteristics of LLM workloads. Effective scaling combines horizontal scaling, intelligent load balancing, auto-scaling, resource optimization, and continuous bottleneck analysis.

Key Takeaways

  • Stateless Design - Externalize state to enable any instance to handle any request
  • Intelligent Load Balancing - Use adaptive algorithms that consider latency, connections, and error rates
  • Custom Scaling Metrics - Queue depth and latency are better signals than CPU for agent workloads
  • Request Batching - Batch LLM requests to reduce per-request overhead
  • Proactive Scaling - Use historical patterns to scale before load arrives

Next Steps: The next section covers queue-based agent processing for handling long-running workflows and managing backpressure in production systems.