Introduction
Queue-based processing decouples request submission from execution, enabling better handling of variable workloads and long-running agent tasks. This architecture is essential for production systems that must gracefully handle traffic spikes and maintain responsiveness under load.
Learning Objectives: By the end of this section, you will understand how to design queue-based architectures for agents, implement reliable task queues with delivery guarantees, build worker patterns for processing agent tasks, and manage backpressure effectively.
Synchronous request handling works well for quick operations, but agent tasks often involve multiple LLM calls, tool executions, and workflow steps that can take minutes to complete. Queue-based processing provides the foundation for handling these long-running operations reliably.
Queue Architecture
A well-designed queue architecture separates producers (API handlers) from consumers (workers), enabling independent scaling and providing natural buffering during load spikes.
Core Queue Components
1from abc import ABC, abstractmethod
2from dataclasses import dataclass, field
3from typing import Any, Optional, Generic, TypeVar
4from enum import Enum
5import asyncio
6from datetime import datetime, timedelta
7import uuid
8import json
9
10
11T = TypeVar('T')
12
13
class TaskStatus(Enum):
    """Lifecycle states for a queued task.

    A task normally moves PENDING -> PROCESSING -> COMPLETED; recoverable
    failures pass through RETRYING, and exhausted or unrecoverable tasks
    end in FAILED or DEAD_LETTER.
    """

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"
    DEAD_LETTER = "dead_letter"
22
23
@dataclass
class QueueTask(Generic[T]):
    """A unit of work tracked through the queue.

    Wraps the caller-supplied payload together with scheduling metadata:
    priority, lifecycle timestamps, retry accounting, and a per-task
    execution timeout.
    """

    task_id: str
    payload: T
    status: TaskStatus = TaskStatus.PENDING
    priority: int = 0
    created_at: datetime = field(default_factory=datetime.utcnow)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    retry_count: int = 0
    max_retries: int = 3
    timeout_seconds: float = 300.0
    metadata: dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None

    @property
    def is_expired(self) -> bool:
        """True when the task has been running longer than its timeout."""
        if self.started_at is None:
            return False
        running_for = (datetime.utcnow() - self.started_at).total_seconds()
        return running_for > self.timeout_seconds

    def to_dict(self) -> dict[str, Any]:
        """Serialize task state to JSON-compatible primitives."""
        def _iso(ts: Optional[datetime]) -> Optional[str]:
            return ts.isoformat() if ts is not None else None

        return {
            "task_id": self.task_id,
            "payload": self.payload,
            "status": self.status.value,
            "priority": self.priority,
            "created_at": self.created_at.isoformat(),
            "started_at": _iso(self.started_at),
            "completed_at": _iso(self.completed_at),
            "retry_count": self.retry_count,
            "max_retries": self.max_retries,
            "error": self.error,
        }
60
61
class TaskQueue(ABC, Generic[T]):
    """Contract implemented by every task-queue backend.

    Workers and API handlers are written against this interface so that
    the in-memory development queue and durable production backends are
    interchangeable.
    """

    @abstractmethod
    async def enqueue(self, payload: T, priority: int = 0, delay_seconds: float = 0) -> str:
        """Submit *payload* for processing and return the new task id.

        Higher `priority` values are served first; `delay_seconds`
        postpones visibility to consumers.
        """

    @abstractmethod
    async def dequeue(self, timeout_seconds: float = 30.0) -> Optional[QueueTask[T]]:
        """Return the next available task, or None once the timeout passes."""

    @abstractmethod
    async def complete(self, task_id: str, result: Any = None) -> None:
        """Record successful completion of *task_id*, storing *result*."""

    @abstractmethod
    async def fail(self, task_id: str, error: str, retry: bool = True) -> None:
        """Record a failure; re-queue for retry when *retry* and budget allow."""

    @abstractmethod
    async def get_status(self, task_id: str) -> Optional[QueueTask[T]]:
        """Return the current task record, or None if the id is unknown."""
