Chapter 21
15 min read
Section 131 of 175

Queue-Based Agent Processing

Production Deployment

Introduction

Queue-based processing decouples request submission from execution, enabling better handling of variable workloads and long-running agent tasks. This architecture is essential for production systems that must gracefully handle traffic spikes and maintain responsiveness under load.

Learning Objectives: By the end of this section, you will understand how to design queue-based architectures for agents, implement reliable task queues with delivery guarantees, build worker patterns for processing agent tasks, and manage backpressure effectively.

Synchronous request handling works well for quick operations, but agent tasks often involve multiple LLM calls, tool executions, and workflow steps that can take minutes to complete. Queue-based processing provides the foundation for handling these long-running operations reliably.


Queue Architecture

A well-designed queue architecture separates producers (API handlers) from consumers (workers), enabling independent scaling and providing natural buffering during load spikes.

Core Queue Components

🐍python
1from abc import ABC, abstractmethod
2from dataclasses import dataclass, field
3from typing import Any, Optional, Generic, TypeVar
4from enum import Enum
5import asyncio
6from datetime import datetime, timedelta
7import uuid
8import json
9
10
T = TypeVar('T')


class TaskStatus(Enum):
    """Lifecycle states a queued task moves through."""
    PENDING = "pending"          # waiting to be picked up
    PROCESSING = "processing"    # currently held by a worker
    COMPLETED = "completed"      # finished successfully
    FAILED = "failed"            # failed terminally
    RETRYING = "retrying"        # failed, scheduled for another attempt
    DEAD_LETTER = "dead_letter"  # retries exhausted; parked for inspection