102
103
104class InMemoryQueue(TaskQueue[T]):
105 """In-memory task queue for development."""
106
107 def __init__(self):
108 self._pending: asyncio.PriorityQueue = asyncio.PriorityQueue()
109 self._tasks: dict[str, QueueTask[T]] = {}
110 self._results: dict[str, Any] = {}
111 self._lock = asyncio.Lock()
112
113 async def enqueue(
114 self,
115 payload: T,
116 priority: int = 0,
117 delay_seconds: float = 0
118 ) -> str:
119 task_id = str(uuid.uuid4())
120 task = QueueTask(
121 task_id=task_id,
122 payload=payload,
123 priority=priority
124 )
125
126 async with self._lock:
127 self._tasks[task_id] = task
128
129 if delay_seconds > 0:
130 asyncio.create_task(self._delayed_enqueue(task, delay_seconds))
131 else:
132 # Lower priority number = higher priority
133 await self._pending.put((-priority, task.created_at, task_id))
134
135 return task_id
136
137 async def _delayed_enqueue(
138 self,
139 task: QueueTask[T],
140 delay: float
141 ) -> None:
142 await asyncio.sleep(delay)
143 await self._pending.put(
144 (-task.priority, task.created_at, task.task_id)
145 )
146
147 async def dequeue(
148 self,
149 timeout_seconds: float = 30.0
150 ) -> Optional[QueueTask[T]]:
151 try:
152 _, _, task_id = await asyncio.wait_for(
153 self._pending.get(),
154 timeout=timeout_seconds
155 )
156
157 async with self._lock:
158 task = self._tasks.get(task_id)
159 if task:
160 task.status = TaskStatus.PROCESSING
161 task.started_at = datetime.utcnow()
162 return task
163
164 except asyncio.TimeoutError:
165 return None
166
167 async def complete(self, task_id: str, result: Any = None) -> None:
168 async with self._lock:
169 task = self._tasks.get(task_id)
170 if task:
171 task.status = TaskStatus.COMPLETED
172 task.completed_at = datetime.utcnow()
173 self._results[task_id] = result
174
175 async def fail(
176 self,
177 task_id: str,
178 error: str,
179 retry: bool = True
180 ) -> None:
181 async with self._lock:
182 task = self._tasks.get(task_id)
183 if not task:
184 return
185
186 task.error = error
187
188 if retry and task.retry_count < task.max_retries:
189 task.retry_count += 1
190 task.status = TaskStatus.RETRYING
191 task.started_at = None
192 # Re-enqueue with exponential backoff
193 delay = 2 ** task.retry_count
194 await self._pending.put(
195 (-task.priority, datetime.utcnow(), task_id)
196 )
197 else:
198 task.status = TaskStatus.DEAD_LETTER
199 task.completed_at = datetime.utcnow()
200
201 async def get_status(self, task_id: str) -> Optional[QueueTask[T]]:
202 async with self._lock:
203 return self._tasks.get(task_id)
204
205 async def get_result(self, task_id: str) -> Optional[Any]:
206 async with self._lock:
207 return self._results.get(task_id)The queue architecture provides task tracking, retry handling, and priority support. The in-memory implementation is useful for development, while production systems use durable message brokers.
Task Queue Implementation
Production task queues require durability, exactly-once processing guarantees, and visibility timeout handling. Redis and message brokers like RabbitMQ or Amazon SQS provide these capabilities.
Redis-Based Task Queue
1from typing import Any, Optional, TypeVar
2import asyncio
3import json
4import time
5import uuid
6
7
8T = TypeVar('T')
9
10
11class RedisTaskQueue(TaskQueue[T]):
12 """Redis-backed task queue with reliability features."""
13
14 def __init__(
15 self,
16 redis_url: str,
17 queue_name: str = "agent_tasks",
18 visibility_timeout: int = 300
19 ):
20 self.redis_url = redis_url
21 self.queue_name = queue_name
22 self.visibility_timeout = visibility_timeout
23 self._pool = None
24
25 async def _get_pool(self):
26 if self._pool is None:
27 import redis.asyncio as redis
28 self._pool = redis.from_url(self.redis_url)
29 return self._pool
30
31 def _task_key(self, task_id: str) -> str:
32 return f"task:{self.queue_name}:{task_id}"
33
34 async def enqueue(
35 self,
36 payload: T,
37 priority: int = 0,
38 delay_seconds: float = 0
39 ) -> str:
40 pool = await self._get_pool()
41 task_id = str(uuid.uuid4())
42
43 task = QueueTask(
44 task_id=task_id,
45 payload=payload,
46 priority=priority
47 )
48
49 # Store task data
50 await pool.hset(
51 self._task_key(task_id),
52 mapping={
53 "data": json.dumps(task.to_dict()),
54 "payload": json.dumps(payload),
55 "status": TaskStatus.PENDING.value
56 }
57 )
58
59 # Add to sorted set (score = priority, then time)
60 score = -priority * 1e12 + time.time() + delay_seconds
61 await pool.zadd(
62 f"queue:{self.queue_name}:pending",
63 {task_id: score}
64 )
65
66 return task_id
67
68 async def dequeue(
69 self,
70 timeout_seconds: float = 30.0
71 ) -> Optional[QueueTask[T]]:
72 pool = await self._get_pool()
73 deadline = time.time() + timeout_seconds
74
75 while time.time() < deadline:
76 # Atomically move task from pending to processing
77 # using Lua script for atomicity
78 script = """
79 local task_id = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, 1)
80 if #task_id == 0 then
81 return nil
82 end
83 task_id = task_id[1]
84 redis.call('ZREM', KEYS[1], task_id)
85 redis.call('ZADD', KEYS[2], ARGV[2], task_id)
86 redis.call('HSET', KEYS[3] .. task_id, 'status', 'processing', 'started_at', ARGV[3])
87 return task_id
88 """
89
90 now = time.time()
91 task_id = await pool.eval(
92 script,
93 3,
94 f"queue:{self.queue_name}:pending",
95 f"queue:{self.queue_name}:processing",
96 "task:" + self.queue_name + ":",
97 str(now), # Max score to dequeue
98 str(now + self.visibility_timeout), # Processing deadline
99 str(now) # Started at
100 )
101
102 if task_id:
103 task_id = task_id.decode() if isinstance(task_id, bytes) else task_id
104 task_data = await pool.hget(self._task_key(task_id), "data")
105 if task_data:
106 data = json.loads(task_data)
107 return QueueTask(
108 task_id=task_id,
109 payload=json.loads(
110 await pool.hget(self._task_key(task_id), "payload")
111 ),
112 status=TaskStatus.PROCESSING,
113 priority=data.get("priority", 0),
114 retry_count=data.get("retry_count", 0),
115 max_retries=data.get("max_retries", 3)
116 )
117
118 await asyncio.sleep(0.1)
119
120 return None
121
122 async def complete(self, task_id: str, result: Any = None) -> None:
123 pool = await self._get_pool()
124
125 # Remove from processing, update status
126 await pool.zrem(f"queue:{self.queue_name}:processing", task_id)
127 await pool.hset(
128 self._task_key(task_id),
129 mapping={
130 "status": TaskStatus.COMPLETED.value,
131 "completed_at": str(time.time()),
132 "result": json.dumps(result) if result else ""
133 }
134 )
135
136 # Set expiry for completed tasks
137 await pool.expire(self._task_key(task_id), 86400) # 24 hours
138
139 async def fail(
140 self,
141 task_id: str,
142 error: str,
143 retry: bool = True
144 ) -> None:
145 pool = await self._get_pool()
146
147 # Get current task data
148 task_data = await pool.hget(self._task_key(task_id), "data")
149 if not task_data:
150 return
151
152 data = json.loads(task_data)
153 retry_count = data.get("retry_count", 0)
154 max_retries = data.get("max_retries", 3)
155
156 await pool.zrem(f"queue:{self.queue_name}:processing", task_id)
157
158 if retry and retry_count < max_retries:
159 # Re-queue with backoff
160 new_retry_count = retry_count + 1
161 delay = 2 ** new_retry_count
162
163 data["retry_count"] = new_retry_count
164 await pool.hset(
165 self._task_key(task_id),
166 mapping={
167 "data": json.dumps(data),
168 "status": TaskStatus.RETRYING.value,
169 "error": error
170 }
171 )
172
173 score = time.time() + delay
174 await pool.zadd(
175 f"queue:{self.queue_name}:pending",
176 {task_id: score}
177 )
178 else:
179 # Move to dead letter queue
180 await pool.hset(
181 self._task_key(task_id),
182 mapping={
183 "status": TaskStatus.DEAD_LETTER.value,
184 "error": error,
185 "completed_at": str(time.time())
186 }
187 )
188 await pool.zadd(
189 f"queue:{self.queue_name}:dead_letter",
190 {task_id: time.time()}
191 )
192
193 async def get_status(self, task_id: str) -> Optional[QueueTask[T]]:
194 pool = await self._get_pool()
195 data = await pool.hgetall(self._task_key(task_id))
196
197 if not data:
198 return None
199
200 task_data = json.loads(data.get(b"data", b"{}").decode())
201 return QueueTask(
202 task_id=task_id,
203 payload=json.loads(data.get(b"payload", b"null").decode()),
204 status=TaskStatus(data.get(b"status", b"pending").decode()),
205 priority=task_data.get("priority", 0),
206 retry_count=task_data.get("retry_count", 0),
207 error=data.get(b"error", b"").decode() or None
208 )
209
210 async def recover_stale_tasks(self) -> int:
211 """Recover tasks that exceeded visibility timeout."""
212 pool = await self._get_pool()
213
214 # Find expired processing tasks
215 now = time.time()
216 stale_tasks = await pool.zrangebyscore(
217 f"queue:{self.queue_name}:processing",
218 "-inf",
219 str(now)
220 )
221
222 recovered = 0
223 for task_id in stale_tasks:
224 task_id = task_id.decode() if isinstance(task_id, bytes) else task_id
225
226 # Move back to pending
227 await pool.zrem(f"queue:{self.queue_name}:processing", task_id)
228 await pool.zadd(
229 f"queue:{self.queue_name}:pending",
230 {task_id: now}
231 )
232 await pool.hset(
233 self._task_key(task_id),
234 "status",
235 TaskStatus.PENDING.value
236 )
237 recovered += 1
238
239 return recoveredThe Redis queue uses Lua scripts for atomic operations, ensuring tasks are not lost or duplicated. Visibility timeout and recovery mechanisms handle worker failures gracefully.
Worker Patterns
Workers consume tasks from queues and execute agent logic. Various patterns exist for worker design, from simple single-threaded workers to complex multi-process pools with graceful shutdown.
Agent Task Worker
1from dataclasses import dataclass
2from typing import Any, Optional, Callable, Awaitable
3import asyncio
4import signal
5import logging
6
7
@dataclass
class WorkerConfig:
    """Tunable settings for a task worker."""

    worker_id: str                    # identifier used in log messages
    concurrency: int = 5              # max tasks processed at once
    poll_interval: float = 1.0        # queue-poll timeout (seconds)
    shutdown_timeout: float = 30.0    # graceful-drain budget (seconds)
    heartbeat_interval: float = 10.0  # liveness-log period (seconds)