@dataclass
class QueueTask(Generic[T]):
    """A unit of work tracked through the queue lifecycle.

    Holds what a broker would persist: identity, payload, scheduling
    info (priority), bookkeeping timestamps, and retry state.
    """
    task_id: str
    payload: T
    status: TaskStatus = TaskStatus.PENDING
    priority: int = 0
    # NOTE: naive UTC timestamps (datetime.utcnow) are used consistently
    # throughout this module, so comparisons stay naive-vs-naive.
    created_at: datetime = field(default_factory=datetime.utcnow)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    retry_count: int = 0
    max_retries: int = 3
    timeout_seconds: float = 300.0
    metadata: dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None

    @property
    def is_expired(self) -> bool:
        """True once the task has been running longer than its timeout."""
        started = self.started_at
        if started is None:
            # Never dequeued, so the processing clock has not started.
            return False
        return (datetime.utcnow() - started).total_seconds() > self.timeout_seconds

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a JSON-friendly dict (datetimes become ISO strings)."""
        def iso(dt: Optional[datetime]) -> Optional[str]:
            return dt.isoformat() if dt else None

        return {
            "task_id": self.task_id,
            "payload": self.payload,
            "status": self.status.value,
            "priority": self.priority,
            "created_at": self.created_at.isoformat(),
            "started_at": iso(self.started_at),
            "completed_at": iso(self.completed_at),
            "retry_count": self.retry_count,
            "max_retries": self.max_retries,
            "error": self.error
        }
60
61
class TaskQueue(ABC, Generic[T]):
    """Abstract interface every task-queue backend implements."""

    @abstractmethod
    async def enqueue(
        self,
        payload: T,
        priority: int = 0,
        delay_seconds: float = 0
    ) -> str:
        """Submit *payload* for processing; returns the new task's id.

        *delay_seconds* hides the task from workers until it elapses.
        """
        ...

    @abstractmethod
    async def dequeue(
        self,
        timeout_seconds: float = 30.0
    ) -> Optional[QueueTask[T]]:
        """Claim the next available task, or return None on timeout."""
        ...

    @abstractmethod
    async def complete(self, task_id: str, result: Any = None) -> None:
        """Mark a claimed task as successfully finished."""
        ...

    @abstractmethod
    async def fail(
        self,
        task_id: str,
        error: str,
        retry: bool = True
    ) -> None:
        """Report a failure; implementations may retry or dead-letter."""
        ...

    @abstractmethod
    async def get_status(self, task_id: str) -> Optional[QueueTask[T]]:
        """Look up the current state of a task by id."""
        ...
102
103
class InMemoryQueue(TaskQueue[T]):
    """In-memory task queue for development and testing.

    Not durable: all state lives in the process. Ordering is by priority
    first (higher number served sooner), then FIFO by creation time.
    """

    def __init__(self):
        self._pending: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self._tasks: dict[str, QueueTask[T]] = {}
        self._results: dict[str, Any] = {}
        self._lock = asyncio.Lock()
        # Strong references to delay timers: asyncio holds only weak
        # references to tasks, so fire-and-forget timers could otherwise
        # be garbage-collected before they fire.
        self._timers: set[asyncio.Task] = set()

    def _schedule_delayed(self, task: QueueTask[T], delay: float) -> None:
        """Arrange for *task* to enter the pending queue after *delay* s."""
        timer = asyncio.create_task(self._delayed_enqueue(task, delay))
        self._timers.add(timer)
        timer.add_done_callback(self._timers.discard)

    async def enqueue(
        self,
        payload: T,
        priority: int = 0,
        delay_seconds: float = 0
    ) -> str:
        """Add a task; returns its id immediately even when delayed."""
        task_id = str(uuid.uuid4())
        task = QueueTask(
            task_id=task_id,
            payload=payload,
            priority=priority
        )

        async with self._lock:
            self._tasks[task_id] = task

        if delay_seconds > 0:
            self._schedule_delayed(task, delay_seconds)
        else:
            # Priority is negated because PriorityQueue pops smallest first.
            await self._pending.put((-priority, task.created_at, task_id))

        return task_id

    async def _delayed_enqueue(
        self,
        task: QueueTask[T],
        delay: float
    ) -> None:
        """Sleep, then release the task into the pending queue."""
        await asyncio.sleep(delay)
        await self._pending.put(
            (-task.priority, task.created_at, task.task_id)
        )

    async def dequeue(
        self,
        timeout_seconds: float = 30.0
    ) -> Optional[QueueTask[T]]:
        """Block up to *timeout_seconds* for the next task; None on timeout."""
        try:
            _, _, task_id = await asyncio.wait_for(
                self._pending.get(),
                timeout=timeout_seconds
            )
        except asyncio.TimeoutError:
            return None

        async with self._lock:
            task = self._tasks.get(task_id)
            if task:
                task.status = TaskStatus.PROCESSING
                task.started_at = datetime.utcnow()
            return task

    async def complete(self, task_id: str, result: Any = None) -> None:
        """Record success and stash the result for later retrieval."""
        async with self._lock:
            task = self._tasks.get(task_id)
            if task:
                task.status = TaskStatus.COMPLETED
                task.completed_at = datetime.utcnow()
                self._results[task_id] = result

    async def fail(
        self,
        task_id: str,
        error: str,
        retry: bool = True
    ) -> None:
        """Record a failure; retry with exponential backoff or dead-letter.

        Bug fix: the backoff delay was previously computed but never
        applied (the task was re-queued immediately). Retries now become
        visible again only after 2 ** retry_count seconds.
        """
        async with self._lock:
            task = self._tasks.get(task_id)
            if not task:
                return

            task.error = error

            if retry and task.retry_count < task.max_retries:
                task.retry_count += 1
                task.status = TaskStatus.RETRYING
                task.started_at = None
                # Exponential backoff before the task is visible again.
                self._schedule_delayed(task, 2 ** task.retry_count)
            else:
                task.status = TaskStatus.DEAD_LETTER
                task.completed_at = datetime.utcnow()

    async def get_status(self, task_id: str) -> Optional[QueueTask[T]]:
        """Return the tracked task record, or None if unknown."""
        async with self._lock:
            return self._tasks.get(task_id)

    async def get_result(self, task_id: str) -> Optional[Any]:
        """Return the stored result of a completed task, if any."""
        async with self._lock:
            return self._results.get(task_id)

The queue architecture provides task tracking, retry handling, and priority support. The in-memory implementation is useful for development, while production systems use durable message brokers.


Task Queue Implementation

Production task queues require durability, at-least-once delivery guarantees, and visibility timeout handling. Redis and message brokers like RabbitMQ or Amazon SQS provide these capabilities; because true exactly-once processing is impractical in distributed systems, task handlers should be written to be idempotent.

Redis-Based Task Queue

🐍python
1from typing import Any, Optional, TypeVar
2import asyncio
3import json
4import time
5import uuid
6
7
8T = TypeVar('T')
9
10
class RedisTaskQueue(TaskQueue[T]):
    """Redis-backed task queue with at-least-once delivery.

    Key layout (all namespaced by *queue_name*):
      - ``task:{queue}:{id}``         hash: serialized task, payload, status
      - ``queue:{queue}:pending``     zset: score encodes priority + ready time
      - ``queue:{queue}:processing``  zset: score is the visibility deadline
      - ``queue:{queue}:dead_letter`` zset: tasks that exhausted retries

    A worker that dies mid-task is handled by ``recover_stale_tasks``,
    which may cause re-processing — handlers should be idempotent.
    """

    def __init__(
        self,
        redis_url: str,
        queue_name: str = "agent_tasks",
        visibility_timeout: int = 300
    ):
        self.redis_url = redis_url
        self.queue_name = queue_name
        # Seconds a claimed task may run before recovery may hand it to
        # another worker.
        self.visibility_timeout = visibility_timeout
        self._pool = None

    async def _get_pool(self):
        """Lazily create the async Redis client (deferred import keeps the
        redis dependency optional until this backend is used)."""
        if self._pool is None:
            import redis.asyncio as redis
            self._pool = redis.from_url(self.redis_url)
        return self._pool

    def _task_key(self, task_id: str) -> str:
        """Hash key holding a task's persisted fields."""
        return f"task:{self.queue_name}:{task_id}"

    async def enqueue(
        self,
        payload: T,
        priority: int = 0,
        delay_seconds: float = 0
    ) -> str:
        """Persist a task and add it to the pending set; returns its id."""
        pool = await self._get_pool()
        task_id = str(uuid.uuid4())

        task = QueueTask(
            task_id=task_id,
            payload=payload,
            priority=priority
        )

        # Store task data
        await pool.hset(
            self._task_key(task_id),
            mapping={
                "data": json.dumps(task.to_dict()),
                "payload": json.dumps(payload),
                "status": TaskStatus.PENDING.value
            }
        )

        # Score: the large negative priority term dominates so higher
        # priority sorts first; time (+ delay) breaks ties and implements
        # delayed visibility.
        score = -priority * 1e12 + time.time() + delay_seconds
        await pool.zadd(
            f"queue:{self.queue_name}:pending",
            {task_id: score}
        )

        return task_id

    async def dequeue(
        self,
        timeout_seconds: float = 30.0
    ) -> Optional[QueueTask[T]]:
        """Poll for the next ready task, claiming it atomically.

        Returns None when no task becomes available within the timeout.
        """
        pool = await self._get_pool()
        deadline = time.time() + timeout_seconds

        while time.time() < deadline:
            # Atomically move one ready task from pending to processing.
            # A Lua script is used so no other worker can claim the same
            # task between the read and the move.
            script = """
            local task_id = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, 1)
            if #task_id == 0 then
                return nil
            end
            task_id = task_id[1]
            redis.call('ZREM', KEYS[1], task_id)
            redis.call('ZADD', KEYS[2], ARGV[2], task_id)
            redis.call('HSET', KEYS[3] .. task_id, 'status', 'processing', 'started_at', ARGV[3])
            return task_id
            """

            now = time.time()
            task_id = await pool.eval(
                script,
                3,
                f"queue:{self.queue_name}:pending",
                f"queue:{self.queue_name}:processing",
                "task:" + self.queue_name + ":",
                str(now),  # Max score to dequeue (tasks ready by now)
                str(now + self.visibility_timeout),  # Processing deadline
                str(now)  # Started at
            )

            if task_id:
                task_id = task_id.decode() if isinstance(task_id, bytes) else task_id
                task_data = await pool.hget(self._task_key(task_id), "data")
                if task_data:
                    data = json.loads(task_data)
                    return QueueTask(
                        task_id=task_id,
                        payload=json.loads(
                            await pool.hget(self._task_key(task_id), "payload")
                        ),
                        status=TaskStatus.PROCESSING,
                        priority=data.get("priority", 0),
                        retry_count=data.get("retry_count", 0),
                        max_retries=data.get("max_retries", 3)
                    )

            # Brief pause between polls to avoid hammering Redis.
            await asyncio.sleep(0.1)

        return None

    async def complete(self, task_id: str, result: Any = None) -> None:
        """Mark a task completed and persist its result for 24 hours."""
        pool = await self._get_pool()

        # Remove from processing, update status
        await pool.zrem(f"queue:{self.queue_name}:processing", task_id)
        await pool.hset(
            self._task_key(task_id),
            mapping={
                "status": TaskStatus.COMPLETED.value,
                "completed_at": str(time.time()),
                # Bug fix: was `if result`, which silently dropped falsy
                # results such as 0, False, "" or [].
                "result": json.dumps(result) if result is not None else ""
            }
        )

        # Set expiry for completed tasks
        await pool.expire(self._task_key(task_id), 86400)  # 24 hours

    async def fail(
        self,
        task_id: str,
        error: str,
        retry: bool = True
    ) -> None:
        """Record a failure; re-queue with backoff or move to dead letter."""
        pool = await self._get_pool()

        # Get current task data
        task_data = await pool.hget(self._task_key(task_id), "data")
        if not task_data:
            return

        data = json.loads(task_data)
        retry_count = data.get("retry_count", 0)
        max_retries = data.get("max_retries", 3)

        await pool.zrem(f"queue:{self.queue_name}:processing", task_id)

        if retry and retry_count < max_retries:
            # Re-queue: the score places the task in the future, giving
            # exponential backoff (2, 4, 8, ... seconds).
            new_retry_count = retry_count + 1
            delay = 2 ** new_retry_count

            data["retry_count"] = new_retry_count
            await pool.hset(
                self._task_key(task_id),
                mapping={
                    "data": json.dumps(data),
                    "status": TaskStatus.RETRYING.value,
                    "error": error
                }
            )

            score = time.time() + delay
            await pool.zadd(
                f"queue:{self.queue_name}:pending",
                {task_id: score}
            )
        else:
            # Retries exhausted (or retry disabled): park in dead letter.
            await pool.hset(
                self._task_key(task_id),
                mapping={
                    "status": TaskStatus.DEAD_LETTER.value,
                    "error": error,
                    "completed_at": str(time.time())
                }
            )
            await pool.zadd(
                f"queue:{self.queue_name}:dead_letter",
                {task_id: time.time()}
            )

    async def get_status(self, task_id: str) -> Optional[QueueTask[T]]:
        """Rebuild a QueueTask snapshot from the stored hash, or None.

        NOTE(review): assumes the client returns bytes (i.e. the pool was
        created without decode_responses=True) — confirm against config.
        """
        pool = await self._get_pool()
        data = await pool.hgetall(self._task_key(task_id))

        if not data:
            return None

        task_data = json.loads(data.get(b"data", b"{}").decode())
        return QueueTask(
            task_id=task_id,
            payload=json.loads(data.get(b"payload", b"null").decode()),
            status=TaskStatus(data.get(b"status", b"pending").decode()),
            priority=task_data.get("priority", 0),
            retry_count=task_data.get("retry_count", 0),
            error=data.get(b"error", b"").decode() or None
        )

    async def recover_stale_tasks(self) -> int:
        """Return tasks whose visibility timeout passed to the pending set.

        Returns the number of tasks recovered. Intended to be called
        periodically (e.g. by a maintenance loop).
        """
        pool = await self._get_pool()

        # Processing scores are deadlines; anything <= now is stale.
        now = time.time()
        stale_tasks = await pool.zrangebyscore(
            f"queue:{self.queue_name}:processing",
            "-inf",
            str(now)
        )

        recovered = 0
        for task_id in stale_tasks:
            task_id = task_id.decode() if isinstance(task_id, bytes) else task_id

            # Move back to pending
            await pool.zrem(f"queue:{self.queue_name}:processing", task_id)
            await pool.zadd(
                f"queue:{self.queue_name}:pending",
                {task_id: now}
            )
            await pool.hset(
                self._task_key(task_id),
                "status",
                TaskStatus.PENDING.value
            )
            recovered += 1

        return recovered

The Redis queue uses Lua scripts for atomic operations, ensuring each task is claimed by at most one worker at a time. Visibility timeout and recovery mechanisms handle worker failures gracefully, though a recovered task may be delivered again — this is at-least-once processing, so handlers should be idempotent.


Worker Patterns

Workers consume tasks from queues and execute agent logic. Various patterns exist for worker design, from simple single-threaded workers to complex multi-process pools with graceful shutdown.

Agent Task Worker

🐍python
1from dataclasses import dataclass
2from typing import Any, Optional, Callable, Awaitable
3import asyncio
4import signal
5import logging
6
7
@dataclass
class WorkerConfig:
    """Tunable settings for a TaskWorker."""
    worker_id: str                    # stable identifier, used in log names
    concurrency: int = 5              # max tasks processed at once
    poll_interval: float = 1.0        # dequeue timeout between polls (s)
    shutdown_timeout: float = 30.0    # grace period for in-flight tasks (s)
    heartbeat_interval: float = 10.0  # delay between heartbeat log lines (s)
16
17
class TaskWorker:
    """Consumes tasks from a queue and runs them through a handler.

    Provides bounded concurrency (via a semaphore), SIGTERM/SIGINT
    triggered graceful shutdown, and periodic heartbeat logging.
    """

    def __init__(
        self,
        config: WorkerConfig,
        queue: TaskQueue,
        handler: Callable[[Any], Awaitable[Any]]
    ):
        self.config = config
        self.queue = queue
        self.handler = handler

        self._running = False
        self._active_tasks: set[asyncio.Task] = set()
        # Caps how many tasks this worker processes concurrently.
        self._semaphore = asyncio.Semaphore(config.concurrency)
        self._shutdown_event = asyncio.Event()

        self.logger = logging.getLogger(f"worker.{config.worker_id}")

    async def start(self) -> None:
        """Run until a shutdown signal arrives, then drain gracefully."""
        self._running = True
        self.logger.info(f"Worker {self.config.worker_id} starting")

        # Translate process signals into a clean stop of the main loop.
        # NOTE(review): loop.add_signal_handler is POSIX-only — confirm
        # deployment target if Windows support is needed.
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._signal_handler)

        heartbeat = asyncio.create_task(self._heartbeat_loop())

        try:
            await self._main_loop()
        finally:
            heartbeat.cancel()
            await self._graceful_shutdown()

    def _signal_handler(self) -> None:
        """Flag the worker to stop; invoked by the event loop on signal."""
        self.logger.info("Received shutdown signal")
        self._running = False
        self._shutdown_event.set()

    async def _main_loop(self) -> None:
        """Poll the queue and fan tasks out while slots are available."""
        while self._running:
            try:
                # Block until a concurrency slot frees up.
                await self._semaphore.acquire()

                next_task = await self.queue.dequeue(
                    timeout_seconds=self.config.poll_interval
                )

                if next_task is None:
                    # Nothing to do — give the slot back and poll again.
                    self._semaphore.release()
                    continue

                runner = asyncio.create_task(self._process_task(next_task))
                self._active_tasks.add(runner)
                runner.add_done_callback(self._task_done)

            except Exception as e:
                self.logger.error(f"Error in main loop: {e}")
                self._semaphore.release()
                await asyncio.sleep(1)

    def _task_done(self, task: asyncio.Task) -> None:
        """Done-callback: drop bookkeeping and free the concurrency slot."""
        self._active_tasks.discard(task)
        self._semaphore.release()

    async def _process_task(self, task: QueueTask) -> None:
        """Run the handler for one task and report the outcome to the queue."""
        self.logger.info(f"Processing task {task.task_id}")

        try:
            result = await asyncio.wait_for(
                self.handler(task.payload),
                timeout=task.timeout_seconds
            )
            await self.queue.complete(task.task_id, result)
            self.logger.info(f"Task {task.task_id} completed")

        except asyncio.TimeoutError:
            await self.queue.fail(
                task.task_id,
                "Task timed out",
                retry=True
            )
            self.logger.warning(f"Task {task.task_id} timed out")

        except Exception as e:
            await self.queue.fail(
                task.task_id,
                str(e),
                retry=True
            )
            self.logger.error(f"Task {task.task_id} failed: {e}")

    async def _heartbeat_loop(self) -> None:
        """Periodically log the number of in-flight tasks."""
        while self._running:
            self.logger.debug(
                f"Heartbeat: {len(self._active_tasks)} active tasks"
            )
            await asyncio.sleep(self.config.heartbeat_interval)

    async def _graceful_shutdown(self) -> None:
        """Wait for in-flight tasks, then cancel whatever remains."""
        self.logger.info("Starting graceful shutdown")

        if self._active_tasks:
            self.logger.info(
                f"Waiting for {len(self._active_tasks)} tasks"
            )
            _, still_running = await asyncio.wait(
                self._active_tasks,
                timeout=self.config.shutdown_timeout
            )

            if still_running:
                self.logger.warning(
                    f"Force cancelling {len(still_running)} tasks"
                )
                for pending_task in still_running:
                    pending_task.cancel()

        self.logger.info("Shutdown complete")