16
17
class TaskWorker:
    """Worker that pulls tasks from a queue and runs them through *handler*.

    Concurrency is bounded by a semaphore; SIGTERM/SIGINT trigger a
    graceful drain of in-flight tasks before exit.
    """

    def __init__(
        self,
        config: WorkerConfig,
        queue: TaskQueue,
        handler: Callable[[Any], Awaitable[Any]]
    ):
        self.config = config
        self.queue = queue
        self.handler = handler

        self._running = False
        self._active_tasks: set[asyncio.Task] = set()
        self._semaphore = asyncio.Semaphore(config.concurrency)
        self._shutdown_event = asyncio.Event()

        self.logger = logging.getLogger(f"worker.{config.worker_id}")

    async def start(self) -> None:
        """Run the worker until stopped, then drain gracefully."""
        self._running = True
        self.logger.info(f"Worker {self.config.worker_id} starting")

        # Install handlers so system signals trigger a clean stop.
        # NOTE(review): add_signal_handler is unsupported on Windows event
        # loops -- confirm target platform.
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._signal_handler)

        heartbeat = asyncio.create_task(self._heartbeat_loop())
        try:
            await self._main_loop()
        finally:
            heartbeat.cancel()
            await self._graceful_shutdown()

    def _signal_handler(self) -> None:
        """Flip the shutdown flags; the main loop exits on its next check."""
        self.logger.info("Received shutdown signal")
        self._running = False
        self._shutdown_event.set()

    async def _main_loop(self) -> None:
        """Poll the queue and dispatch tasks while respecting concurrency."""
        while self._running:
            try:
                # Reserve a concurrency slot before polling.
                await self._semaphore.acquire()

                queued = await self.queue.dequeue(
                    timeout_seconds=self.config.poll_interval
                )
                if queued is None:
                    # Nothing to do: give the slot back and poll again.
                    self._semaphore.release()
                    continue

                runner = asyncio.create_task(self._process_task(queued))
                self._active_tasks.add(runner)
                runner.add_done_callback(self._task_done)

            except Exception as e:
                self.logger.error(f"Error in main loop: {e}")
                self._semaphore.release()
                await asyncio.sleep(1)

    def _task_done(self, task: asyncio.Task) -> None:
        """Release bookkeeping for a finished task."""
        self._active_tasks.discard(task)
        self._semaphore.release()

    async def _process_task(self, task: QueueTask) -> None:
        """Run one task under its timeout and report the outcome."""
        self.logger.info(f"Processing task {task.task_id}")

        try:
            result = await asyncio.wait_for(
                self.handler(task.payload),
                timeout=task.timeout_seconds
            )
            await self.queue.complete(task.task_id, result)
            self.logger.info(f"Task {task.task_id} completed")

        except asyncio.TimeoutError:
            await self.queue.fail(
                task.task_id,
                "Task timed out",
                retry=True
            )
            self.logger.warning(f"Task {task.task_id} timed out")

        except Exception as e:
            await self.queue.fail(
                task.task_id,
                str(e),
                retry=True
            )
            self.logger.error(f"Task {task.task_id} failed: {e}")

    async def _heartbeat_loop(self) -> None:
        """Emit a periodic liveness log with the in-flight task count."""
        while self._running:
            self.logger.debug(
                f"Heartbeat: {len(self._active_tasks)} active tasks"
            )
            await asyncio.sleep(self.config.heartbeat_interval)

    async def _graceful_shutdown(self) -> None:
        """Drain in-flight tasks, force-cancelling those past the timeout."""
        self.logger.info("Starting graceful shutdown")

        if self._active_tasks:
            self.logger.info(
                f"Waiting for {len(self._active_tasks)} tasks"
            )
            _, still_running = await asyncio.wait(
                self._active_tasks,
                timeout=self.config.shutdown_timeout
            )

            if still_running:
                self.logger.warning(
                    f"Force cancelling {len(still_running)} tasks"
                )
                for straggler in still_running:
                    straggler.cancel()

        self.logger.info("Shutdown complete")