156
157
class WorkerPool:
    """Pool of workers for parallel processing.

    Manages N TaskWorker instances (each driven by its own asyncio task)
    and supports stopping and dynamic resizing.
    """

    # Per-worker concurrency applied to every worker the pool creates.
    # Previously this constant was duplicated in start() and scale().
    WORKER_CONCURRENCY = 5

    def __init__(
        self,
        num_workers: int,
        queue: TaskQueue,
        handler: Callable[[Any], Awaitable[Any]]
    ):
        self.num_workers = num_workers
        self.queue = queue
        self.handler = handler
        self._workers: list[TaskWorker] = []
        self._tasks: list[asyncio.Task] = []

    def _spawn_worker(self, index: int) -> None:
        """Create and start worker *index* (shared by start() and scale())."""
        config = WorkerConfig(
            worker_id=f"worker-{index}",
            concurrency=self.WORKER_CONCURRENCY
        )
        worker = TaskWorker(config, self.queue, self.handler)
        self._workers.append(worker)
        self._tasks.append(asyncio.create_task(worker.start()))

    @staticmethod
    def _request_stop(worker: TaskWorker) -> None:
        """Ask a worker to exit its main loop and begin shutdown."""
        worker._running = False
        worker._shutdown_event.set()

    async def start(self) -> None:
        """Start all workers."""
        for i in range(self.num_workers):
            self._spawn_worker(i)

    async def stop(self) -> None:
        """Stop all workers and wait for their tasks to finish."""
        for worker in self._workers:
            self._request_stop(worker)

        await asyncio.gather(*self._tasks, return_exceptions=True)

    async def scale(self, new_count: int) -> None:
        """Grow or shrink the pool to *new_count* workers."""
        current = len(self._workers)

        if new_count > current:
            # Add workers
            for i in range(current, new_count):
                self._spawn_worker(i)

        elif new_count < current:
            # Retire the most recently added workers first, waiting for
            # each to shut down before removing the next.
            for _ in range(current - new_count):
                worker = self._workers.pop()
                task = self._tasks.pop()
                self._request_stop(worker)
                await task

The worker pattern supports concurrent task processing, graceful shutdown, and dynamic scaling. Signal handlers ensure clean termination even when receiving system signals.


Backpressure Management

Backpressure occurs when tasks arrive faster than they can be processed. Proper backpressure management prevents system overload and maintains service quality under high load.

Backpressure Controller

🐍python
1from dataclasses import dataclass
2from typing import Optional
3from enum import Enum
4import asyncio
5import time
6
7
class BackpressureAction(Enum):
    """What the system should do with an incoming task."""
    ACCEPT = "accept"        # process normally
    DELAY = "delay"          # accept, but ask the caller to wait first
    REJECT = "reject"        # refuse outright
    SHED_LOAD = "shed_load"  # drop because the request is low priority


@dataclass
class BackpressureConfig:
    """Thresholds and knobs for backpressure decisions."""
    # Queue depth thresholds (number of waiting tasks).
    queue_low_water: int = 100
    queue_high_water: int = 500
    queue_critical: int = 1000

    # Minimum healthy throughput, in tasks/second.
    min_processing_rate: float = 10.0

    # Delay response: how long to ask callers to wait.
    delay_ms_per_task: float = 10.0
    max_delay_ms: float = 5000.0

    # Requests below this priority are shed first under pressure.
    shed_priority_below: int = 5