156
157
158class WorkerPool:
159 """Pool of workers for parallel processing."""
160
161 def __init__(
162 self,
163 num_workers: int,
164 queue: TaskQueue,
165 handler: Callable[[Any], Awaitable[Any]]
166 ):
167 self.num_workers = num_workers
168 self.queue = queue
169 self.handler = handler
170 self._workers: list[TaskWorker] = []
171 self._tasks: list[asyncio.Task] = []
172
173 async def start(self) -> None:
174 """Start all workers."""
175 for i in range(self.num_workers):
176 config = WorkerConfig(
177 worker_id=f"worker-{i}",
178 concurrency=5
179 )
180 worker = TaskWorker(config, self.queue, self.handler)
181 self._workers.append(worker)
182 task = asyncio.create_task(worker.start())
183 self._tasks.append(task)
184
185 async def stop(self) -> None:
186 """Stop all workers."""
187 for worker in self._workers:
188 worker._running = False
189 worker._shutdown_event.set()
190
191 await asyncio.gather(*self._tasks, return_exceptions=True)
192
193 async def scale(self, new_count: int) -> None:
194 """Scale the worker pool."""
195 current = len(self._workers)
196
197 if new_count > current:
198 # Add workers
199 for i in range(current, new_count):
200 config = WorkerConfig(
201 worker_id=f"worker-{i}",
202 concurrency=5
203 )
204 worker = TaskWorker(config, self.queue, self.handler)
205 self._workers.append(worker)
206 task = asyncio.create_task(worker.start())
207 self._tasks.append(task)
208
209 elif new_count < current:
210 # Remove workers
211 for _ in range(current - new_count):
212 worker = self._workers.pop()
213 task = self._tasks.pop()
214 worker._running = False
215 worker._shutdown_event.set()
216 await taskThe worker pattern supports concurrent task processing, graceful shutdown, and dynamic scaling. Signal handlers ensure clean termination even when receiving system signals.
Backpressure Management
Backpressure occurs when tasks arrive faster than they can be processed. Proper backpressure management prevents system overload and maintains service quality under high load.
Backpressure Controller
1from dataclasses import dataclass
2from typing import Optional
3from enum import Enum
4import asyncio
5import time
6
7
class BackpressureAction(Enum):
    """What the admission layer should do with an incoming task."""

    ACCEPT = "accept"        # enqueue normally
    DELAY = "delay"          # enqueue after a pacing delay
    REJECT = "reject"        # refuse outright
    SHED_LOAD = "shed_load"  # drop because the task is low priority