class BackpressureController:
    """Decides how to treat new tasks from queue depth and throughput."""

    def __init__(self, config: BackpressureConfig):
        self.config = config
        self._queue_depth = 0
        self._processing_rate = 0.0
        self._processed_count = 0
        self._last_rate_calc = time.time()
        self._lock = asyncio.Lock()

    async def update_metrics(
        self,
        queue_depth: int,
        processed: int = 0
    ) -> None:
        """Record the latest queue depth and completed-task count."""
        async with self._lock:
            self._queue_depth = queue_depth
            self._processed_count += processed

            # Recompute throughput roughly once per second.
            now = time.time()
            window = now - self._last_rate_calc
            if window >= 1.0:
                self._processing_rate = self._processed_count / window
                self._processed_count = 0
                self._last_rate_calc = now

    async def evaluate(
        self,
        priority: int = 0
    ) -> tuple[BackpressureAction, Optional[float]]:
        """Decide the action for a new task.

        Returns (action, delay_ms); the delay is only set for DELAY.
        """
        async with self._lock:
            depth = self._queue_depth
            rate = self._processing_rate

        cfg = self.config
        low_priority = priority < cfg.shed_priority_below

        # Critical depth: drop low-priority work, refuse the rest.
        if depth >= cfg.queue_critical:
            if low_priority:
                return BackpressureAction.SHED_LOAD, None
            return BackpressureAction.REJECT, None

        # High-water mark: shed low priority; slow everyone else down in
        # proportion to how far past the mark the queue has grown.
        if depth >= cfg.queue_high_water:
            if low_priority:
                return BackpressureAction.SHED_LOAD, None

            backlog = depth - cfg.queue_high_water
            return BackpressureAction.DELAY, min(
                backlog * cfg.delay_ms_per_task,
                cfg.max_delay_ms
            )

        # Throughput is unhealthy while work is queued: throttle intake.
        if rate < cfg.min_processing_rate and depth > 0:
            return BackpressureAction.DELAY, min(
                (cfg.min_processing_rate / max(rate, 0.1)) * 100,
                cfg.max_delay_ms
            )

        return BackpressureAction.ACCEPT, None
101
102
class AdaptiveRateLimiter:
    """Fixed-window rate limiter whose limit adapts to observed health."""

    def __init__(
        self,
        base_rate: float,
        min_rate: float,
        max_rate: float
    ):
        self.base_rate = base_rate
        self.min_rate = min_rate
        self.max_rate = max_rate
        self._current_rate = base_rate
        self._window_start = time.time()
        self._requests_in_window = 0
        self._lock = asyncio.Lock()

    async def acquire(self) -> bool:
        """Return True if a request may proceed in the current window."""
        async with self._lock:
            now = time.time()

            # One-second fixed windows; start fresh once one expires.
            if now - self._window_start >= 1.0:
                self._window_start = now
                self._requests_in_window = 0

            allowed = self._requests_in_window < self._current_rate
            if allowed:
                self._requests_in_window += 1
            return allowed

    async def adjust_rate(
        self,
        success_rate: float,
        latency_ms: float,
        target_latency_ms: float = 1000.0
    ) -> None:
        """Nudge the limit down when struggling, up when clearly healthy."""
        async with self._lock:
            struggling = (
                latency_ms > target_latency_ms * 1.5 or success_rate < 0.95
            )
            thriving = (
                latency_ms < target_latency_ms * 0.5 and success_rate > 0.99
            )

            if struggling:
                # Multiplicative decrease, floored at min_rate.
                self._current_rate = max(
                    self.min_rate,
                    self._current_rate * 0.9
                )
            elif thriving:
                # Gentle multiplicative increase, capped at max_rate.
                self._current_rate = min(
                    self.max_rate,
                    self._current_rate * 1.1
                )

    @property
    def current_rate(self) -> float:
        """The limit in force (requests per one-second window)."""
        return self._current_rate
161
162
163class LoadShedder:
164    """Sheds load when system is overwhelmed."""
165
166    def __init__(
167        self,
168        shed_threshold: float = 0.9,
169        recovery_threshold: float = 0.7
170    ):
171        self.shed_threshold = shed_threshold
172        self.recovery_threshold = recovery_threshold
173        self._shedding = False
174        self._shed_count = 0
175
176    def should_shed(
177        self,
178        utilization: float,
179        priority: int,
180        priority_threshold: int = 5
181    ) -> bool:
182        """Determine if load should be shed."""
183        # Enter shedding mode
184        if utilization >= self.shed_threshold:
185            self._shedding = True
186
187        # Exit shedding mode
188        if utilization < self.recovery_threshold:
189            self._shedding = False
190
191        # Shed low priority requests when in shedding mode
192        if self._shedding and priority < priority_threshold:
193            self._shed_count += 1
194            return True
195
196        return False
197
198    @property
199    def stats(self) -> dict[str, Any]:
200        return {
201            "shedding": self._shedding,
202            "shed_count": self._shed_count
203        }

Backpressure management combines queue monitoring, adaptive rate limiting, and load shedding to maintain system stability. Low-priority requests are shed first to preserve capacity for important work.


Priority and Scheduling

Not all tasks are equal. Priority scheduling ensures important tasks are processed first, while fair scheduling prevents starvation of lower-priority work.

Priority Scheduler

🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional, Dict, List
3from enum import IntEnum
4import asyncio
5import time
6from collections import defaultdict
7
8
class TaskPriority(IntEnum):
    """Standard priority levels; higher values are served sooner."""
    CRITICAL = 100
    HIGH = 75
    NORMAL = 50
    LOW = 25
    BACKGROUND = 0
16
17
@dataclass
class PriorityBucket:
    """Per-priority FIFO plus the accounting used for fair scheduling."""
    priority: int
    queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    weight: float = 1.0  # relative share of service this bucket deserves
    processed: int = 0   # tasks served so far (drives virtual time)
    last_served: float = field(default_factory=time.time)
26
27
class WeightedFairScheduler:
    """Weighted fair scheduler over per-priority buckets.

    Each bucket advances a virtual clock of ``processed / weight``; the
    non-empty bucket with the smallest virtual time is served next, so
    heavier-weighted priorities get proportionally more service while
    lighter ones still make progress (no starvation).
    """

    def __init__(
        self,
        weights: Optional[Dict[int, float]] = None
    ):
        """Create one bucket per priority; `weights` overrides the defaults."""
        self._buckets: Dict[int, PriorityBucket] = {}

        # Default weights based on priority
        default_weights = {
            TaskPriority.CRITICAL: 10.0,
            TaskPriority.HIGH: 5.0,
            TaskPriority.NORMAL: 2.0,
            TaskPriority.LOW: 1.0,
            TaskPriority.BACKGROUND: 0.5
        }
        weights = weights or default_weights

        for priority, weight in weights.items():
            self._buckets[priority] = PriorityBucket(
                priority=priority,
                weight=weight
            )

    async def enqueue(
        self,
        task: "QueueTask",
        priority: int = TaskPriority.NORMAL
    ) -> None:
        """Enqueue a task at a given priority, creating the bucket on demand."""
        bucket = self._buckets.get(priority)
        if not bucket:
            # Unknown priority: give it a neutral weight rather than rejecting.
            bucket = PriorityBucket(priority=priority, weight=1.0)
            self._buckets[priority] = bucket

        await bucket.queue.put(task)

    async def dequeue(
        self,
        timeout_seconds: float = 30.0
    ) -> "Optional[QueueTask]":
        """Dequeue the next task using weighted fair scheduling.

        Polls until a task is available or `timeout_seconds` elapses;
        returns None on timeout.
        """
        deadline = time.time() + timeout_seconds

        while time.time() < deadline:
            # Candidates are all buckets that currently have work queued.
            candidates = [
                bucket for bucket in self._buckets.values()
                if not bucket.queue.empty()
            ]

            if not candidates:
                await asyncio.sleep(0.1)  # nothing queued yet; poll again shortly
                continue

            # Serve the bucket with the lowest virtual time (processed / weight).
            # BUG FIX: the previous version sorted (virtual_time, bucket)
            # tuples, which raises TypeError on virtual-time ties because
            # PriorityBucket is not orderable — and ties are guaranteed at
            # startup when every bucket has processed == 0. min() with a
            # key function never compares the buckets themselves.
            selected_bucket = min(
                candidates,
                key=lambda bucket: bucket.processed / bucket.weight
            )

            try:
                task = selected_bucket.queue.get_nowait()
            except asyncio.QueueEmpty:
                # Raced with another consumer; re-evaluate the buckets.
                continue

            selected_bucket.processed += 1
            selected_bucket.last_served = time.time()
            return task

        return None

    def get_stats(self) -> Dict[int, Dict[str, Any]]:
        """Per-priority queue depth, processed count, and configured weight."""
        return {
            priority: {
                "queued": bucket.queue.qsize(),
                "processed": bucket.processed,
                "weight": bucket.weight
            }
            for priority, bucket in self._buckets.items()
        }