14
15
@dataclass
class BackpressureConfig:
    """Thresholds and response parameters for backpressure control."""

    # Queue-depth watermarks.
    queue_low_water: int = 100
    queue_high_water: int = 500
    queue_critical: int = 1000

    # Minimum acceptable throughput (tasks/second).
    min_processing_rate: float = 10.0

    # Delay response: per-excess-task delay, capped at a maximum.
    delay_ms_per_task: float = 10.0
    max_delay_ms: float = 5000.0

    # Requests below this priority are shed under pressure.
    shed_priority_below: int = 5
33
34
class BackpressureController:
    """Decides how to treat incoming tasks based on queue pressure."""

    def __init__(self, config: BackpressureConfig):
        self.config = config
        self._queue_depth = 0
        self._processing_rate = 0.0   # tasks/second, recomputed ~once/second
        self._processed_count = 0     # completions since last rate calc
        self._last_rate_calc = time.time()
        self._lock = asyncio.Lock()

    async def update_metrics(
        self,
        queue_depth: int,
        processed: int = 0
    ) -> None:
        """Record the latest queue depth and completed-task count."""
        async with self._lock:
            self._queue_depth = queue_depth
            self._processed_count += processed

            # Recompute the rolling processing rate about once per second.
            now = time.time()
            elapsed = now - self._last_rate_calc
            if elapsed >= 1.0:
                self._processing_rate = self._processed_count / elapsed
                self._processed_count = 0
                self._last_rate_calc = now

    async def evaluate(
        self,
        priority: int = 0
    ) -> tuple[BackpressureAction, Optional[float]]:
        """Return the admission action for a new task plus an optional delay (ms)."""
        async with self._lock:
            depth = self._queue_depth
            rate = self._processing_rate

            # Critical depth: shed low-priority work, reject everything else.
            if depth >= self.config.queue_critical:
                if priority < self.config.shed_priority_below:
                    return BackpressureAction.SHED_LOAD, None
                return BackpressureAction.REJECT, None

            # High water: shed low priority, otherwise delay proportionally
            # to how far past the watermark the queue has grown.
            if depth >= self.config.queue_high_water:
                if priority < self.config.shed_priority_below:
                    return BackpressureAction.SHED_LOAD, None

                backlog = depth - self.config.queue_high_water
                pacing = min(
                    backlog * self.config.delay_ms_per_task,
                    self.config.max_delay_ms
                )
                return BackpressureAction.DELAY, pacing

            # Throughput too low while work is queued: apply a pacing delay.
            if rate < self.config.min_processing_rate and depth > 0:
                pacing = min(
                    (self.config.min_processing_rate / max(rate, 0.1)) * 100,
                    self.config.max_delay_ms
                )
                return BackpressureAction.DELAY, pacing

            return BackpressureAction.ACCEPT, None