111
112
class DeadlineScheduler:
    """Scheduler that serves tasks in earliest-deadline-first order.

    Tasks whose deadline has already passed are marked DEAD_LETTER when
    encountered during dequeue instead of being returned.
    """

    def __init__(self):
        # Sorted list of (deadline, task) pairs, earliest deadline first.
        self._tasks: "List[tuple[float, QueueTask]]" = []
        self._lock = asyncio.Lock()

    async def enqueue(
        self,
        task: "QueueTask",
        deadline: Optional[float] = None
    ) -> None:
        """Enqueue task; defaults the deadline to now + task.timeout_seconds.

        NOTE(review): assumes QueueTask carries a `timeout_seconds`
        attribute — confirm against the task definition.
        """
        if deadline is None:
            deadline = time.time() + task.timeout_seconds

        async with self._lock:
            self._tasks.append((deadline, task))
            # Sort by deadline only: a key function avoids ever comparing
            # two QueueTask objects when deadlines tie.
            self._tasks.sort(key=lambda entry: entry[0])

    async def dequeue(self) -> "Optional[QueueTask]":
        """Pop the unexpired task with the earliest deadline, or None.

        Expired entries are drained from the front of the list and marked
        DEAD_LETTER. BUG FIX: the previous implementation popped from the
        list while iterating it with enumerate(), which skipped the entry
        following every expired task and could silently drop valid work.
        """
        async with self._lock:
            now = time.time()

            # The list is sorted, so any expired entries sit at the front.
            while self._tasks:
                deadline, task = self._tasks.pop(0)
                if deadline >= now:
                    return task
                # Task expired - move to dead letter
                task.status = TaskStatus.DEAD_LETTER
                task.error = "Deadline exceeded before processing"
                # Would normally move to dead letter queue

            return None

    async def get_expired_count(self) -> int:
        """Count queued tasks whose deadline has already passed."""
        async with self._lock:
            now = time.time()
            return sum(1 for deadline, _ in self._tasks if deadline < now)
157
158
class MultiTenantScheduler:
    """Fair scheduler across multiple tenants.

    Enqueueing is governed by a per-tenant quota that resets every 60
    seconds; dequeueing round-robins across tenant queues so no single
    tenant can monopolize the workers.
    """

    def __init__(
        self,
        default_quota: int = 100
    ):
        """
        Args:
            default_quota: enqueues allowed per tenant per 60s window when
                no explicit quota has been set via set_quota().
        """
        self.default_quota = default_quota
        self._tenant_queues: Dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
        self._tenant_quotas: Dict[str, int] = {}
        self._tenant_usage: Dict[str, int] = defaultdict(int)
        self._last_reset = time.time()
        self._lock = asyncio.Lock()

    async def set_quota(self, tenant_id: str, quota: int) -> None:
        """Set the per-window enqueue quota for a tenant."""
        self._tenant_quotas[tenant_id] = quota

    async def enqueue(
        self,
        task: "QueueTask",
        tenant_id: str
    ) -> bool:
        """Enqueue a task for a tenant; returns False if the tenant is over quota."""
        async with self._lock:
            # Reset usage periodically
            now = time.time()
            if now - self._last_reset > 60:
                self._tenant_usage.clear()
                self._last_reset = now

            # Check quota
            quota = self._tenant_quotas.get(tenant_id, self.default_quota)
            if self._tenant_usage[tenant_id] >= quota:
                return False

            self._tenant_usage[tenant_id] += 1
            await self._tenant_queues[tenant_id].put(task)
            return True

    async def dequeue(self) -> "Optional[tuple[str, QueueTask]]":
        """Dequeue the next task, round-robining across tenants.

        Returns (tenant_id, task), or None if every tenant queue is empty.
        """
        async with self._lock:
            for tenant_id in list(self._tenant_queues.keys()):
                queue = self._tenant_queues[tenant_id]
                if not queue.empty():
                    task = queue.get_nowait()
                    # BUG FIX: rotate the served tenant to the back of the
                    # dict's insertion order so the next scan starts with a
                    # different tenant. The previous version rotated a
                    # throwaway local list rebuilt on every call, so the
                    # rotation had no effect and the first tenant in dict
                    # order was always served, starving later tenants.
                    del self._tenant_queues[tenant_id]
                    self._tenant_queues[tenant_id] = queue
                    return tenant_id, task

            return None

The weighted fair scheduler balances priority with fairness, ensuring low-priority tasks eventually get processed. Deadline scheduling adds time-sensitivity, while multi-tenant scheduling ensures fair resource allocation across users.


Summary

Queue-based processing is essential for production agent systems. It provides natural buffering, enables reliable retry handling, and allows independent scaling of producers and consumers.

Key Takeaways

  • Decoupled Architecture - Separate task submission from execution for better resilience and scalability
  • Reliability Features - Use visibility timeouts, dead letter queues, and retry mechanisms
  • Graceful Workers - Implement signal handling and graceful shutdown to prevent task loss
  • Backpressure Management - Combine rate limiting, load shedding, and delays to maintain stability
  • Fair Scheduling - Balance priority with fairness using weighted scheduling algorithms
Next Steps: The next section covers error recovery and resilience patterns for handling failures gracefully in production agent systems.