101
102
class AdaptiveRateLimiter:
    """Per-second rate limiter whose limit adapts to observed performance."""

    def __init__(
        self,
        base_rate: float,
        min_rate: float,
        max_rate: float
    ):
        self.base_rate = base_rate
        self.min_rate = min_rate
        self.max_rate = max_rate
        self._current_rate = base_rate    # requests allowed per 1s window
        self._window_start = time.time()  # start of the current window
        self._requests_in_window = 0
        self._lock = asyncio.Lock()

    async def acquire(self) -> bool:
        """Try to take one token from the current 1-second window."""
        async with self._lock:
            now = time.time()

            # Roll over to a fresh window once a second has elapsed.
            if now - self._window_start >= 1.0:
                self._window_start = now
                self._requests_in_window = 0

            if self._requests_in_window >= self._current_rate:
                return False

            self._requests_in_window += 1
            return True

    async def adjust_rate(
        self,
        success_rate: float,
        latency_ms: float,
        target_latency_ms: float = 1000.0
    ) -> None:
        """Nudge the limit: -10% when struggling, +10% when comfortably fast."""
        async with self._lock:
            struggling = (
                latency_ms > target_latency_ms * 1.5 or success_rate < 0.95
            )
            thriving = (
                latency_ms < target_latency_ms * 0.5 and success_rate > 0.99
            )

            if struggling:
                self._current_rate = max(
                    self.min_rate,
                    self._current_rate * 0.9
                )
            elif thriving:
                self._current_rate = min(
                    self.max_rate,
                    self._current_rate * 1.1
                )

    @property
    def current_rate(self) -> float:
        """Requests currently allowed per second."""
        return self._current_rate
161
162
class LoadShedder:
    """Sheds load when the system is overwhelmed.

    Uses hysteresis: shedding starts at shed_threshold utilization and
    only stops once utilization falls below recovery_threshold, which
    prevents rapid flapping between modes.
    """

    def __init__(
        self,
        shed_threshold: float = 0.9,
        recovery_threshold: float = 0.7
    ):
        self.shed_threshold = shed_threshold
        self.recovery_threshold = recovery_threshold
        self._shedding = False  # currently in shedding mode?
        self._shed_count = 0    # total requests shed so far

    def should_shed(
        self,
        utilization: float,
        priority: int,
        priority_threshold: int = 5
    ) -> bool:
        """Return True when this request should be dropped.

        Only low-priority requests (priority < priority_threshold) are
        shed, and only while in shedding mode.
        """
        # Enter shedding mode at the high-utilization threshold.
        if utilization >= self.shed_threshold:
            self._shedding = True

        # Exit shedding mode only once utilization has recovered.
        if utilization < self.recovery_threshold:
            self._shedding = False

        if self._shedding and priority < priority_threshold:
            self._shed_count += 1
            return True

        return False

    @property
    def stats(self) -> dict[str, object]:
        # BUG FIX: the original annotated this as dict[str, Any], but Any is
        # never imported in this module, so the class failed to define with
        # a NameError when the property annotation was evaluated.
        return {
            "shedding": self._shedding,
            "shed_count": self._shed_count
        }

# Backpressure management combines queue monitoring, adaptive rate limiting,
# and load shedding to maintain system stability. Low-priority requests are
# shed first to preserve capacity for important work.
Priority and Scheduling
Not all tasks are equal. Priority scheduling ensures important tasks are processed first, while fair scheduling prevents starvation of lower-priority work.
Priority Scheduler
1from dataclasses import dataclass, field
2from typing import Any, Optional, Dict, List
3from enum import IntEnum
4import asyncio
5import time
6from collections import defaultdict
7
8
class TaskPriority(IntEnum):
    """Named priority levels; a larger value means more urgent."""

    CRITICAL = 100
    HIGH = 75
    NORMAL = 50
    LOW = 25
    BACKGROUND = 0
16
17
@dataclass
class PriorityBucket:
    """Per-priority queue plus the bookkeeping used for fair scheduling."""

    priority: int
    queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    weight: float = 1.0                                   # share of service
    processed: int = 0                                    # tasks served so far
    last_served: float = field(default_factory=time.time) # epoch of last serve
26
27
class WeightedFairScheduler:
    """Weighted fair queueing across priority buckets.

    Each priority level owns a bucket with a weight; buckets are served in
    order of virtual time (processed / weight), so heavier buckets get
    proportionally more turns while lighter ones still make progress.
    """

    def __init__(
        self,
        weights: Optional[Dict[int, float]] = None
    ):
        if weights is None:
            # Default weights scale with priority.
            weights = {
                TaskPriority.CRITICAL: 10.0,
                TaskPriority.HIGH: 5.0,
                TaskPriority.NORMAL: 2.0,
                TaskPriority.LOW: 1.0,
                TaskPriority.BACKGROUND: 0.5
            }

        self._buckets: Dict[int, PriorityBucket] = {
            prio: PriorityBucket(priority=prio, weight=w)
            for prio, w in weights.items()
        }

    async def enqueue(
        self,
        task: QueueTask,
        priority: int = TaskPriority.NORMAL
    ) -> None:
        """Add *task* to its priority bucket, creating the bucket on demand."""
        bucket = self._buckets.get(priority)
        if bucket is None:
            # Unknown priority level: give it a neutral weight.
            bucket = PriorityBucket(priority=priority, weight=1.0)
            self._buckets[priority] = bucket

        await bucket.queue.put(task)

    async def dequeue(
        self,
        timeout_seconds: float = 30.0
    ) -> Optional[QueueTask]:
        """Serve the non-empty bucket with the lowest virtual time."""
        deadline = time.time() + timeout_seconds

        while time.time() < deadline:
            # Scan for the non-empty bucket with the smallest virtual time
            # (lower virtual time = owed more service).
            best: Optional[PriorityBucket] = None
            best_vtime = float("inf")
            for bucket in self._buckets.values():
                if bucket.queue.empty():
                    continue
                vtime = bucket.processed / bucket.weight
                if vtime < best_vtime:
                    best_vtime = vtime
                    best = bucket

            if best is None:
                await asyncio.sleep(0.1)
                continue

            try:
                task = best.queue.get_nowait()
            except asyncio.QueueEmpty:
                # Bucket drained between the scan and the get; rescan.
                continue

            best.processed += 1
            best.last_served = time.time()
            return task

        return None

    def get_stats(self) -> Dict[int, Dict[str, Any]]:
        """Snapshot queue depth, served count, and weight per priority."""
        stats: Dict[int, Dict[str, Any]] = {}
        for prio, bucket in self._buckets.items():
            stats[prio] = {
                "queued": bucket.queue.qsize(),
                "processed": bucket.processed,
                "weight": bucket.weight
            }
        return stats
111
112
class DeadlineScheduler:
    """Earliest-deadline-first scheduler.

    Tasks are kept in a list sorted by absolute deadline (epoch seconds);
    dequeue serves the earliest deadline that has not yet passed and
    dead-letters anything already overdue.
    """

    def __init__(self):
        # (deadline, task) pairs, kept sorted by deadline.
        self._tasks: List[tuple[float, QueueTask]] = []
        self._lock = asyncio.Lock()

    async def enqueue(
        self,
        task: QueueTask,
        deadline: Optional[float] = None
    ) -> None:
        """Add *task*; deadline defaults to now + task.timeout_seconds."""
        if deadline is None:
            deadline = time.time() + task.timeout_seconds

        async with self._lock:
            self._tasks.append((deadline, task))
            # Sort by deadline only; the key avoids comparing QueueTask
            # objects, which are not orderable.
            self._tasks.sort(key=lambda entry: entry[0])

    async def dequeue(self) -> Optional[QueueTask]:
        """Return the task with the earliest unexpired deadline, or None.

        BUG FIX: the original popped items while iterating with enumerate,
        which skips the element that shifts into the removed slot.  Since
        the list is deadline-sorted, all expired entries sit at the front,
        so we drain them first and then take the head.
        """
        async with self._lock:
            now = time.time()

            # Dead-letter every overdue task at the front of the list.
            while self._tasks and self._tasks[0][0] < now:
                _, expired = self._tasks.pop(0)
                expired.status = TaskStatus.DEAD_LETTER
                expired.error = "Deadline exceeded before processing"
                # A production system would move this to a dead-letter queue.

            if self._tasks:
                _, task = self._tasks.pop(0)
                return task

            return None

    async def get_expired_count(self) -> int:
        """Count queued tasks whose deadline has already passed."""
        async with self._lock:
            now = time.time()
            return sum(1 for deadline, _ in self._tasks if deadline < now)
157
158
159class MultiTenantScheduler:
160 """Fair scheduler across multiple tenants."""
161
162 def __init__(
163 self,
164 default_quota: int = 100
165 ):
166 self.default_quota = default_quota
167 self._tenant_queues: Dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
168 self._tenant_quotas: Dict[str, int] = {}
169 self._tenant_usage: Dict[str, int] = defaultdict(int)
170 self._last_reset = time.time()
171 self._lock = asyncio.Lock()
172
173 async def set_quota(self, tenant_id: str, quota: int) -> None:
174 """Set quota for a tenant."""
175 self._tenant_quotas[tenant_id] = quota
176
177 async def enqueue(
178 self,
179 task: QueueTask,
180 tenant_id: str
181 ) -> bool:
182 """Enqueue task for a tenant."""
183 async with self._lock:
184 # Reset usage periodically
185 now = time.time()
186 if now - self._last_reset > 60:
187 self._tenant_usage.clear()
188 self._last_reset = now
189
190 # Check quota
191 quota = self._tenant_quotas.get(tenant_id, self.default_quota)
192 if self._tenant_usage[tenant_id] >= quota:
193 return False
194
195 self._tenant_usage[tenant_id] += 1
196 await self._tenant_queues[tenant_id].put(task)
197 return True
198
199 async def dequeue(self) -> Optional[tuple[str, QueueTask]]:
200 """Dequeue task with round-robin across tenants."""
201 async with self._lock:
202 tenants = list(self._tenant_queues.keys())
203
204 for tenant_id in tenants:
205 queue = self._tenant_queues[tenant_id]
206 if not queue.empty():
207 task = await queue.get()
208 # Rotate to end of list for fairness
209 tenants.remove(tenant_id)
210 tenants.append(tenant_id)
211 return tenant_id, task
212
213 return NoneThe weighted fair scheduler balances priority with fairness, ensuring low-priority tasks eventually get processed. Deadline scheduling adds time-sensitivity, while multi-tenant scheduling ensures fair resource allocation across users.
Summary
Queue-based processing is essential for production agent systems. It provides natural buffering, enables reliable retry handling, and allows independent scaling of producers and consumers.
Key Takeaways
- Decoupled Architecture - Separate task submission from execution for better resilience and scalability
- Reliability Features - Use visibility timeouts, dead letter queues, and retry mechanisms
- Graceful Workers - Implement signal handling and graceful shutdown to prevent task loss
- Backpressure Management - Combine rate limiting, load shedding, and delays to maintain stability
- Fair Scheduling - Balance priority with fairness using weighted scheduling algorithms
Next Steps: The next section covers error recovery and resilience patterns for handling failures gracefully in production agent systems.