Introduction
Building a comprehensive evaluation pipeline brings together all the concepts we've covered in this chapter: metrics collection, benchmark design, testing frameworks, A/B testing, and continuous evaluation. This capstone section guides you through creating a production-ready evaluation system that can assess agent performance at scale, providing actionable insights for continuous improvement.
Learning Objectives: By the end of this section, you will be able to design and implement an end-to-end evaluation pipeline, integrate multiple evaluation strategies into a unified system, build scalable data processing for evaluation results, and deploy a production-ready evaluation infrastructure.
A well-designed evaluation pipeline serves as the foundation for data-driven agent development. It provides consistent, reproducible assessments that inform architectural decisions, guide optimization efforts, and ensure quality standards are maintained as systems evolve.
Pipeline Architecture
The evaluation pipeline follows a modular architecture where each component handles a specific responsibility. This design enables independent scaling, easy testing, and flexible deployment across different environments.
Core Architecture Components
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable, Optional, Generic, TypeVar
from enum import Enum
import asyncio
from datetime import datetime, timezone
import time
import uuid

T = TypeVar('T')


class PipelineStage(Enum):
    """Pipeline processing stages."""
    INGESTION = "ingestion"
    PREPROCESSING = "preprocessing"
    EVALUATION = "evaluation"
    AGGREGATION = "aggregation"
    REPORTING = "reporting"
    ARCHIVAL = "archival"


@dataclass
class PipelineContext:
    """Context passed through pipeline stages."""
    run_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    started_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    metadata: dict[str, Any] = field(default_factory=dict)
    stage_results: dict[str, Any] = field(default_factory=dict)
    errors: list[dict[str, Any]] = field(default_factory=list)

    def record_stage(
        self,
        stage: PipelineStage,
        result: Any,
        duration_ms: float
    ) -> None:
        """Record results from a pipeline stage."""
        self.stage_results[stage.value] = {
            "result": result,
            "duration_ms": duration_ms,
            "completed_at": datetime.now(timezone.utc).isoformat()
        }

    def record_error(
        self,
        stage: PipelineStage,
        error: Exception,
        recoverable: bool = True
    ) -> None:
        """Record an error during pipeline execution."""
        self.errors.append({
            "stage": stage.value,
            "error_type": type(error).__name__,
            "message": str(error),
            "recoverable": recoverable,
            "timestamp": datetime.now(timezone.utc).isoformat()
        })


class PipelineComponent(ABC, Generic[T]):
    """Abstract base for pipeline components."""

    def __init__(self, name: str):
        self.name = name
        self.stage: PipelineStage = PipelineStage.EVALUATION

    @abstractmethod
    async def process(
        self,
        input_data: Any,
        context: PipelineContext
    ) -> T:
        """Process input and return result."""
        ...

    async def execute(
        self,
        input_data: Any,
        context: PipelineContext
    ) -> T:
        """Execute with timing and error handling."""
        start = time.perf_counter()

        try:
            result = await self.process(input_data, context)
            duration_ms = (time.perf_counter() - start) * 1000
            context.record_stage(self.stage, result, duration_ms)
            return result

        except Exception as e:
            context.record_error(self.stage, e)
            raise


@dataclass
class PipelineConfig:
    """Configuration for the evaluation pipeline."""
    # Data ingestion settings
    batch_size: int = 100
    max_concurrent_evaluations: int = 10

    # Evaluation settings
    timeout_seconds: float = 300.0
    retry_count: int = 3
    retry_delay_seconds: float = 5.0

    # Storage settings
    results_bucket: str = "evaluation-results"
    archive_after_days: int = 90

    # Reporting settings
    report_formats: list[str] = field(
        default_factory=lambda: ["json", "html"]
    )
    notification_channels: list[str] = field(
        default_factory=lambda: ["slack", "email"]
    )

    # Feature flags
    enable_caching: bool = True
    enable_parallel_evaluation: bool = True
    enable_detailed_logging: bool = False


class EvaluationPipeline:
    """Main evaluation pipeline orchestrator."""

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.components: dict[PipelineStage, PipelineComponent] = {}
        self.hooks: dict[str, list[Callable]] = {
            "pre_stage": [],
            "post_stage": [],
            "on_error": [],
            "on_complete": []
        }

    def register_component(
        self,
        stage: PipelineStage,
        component: PipelineComponent
    ) -> None:
        """Register a component for a pipeline stage."""
        component.stage = stage
        self.components[stage] = component

    def add_hook(
        self,
        hook_type: str,
        callback: Callable
    ) -> None:
        """Add a hook callback."""
        if hook_type in self.hooks:
            self.hooks[hook_type].append(callback)

    async def _run_hooks(
        self,
        hook_type: str,
        *args,
        **kwargs
    ) -> None:
        """Run all hooks of a given type."""
        for callback in self.hooks[hook_type]:
            if asyncio.iscoroutinefunction(callback):
                await callback(*args, **kwargs)
            else:
                callback(*args, **kwargs)

    async def run(
        self,
        input_data: Any,
        context: Optional[PipelineContext] = None
    ) -> PipelineContext:
        """Execute the full pipeline."""
        context = context or PipelineContext()
        current_data = input_data

        # Stages run in this fixed order; unregistered stages are skipped
        stage_order = [
            PipelineStage.INGESTION,
            PipelineStage.PREPROCESSING,
            PipelineStage.EVALUATION,
            PipelineStage.AGGREGATION,
            PipelineStage.REPORTING,
            PipelineStage.ARCHIVAL
        ]

        try:
            for stage in stage_order:
                if stage not in self.components:
                    continue

                component = self.components[stage]
                await self._run_hooks("pre_stage", stage, context)

                try:
                    current_data = await component.execute(
                        current_data,
                        context
                    )
                    await self._run_hooks("post_stage", stage, context)

                except Exception as e:
                    await self._run_hooks("on_error", stage, e, context)

                    # Abort only if an unrecoverable error was recorded for
                    # this stage; otherwise fall through to the next stage
                    if any(
                        err["stage"] == stage.value and not err["recoverable"]
                        for err in context.errors
                    ):
                        raise

            await self._run_hooks("on_complete", context)

        except Exception as e:
            context.metadata["failed"] = True
            context.metadata["failure_reason"] = str(e)

        return context

The pipeline architecture separates concerns into distinct stages, each handled by a specialized component. The context object flows through all stages, accumulating results and tracking errors for comprehensive observability.
| Stage | Responsibility | Input | Output |
|---|---|---|---|
| Ingestion | Load evaluation data | Data sources | Raw evaluation tasks |
| Preprocessing | Clean and validate | Raw tasks | Validated tasks |
| Evaluation | Run assessments | Tasks | Raw results |
| Aggregation | Combine results | Raw results | Aggregated metrics |
| Reporting | Generate reports | Metrics | Reports/alerts |
| Archival | Store for analysis | All data | Archived records |
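The staged-orchestrator idea can be shown in miniature. The sketch below is a deliberately simplified, self-contained illustration of the same pattern (ordered stages, a shared context, unregistered stages skipped); it uses plain async functions instead of full PipelineComponent subclasses, and the stage names and doubling logic are illustrative only.

```python
import asyncio

STAGE_ORDER = ["ingestion", "preprocessing", "evaluation", "aggregation"]


async def run_stages(stages, data, context):
    """Run registered stages in a fixed order, threading the data and a
    shared context dict through each one; unregistered stages are skipped."""
    for name in STAGE_ORDER:
        if name not in stages:
            continue
        data = await stages[name](data, context)
        context.setdefault("completed", []).append(name)
    return data, context


async def ingest(data, ctx):
    # Normalize the raw input into a list of "tasks"
    return list(data)


async def evaluate(data, ctx):
    # Stand-in for per-task evaluation work
    return [x * 2 for x in data]


result, ctx = asyncio.run(
    run_stages({"ingestion": ingest, "evaluation": evaluate}, range(3), {})
)
# result == [0, 2, 4]; ctx["completed"] == ["ingestion", "evaluation"]
```

Because "preprocessing" and "aggregation" are not registered, they are silently skipped, exactly as `EvaluationPipeline.run` skips stages with no registered component.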
Data Ingestion Layer
The data ingestion layer handles loading evaluation tasks from various sources including databases, file systems, message queues, and external APIs. It normalizes data into a consistent format for downstream processing.
Multi-Source Data Ingestion
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Optional
import json
import aiohttp
import aioboto3


@dataclass
class EvaluationTask:
    """Represents a single evaluation task."""
    task_id: str
    agent_id: str
    input_data: dict[str, Any]
    expected_output: Optional[dict[str, Any]] = None
    metadata: dict[str, Any] = field(default_factory=dict)
    priority: int = 0
    timeout_seconds: float = 60.0

    def to_dict(self) -> dict[str, Any]:
        return {
            "task_id": self.task_id,
            "agent_id": self.agent_id,
            "input_data": self.input_data,
            "expected_output": self.expected_output,
            "metadata": self.metadata,
            "priority": self.priority,
            "timeout_seconds": self.timeout_seconds
        }


class DataSource(ABC):
    """Abstract data source for evaluation tasks."""

    @abstractmethod
    async def fetch_tasks(
        self,
        batch_size: int = 100
    ) -> AsyncIterator[list[EvaluationTask]]:
        """Fetch evaluation tasks in batches."""
        ...

    @abstractmethod
    async def get_task_count(self) -> int:
        """Get total number of available tasks."""
        ...


class DatabaseDataSource(DataSource):
    """Fetch tasks from a database."""

    def __init__(
        self,
        connection_string: str,
        table_name: str = "evaluation_tasks"
    ):
        self.connection_string = connection_string
        self.table_name = table_name
        self._pool = None

    async def _get_pool(self):
        if self._pool is None:
            import asyncpg
            self._pool = await asyncpg.create_pool(self.connection_string)
        return self._pool

    async def fetch_tasks(
        self,
        batch_size: int = 100
    ) -> AsyncIterator[list[EvaluationTask]]:
        pool = await self._get_pool()
        offset = 0

        while True:
            async with pool.acquire() as conn:
                rows = await conn.fetch(
                    f"""
                    SELECT * FROM {self.table_name}
                    WHERE status = 'pending'
                    ORDER BY priority DESC, created_at ASC
                    LIMIT $1 OFFSET $2
                    """,
                    batch_size,
                    offset
                )

            if not rows:
                break

            tasks = [
                EvaluationTask(
                    task_id=row["id"],
                    agent_id=row["agent_id"],
                    input_data=json.loads(row["input_data"]),
                    expected_output=json.loads(row["expected_output"])
                    if row["expected_output"] else None,
                    metadata=json.loads(row["metadata"] or "{}"),
                    priority=row["priority"],
                    timeout_seconds=row["timeout_seconds"]
                )
                for row in rows
            ]

            yield tasks
            offset += batch_size

    async def get_task_count(self) -> int:
        pool = await self._get_pool()
        async with pool.acquire() as conn:
            result = await conn.fetchval(
                f"SELECT COUNT(*) FROM {self.table_name} WHERE status = 'pending'"
            )
        return result


class S3DataSource(DataSource):
    """Fetch tasks from an S3 bucket."""

    def __init__(
        self,
        bucket_name: str,
        prefix: str = "evaluation-tasks/"
    ):
        self.bucket_name = bucket_name
        self.prefix = prefix
        self._session = aioboto3.Session()

    async def fetch_tasks(
        self,
        batch_size: int = 100
    ) -> AsyncIterator[list[EvaluationTask]]:
        async with self._session.client("s3") as s3:
            paginator = s3.get_paginator("list_objects_v2")

            batch = []
            async for page in paginator.paginate(
                Bucket=self.bucket_name,
                Prefix=self.prefix
            ):
                for obj in page.get("Contents", []):
                    # Fetch object content
                    response = await s3.get_object(
                        Bucket=self.bucket_name,
                        Key=obj["Key"]
                    )
                    content = await response["Body"].read()
                    data = json.loads(content)

                    task = EvaluationTask(
                        task_id=data["task_id"],
                        agent_id=data["agent_id"],
                        input_data=data["input_data"],
                        expected_output=data.get("expected_output"),
                        metadata=data.get("metadata", {}),
                        priority=data.get("priority", 0),
                        timeout_seconds=data.get("timeout_seconds", 60.0)
                    )
                    batch.append(task)

                    if len(batch) >= batch_size:
                        yield batch
                        batch = []

            if batch:
                yield batch

    async def get_task_count(self) -> int:
        count = 0
        async with self._session.client("s3") as s3:
            paginator = s3.get_paginator("list_objects_v2")
            async for page in paginator.paginate(
                Bucket=self.bucket_name,
                Prefix=self.prefix
            ):
                count += len(page.get("Contents", []))
        return count


class APIDataSource(DataSource):
    """Fetch tasks from an external API."""

    def __init__(
        self,
        base_url: str,
        api_key: str,
        endpoint: str = "/evaluation/tasks"
    ):
        self.base_url = base_url
        self.api_key = api_key
        self.endpoint = endpoint

    async def fetch_tasks(
        self,
        batch_size: int = 100
    ) -> AsyncIterator[list[EvaluationTask]]:
        async with aiohttp.ClientSession() as session:
            page = 1

            while True:
                async with session.get(
                    f"{self.base_url}{self.endpoint}",
                    params={"page": page, "limit": batch_size},
                    headers={"Authorization": f"Bearer {self.api_key}"}
                ) as response:
                    response.raise_for_status()
                    data = await response.json()

                if not data.get("tasks"):
                    break

                tasks = [
                    EvaluationTask(
                        task_id=t["id"],
                        agent_id=t["agent_id"],
                        input_data=t["input"],
                        expected_output=t.get("expected"),
                        metadata=t.get("metadata", {}),
                        priority=t.get("priority", 0),
                        timeout_seconds=t.get("timeout", 60.0)
                    )
                    for t in data["tasks"]
                ]

                yield tasks

                if not data.get("has_more", False):
                    break
                page += 1

    async def get_task_count(self) -> int:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.base_url}{self.endpoint}/count",
                headers={"Authorization": f"Bearer {self.api_key}"}
            ) as response:
                response.raise_for_status()
                data = await response.json()
                return data["count"]


class DataIngestionComponent(PipelineComponent[list[EvaluationTask]]):
    """Pipeline component for data ingestion."""

    def __init__(
        self,
        sources: list[DataSource],
        batch_size: int = 100,
        deduplicate: bool = True
    ):
        super().__init__("data_ingestion")
        self.sources = sources
        self.batch_size = batch_size
        self.deduplicate = deduplicate
        self.stage = PipelineStage.INGESTION

    async def process(
        self,
        input_data: Any,
        context: PipelineContext
    ) -> list[EvaluationTask]:
        """Ingest tasks from all configured sources."""
        all_tasks = []
        seen_ids = set()

        for source in self.sources:
            source_name = type(source).__name__
            source_tasks = []

            async for batch in source.fetch_tasks(self.batch_size):
                for task in batch:
                    if self.deduplicate and task.task_id in seen_ids:
                        continue

                    seen_ids.add(task.task_id)
                    source_tasks.append(task)

            all_tasks.extend(source_tasks)

            context.metadata[f"ingested_from_{source_name}"] = len(source_tasks)

        context.metadata["total_tasks_ingested"] = len(all_tasks)

        # Sort by priority
        all_tasks.sort(key=lambda t: t.priority, reverse=True)

        return all_tasks

The data ingestion layer supports multiple sources simultaneously with automatic deduplication. Each source implements an async iterator pattern for efficient memory usage when processing large datasets.
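The first-seen-wins deduplication can be demonstrated in isolation. The sketch below stands in for `DataIngestionComponent.process`: `source_a` and `source_b` are hypothetical async sources that both yield a task with id `t2`, and the second occurrence is dropped.

```python
import asyncio


async def source_a():
    yield [{"task_id": "t1"}, {"task_id": "t2"}]


async def source_b():
    # t2 also appears here and must be dropped by deduplication
    yield [{"task_id": "t2"}, {"task_id": "t3"}]


async def ingest(sources):
    """Merge batches from several async sources, keeping the first
    occurrence of each task_id across all sources."""
    seen, tasks = set(), []
    for src in sources:
        async for batch in src():
            for task in batch:
                if task["task_id"] in seen:
                    continue
                seen.add(task["task_id"])
                tasks.append(task)
    return tasks


tasks = asyncio.run(ingest([source_a, source_b]))
# task ids in order: t1, t2, t3
```

Because sources are consumed in registration order, the copy of `t2` from `source_a` wins; if two sources can hold conflicting payloads for the same id, source ordering becomes a policy decision.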
Evaluation Engine
The evaluation engine executes assessments against agents, managing concurrency, retries, and timeout handling. It supports multiple evaluation strategies and can distribute work across multiple workers.
Parallel Evaluation Executor
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from enum import Enum
import asyncio
import time
from contextlib import asynccontextmanager


class EvaluationStatus(Enum):
    """Status of an evaluation."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    TIMEOUT = "timeout"
    SKIPPED = "skipped"


@dataclass
class EvaluationResult:
    """Result of a single evaluation."""
    task_id: str
    agent_id: str
    status: EvaluationStatus
    metrics: dict[str, float] = field(default_factory=dict)
    output: Optional[dict[str, Any]] = None
    error: Optional[str] = None
    execution_time_ms: float = 0.0
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def is_success(self) -> bool:
        return self.status == EvaluationStatus.COMPLETED


class Evaluator(ABC):
    """Abstract base for evaluators."""

    @abstractmethod
    async def evaluate(
        self,
        task: EvaluationTask,
        agent_output: dict[str, Any]
    ) -> dict[str, float]:
        """Evaluate agent output and return metrics."""
        ...


class CompositeEvaluator(Evaluator):
    """Combines multiple evaluators."""

    def __init__(self, evaluators: list[Evaluator]):
        self.evaluators = evaluators

    async def evaluate(
        self,
        task: EvaluationTask,
        agent_output: dict[str, Any]
    ) -> dict[str, float]:
        all_metrics = {}

        evaluations = await asyncio.gather(
            *[e.evaluate(task, agent_output) for e in self.evaluators],
            return_exceptions=True
        )

        for eval_result in evaluations:
            if isinstance(eval_result, Exception):
                continue
            all_metrics.update(eval_result)

        return all_metrics


class AgentExecutor:
    """Executes agents for evaluation."""

    def __init__(
        self,
        agent_registry: dict[str, Callable],
        default_timeout: float = 60.0
    ):
        self.agent_registry = agent_registry
        self.default_timeout = default_timeout

    async def execute(
        self,
        task: EvaluationTask
    ) -> dict[str, Any]:
        """Execute agent and return output."""
        agent_fn = self.agent_registry.get(task.agent_id)

        if not agent_fn:
            raise ValueError(f"Unknown agent: {task.agent_id}")

        timeout = task.timeout_seconds or self.default_timeout

        try:
            result = await asyncio.wait_for(
                agent_fn(task.input_data),
                timeout=timeout
            )
            return result

        except asyncio.TimeoutError:
            raise TimeoutError(
                f"Agent {task.agent_id} timed out after {timeout}s"
            )


class EvaluationWorker:
    """Worker that processes evaluation tasks."""

    def __init__(
        self,
        worker_id: int,
        agent_executor: AgentExecutor,
        evaluator: Evaluator,
        retry_count: int = 3,
        retry_delay: float = 1.0
    ):
        self.worker_id = worker_id
        self.agent_executor = agent_executor
        self.evaluator = evaluator
        self.retry_count = retry_count
        self.retry_delay = retry_delay

    async def process_task(
        self,
        task: EvaluationTask
    ) -> EvaluationResult:
        """Process a single evaluation task."""
        start_time = time.perf_counter()
        last_error = None

        for attempt in range(self.retry_count + 1):
            try:
                # Execute agent
                output = await self.agent_executor.execute(task)

                # Run evaluation
                metrics = await self.evaluator.evaluate(task, output)

                execution_time = (time.perf_counter() - start_time) * 1000

                return EvaluationResult(
                    task_id=task.task_id,
                    agent_id=task.agent_id,
                    status=EvaluationStatus.COMPLETED,
                    metrics=metrics,
                    output=output,
                    execution_time_ms=execution_time,
                    metadata={
                        "worker_id": self.worker_id,
                        "attempts": attempt + 1
                    }
                )

            except TimeoutError as e:
                # Timeouts are not retried: the task already consumed
                # its full time budget
                execution_time = (time.perf_counter() - start_time) * 1000
                return EvaluationResult(
                    task_id=task.task_id,
                    agent_id=task.agent_id,
                    status=EvaluationStatus.TIMEOUT,
                    error=str(e),
                    execution_time_ms=execution_time,
                    metadata={"worker_id": self.worker_id}
                )

            except Exception as e:
                last_error = e
                if attempt < self.retry_count:
                    # Exponential backoff between retries
                    await asyncio.sleep(
                        self.retry_delay * (2 ** attempt)
                    )

        execution_time = (time.perf_counter() - start_time) * 1000
        return EvaluationResult(
            task_id=task.task_id,
            agent_id=task.agent_id,
            status=EvaluationStatus.FAILED,
            error=str(last_error),
            execution_time_ms=execution_time,
            metadata={
                "worker_id": self.worker_id,
                "attempts": self.retry_count + 1
            }
        )


class EvaluationPool:
    """Pool of evaluation workers."""

    def __init__(
        self,
        num_workers: int,
        agent_executor: AgentExecutor,
        evaluator: Evaluator,
        retry_count: int = 3
    ):
        self.workers = [
            EvaluationWorker(
                worker_id=i,
                agent_executor=agent_executor,
                evaluator=evaluator,
                retry_count=retry_count
            )
            for i in range(num_workers)
        ]
        self._task_queue: asyncio.Queue = asyncio.Queue()
        self._result_queue: asyncio.Queue = asyncio.Queue()
        self._running = False

    async def _worker_loop(self, worker: EvaluationWorker):
        """Main loop for a worker."""
        while self._running:
            try:
                task = await asyncio.wait_for(
                    self._task_queue.get(),
                    timeout=1.0
                )
                result = await worker.process_task(task)
                await self._result_queue.put(result)
                self._task_queue.task_done()

            except asyncio.TimeoutError:
                # No task available; loop again and re-check _running
                continue

    @asynccontextmanager
    async def running(self):
        """Context manager to start/stop the pool."""
        self._running = True
        worker_tasks = [
            asyncio.create_task(self._worker_loop(w))
            for w in self.workers
        ]

        try:
            yield self
        finally:
            self._running = False
            await asyncio.gather(*worker_tasks, return_exceptions=True)

    async def submit(self, task: EvaluationTask) -> None:
        """Submit a task for evaluation."""
        await self._task_queue.put(task)

    async def submit_batch(
        self,
        tasks: list[EvaluationTask]
    ) -> None:
        """Submit multiple tasks."""
        for task in tasks:
            await self._task_queue.put(task)

    async def get_result(self) -> EvaluationResult:
        """Get next available result."""
        return await self._result_queue.get()

    async def collect_results(
        self,
        count: int,
        timeout: Optional[float] = None
    ) -> list[EvaluationResult]:
        """Collect specified number of results."""
        results = []
        deadline = time.time() + timeout if timeout else None

        while len(results) < count:
            remaining = None
            if deadline:
                remaining = deadline - time.time()
                if remaining <= 0:
                    break

            try:
                result = await asyncio.wait_for(
                    self._result_queue.get(),
                    timeout=remaining
                )
                results.append(result)

            except asyncio.TimeoutError:
                break

        return results


class EvaluationEngineComponent(PipelineComponent[list[EvaluationResult]]):
    """Pipeline component for running evaluations."""

    def __init__(
        self,
        agent_executor: AgentExecutor,
        evaluator: Evaluator,
        num_workers: int = 10,
        retry_count: int = 3
    ):
        super().__init__("evaluation_engine")
        self.pool = EvaluationPool(
            num_workers=num_workers,
            agent_executor=agent_executor,
            evaluator=evaluator,
            retry_count=retry_count
        )
        self.stage = PipelineStage.EVALUATION

    async def process(
        self,
        input_data: list[EvaluationTask],
        context: PipelineContext
    ) -> list[EvaluationResult]:
        """Run evaluations on all tasks."""
        async with self.pool.running():
            # Submit all tasks
            await self.pool.submit_batch(input_data)

            # Collect results
            results = await self.pool.collect_results(
                count=len(input_data),
                timeout=context.metadata.get("timeout_seconds", 3600)
            )

        # Record statistics
        successful = sum(1 for r in results if r.is_success)
        failed = sum(1 for r in results if r.status == EvaluationStatus.FAILED)
        timeouts = sum(1 for r in results if r.status == EvaluationStatus.TIMEOUT)

        context.metadata["evaluation_stats"] = {
            "total": len(results),
            "successful": successful,
            "failed": failed,
            "timeouts": timeouts,
            "success_rate": successful / len(results) if results else 0
        }

        return results

The evaluation engine uses a worker pool pattern for parallel execution. Each worker handles task execution, evaluation, and retry logic independently, maximizing throughput while maintaining isolation between evaluations.
Performance Tip: The number of workers should be tuned based on available resources and the nature of your evaluations. CPU-bound evaluations benefit from worker counts matching CPU cores, while I/O-bound evaluations can handle many more concurrent workers.
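That sizing heuristic can be written down directly. The helper below is a sketch only: the function name, the I/O multiplier of 8, and the cap of 64 are illustrative defaults we chose for the example, not recommendations from any benchmark.

```python
import os


def suggested_worker_count(io_bound: bool, io_multiplier: int = 8, cap: int = 64) -> int:
    """Heuristic worker count: match CPU cores for CPU-bound evaluations;
    allow a multiple of cores (up to a cap) for I/O-bound evaluations.
    Multiplier and cap are illustrative starting points, to be tuned."""
    cores = os.cpu_count() or 1
    if not io_bound:
        return cores
    return min(cores * io_multiplier, cap)
```

In practice the right value depends on agent latency, rate limits on downstream APIs, and memory per worker, so treat the result as a starting point for load testing rather than a fixed answer.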
Results Processing
After evaluations complete, results must be aggregated, analyzed, and transformed into actionable insights. The results processing layer handles statistical aggregation, trend analysis, and anomaly detection.
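The anomaly-detection step boils down to a z-score test: flag any value lying more than a threshold number of standard deviations from the mean. As a self-contained preview of the detector defined below (the function name `z_score_outliers` is ours, not part of the pipeline code):

```python
import statistics


def z_score_outliers(values: list[float], threshold: float = 3.0) -> list[float]:
    """Return values lying more than `threshold` standard deviations
    from the sample mean; a zero-variance sample yields no outliers."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)
    if std == 0:
        return []
    return [v for v in values if abs(v - mean) / std > threshold]


# Thirty typical scores plus one extreme value: only the extreme is flagged
print(z_score_outliers([10.0] * 30 + [100.0]))  # [100.0]
```

The full `AnomalyDetector` applies the same test per metric and per execution time, optionally substituting baseline statistics for the sample mean and standard deviation.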
Results Aggregation and Analysis
from dataclasses import dataclass, field
from typing import Any, Optional
from collections import defaultdict
import statistics
import math


@dataclass
class MetricStatistics:
    """Statistical summary of a metric."""
    name: str
    count: int
    mean: float
    std_dev: float
    min_value: float
    max_value: float
    median: float
    p95: float
    p99: float

    def to_dict(self) -> dict[str, Any]:
        return {
            "name": self.name,
            "count": self.count,
            "mean": round(self.mean, 4),
            "std_dev": round(self.std_dev, 4),
            "min": round(self.min_value, 4),
            "max": round(self.max_value, 4),
            "median": round(self.median, 4),
            "p95": round(self.p95, 4),
            "p99": round(self.p99, 4)
        }


@dataclass
class AgentSummary:
    """Summary of agent performance."""
    agent_id: str
    total_evaluations: int
    successful: int
    failed: int
    timeouts: int
    success_rate: float
    metrics: dict[str, MetricStatistics]
    avg_execution_time_ms: float


@dataclass
class AggregatedResults:
    """Aggregated evaluation results."""
    run_id: str
    total_evaluations: int
    by_agent: dict[str, AgentSummary]
    overall_metrics: dict[str, MetricStatistics]
    anomalies: list[dict[str, Any]]
    comparisons: dict[str, Any]


class MetricAggregator:
    """Aggregates metric values."""

    def __init__(self):
        self._values: dict[str, list[float]] = defaultdict(list)

    def add(self, metric_name: str, value: float) -> None:
        """Add a metric value."""
        self._values[metric_name].append(value)

    def compute_statistics(
        self,
        metric_name: str
    ) -> Optional[MetricStatistics]:
        """Compute statistics for a metric."""
        values = self._values.get(metric_name, [])

        if not values:
            return None

        sorted_values = sorted(values)
        n = len(sorted_values)

        mean = statistics.mean(values)
        std_dev = statistics.stdev(values) if n > 1 else 0.0

        # Nearest-rank percentile indices, clamped to the last element
        p95_idx = int(n * 0.95)
        p99_idx = int(n * 0.99)

        return MetricStatistics(
            name=metric_name,
            count=n,
            mean=mean,
            std_dev=std_dev,
            min_value=sorted_values[0],
            max_value=sorted_values[-1],
            median=statistics.median(values),
            p95=sorted_values[min(p95_idx, n - 1)],
            p99=sorted_values[min(p99_idx, n - 1)]
        )

    def get_all_statistics(self) -> dict[str, MetricStatistics]:
        """Get statistics for all metrics."""
        return {
            name: stats
            for name in self._values
            if (stats := self.compute_statistics(name)) is not None
        }


class AnomalyDetector:
    """Detects anomalies in evaluation results."""

    def __init__(
        self,
        z_score_threshold: float = 3.0,
        min_samples: int = 10
    ):
        self.z_score_threshold = z_score_threshold
        self.min_samples = min_samples

    def detect(
        self,
        results: list[EvaluationResult],
        baseline_stats: Optional[dict[str, MetricStatistics]] = None
    ) -> list[dict[str, Any]]:
        """Detect anomalies in results."""
        anomalies = []

        # Group by metric
        metric_values: dict[str, list[tuple[str, float]]] = defaultdict(list)

        for result in results:
            if not result.is_success:
                continue

            for metric_name, value in result.metrics.items():
                metric_values[metric_name].append(
                    (result.task_id, value)
                )

        # Detect anomalies per metric
        for metric_name, values in metric_values.items():
            if len(values) < self.min_samples:
                continue

            all_values = [v for _, v in values]
            mean = statistics.mean(all_values)
            std = statistics.stdev(all_values)

            # Prefer baseline statistics when available
            if baseline_stats and metric_name in baseline_stats:
                baseline = baseline_stats[metric_name]
                mean = baseline.mean
                std = baseline.std_dev

            # Check after the baseline override so a zero-variance
            # baseline cannot cause a division by zero
            if std == 0:
                continue

            # Find outliers using z-score
            for task_id, value in values:
                z_score = abs((value - mean) / std)

                if z_score > self.z_score_threshold:
                    anomalies.append({
                        "type": "metric_outlier",
                        "task_id": task_id,
                        "metric": metric_name,
                        "value": value,
                        "z_score": z_score,
                        "expected_mean": mean,
                        "expected_std": std
                    })

        # Detect execution time anomalies
        exec_times = [
            (r.task_id, r.execution_time_ms)
            for r in results
            if r.execution_time_ms > 0
        ]

        if len(exec_times) >= self.min_samples:
            times = [t for _, t in exec_times]
            mean_time = statistics.mean(times)
            std_time = statistics.stdev(times)

            if std_time > 0:
                for task_id, exec_time in exec_times:
                    z_score = (exec_time - mean_time) / std_time

                    if z_score > self.z_score_threshold:
                        anomalies.append({
                            "type": "slow_execution",
                            "task_id": task_id,
                            "execution_time_ms": exec_time,
                            "z_score": z_score,
                            "expected_mean_ms": mean_time
                        })

        return anomalies


class ResultComparator:
    """Compares results against baselines."""

    def __init__(
        self,
        significance_threshold: float = 0.05,
        min_improvement: float = 0.01
    ):
        self.significance_threshold = significance_threshold
        self.min_improvement = min_improvement

    def compare(
        self,
        current: dict[str, MetricStatistics],
        baseline: dict[str, MetricStatistics]
    ) -> dict[str, Any]:
        """Compare current results against baseline."""
        comparisons = {}

        for metric_name, current_stats in current.items():
            if metric_name not in baseline:
                comparisons[metric_name] = {
                    "status": "new_metric",
                    "current_mean": current_stats.mean
                }
                continue

            baseline_stats = baseline[metric_name]

            # Calculate relative change
            if baseline_stats.mean != 0:
                relative_change = (
                    (current_stats.mean - baseline_stats.mean) /
                    abs(baseline_stats.mean)
                )
            else:
                relative_change = 0 if current_stats.mean == 0 else float('inf')

            # Perform t-test approximation
            t_statistic = self._calculate_t_statistic(
                current_stats,
                baseline_stats
            )

            # Determine significance
            is_significant = abs(t_statistic) > 1.96  # ~95% confidence

            # Determine direction
            if abs(relative_change) < self.min_improvement:
                direction = "no_change"
            elif relative_change > 0:
                direction = "improvement"
            else:
                direction = "regression"

            comparisons[metric_name] = {
                "baseline_mean": baseline_stats.mean,
                "current_mean": current_stats.mean,
                "relative_change": relative_change,
                "absolute_change": current_stats.mean - baseline_stats.mean,
                "direction": direction,
                "is_significant": is_significant,
                "t_statistic": t_statistic
            }

        # Check for missing metrics
        for metric_name in baseline:
            if metric_name not in current:
                comparisons[metric_name] = {
                    "status": "missing_metric",
                    "baseline_mean": baseline[metric_name].mean
                }

        return comparisons

    def _calculate_t_statistic(
        self,
        current: MetricStatistics,
        baseline: MetricStatistics
    ) -> float:
        """Calculate approximate t-statistic."""
        # Pooled standard error approximation
        se_current = current.std_dev / math.sqrt(current.count)
        se_baseline = baseline.std_dev / math.sqrt(baseline.count)
        pooled_se = math.sqrt(se_current**2 + se_baseline**2)

        if pooled_se == 0:
            return 0.0

        return (current.mean - baseline.mean) / pooled_se


class ResultsProcessingComponent(PipelineComponent[AggregatedResults]):
    """Pipeline component for results processing."""

    def __init__(
        self,
        baseline_stats: Optional[dict[str, MetricStatistics]] = None,
        detect_anomalies: bool = True
    ):
        super().__init__("results_processing")
        self.baseline_stats = baseline_stats
        self.detect_anomalies = detect_anomalies
        self.anomaly_detector = AnomalyDetector()
        self.comparator = ResultComparator()
        self.stage = PipelineStage.AGGREGATION

    async def process(
        self,
        input_data: list[EvaluationResult],
        context: PipelineContext
    ) -> AggregatedResults:
        """Process and aggregate evaluation results."""
        # Group by agent
        by_agent: dict[str, list[EvaluationResult]] = defaultdict(list)
        for result in input_data:
            by_agent[result.agent_id].append(result)

        # Aggregate per agent
        agent_summaries = {}
        overall_aggregator = MetricAggregator()

        for agent_id, results in by_agent.items():
            agent_aggregator = MetricAggregator()
            exec_times = []

            successful = 0
            failed = 0
            timeouts = 0

            for result in results:
                if result.status == EvaluationStatus.COMPLETED:
                    successful += 1
                    for metric, value in result.metrics.items():
                        agent_aggregator.add(metric, value)
                        overall_aggregator.add(metric, value)
                    exec_times.append(result.execution_time_ms)
                elif result.status == EvaluationStatus.FAILED:
                    failed += 1
                elif result.status == EvaluationStatus.TIMEOUT:
                    timeouts += 1

            agent_summaries[agent_id] = AgentSummary(
                agent_id=agent_id,
                total_evaluations=len(results),
                successful=successful,
                failed=failed,
                timeouts=timeouts,
                success_rate=successful / len(results) if results else 0,
                metrics=agent_aggregator.get_all_statistics(),
                avg_execution_time_ms=(
                    statistics.mean(exec_times) if exec_times else 0
                )
            )

        # Detect anomalies
        anomalies = []
        if self.detect_anomalies:
            anomalies = self.anomaly_detector.detect(
                input_data,
                self.baseline_stats
            )

        # Compare against baseline
366 comparisons = {}
367 overall_metrics = overall_aggregator.get_all_statistics()
368 if self.baseline_stats:
369 comparisons = self.comparator.compare(
370 overall_metrics,
371 self.baseline_stats
372 )
373
374 return AggregatedResults(
375 run_id=context.run_id,
376 total_evaluations=len(input_data),
377 by_agent=agent_summaries,
378 overall_metrics=overall_metrics,
379 anomalies=anomalies,
380 comparisons=comparisons
381 )
The results processing layer transforms raw evaluation results into structured insights. Statistical aggregation, anomaly detection, and baseline comparison provide actionable information for understanding agent performance.
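The baseline comparison above reduces to a relative change plus an approximate two-sample t-statistic computed from summary statistics alone. A minimal standalone sketch of that arithmetic (using a simplified `Stats` stand-in for `MetricStatistics`; names are illustrative):

```python
import math
from dataclasses import dataclass


@dataclass
class Stats:
    """Simplified summary statistics for one metric."""
    mean: float
    std_dev: float
    count: int


def t_statistic(current: Stats, baseline: Stats) -> float:
    """Welch-style t-statistic from summary statistics only."""
    se = math.sqrt(
        current.std_dev**2 / current.count
        + baseline.std_dev**2 / baseline.count
    )
    return 0.0 if se == 0 else (current.mean - baseline.mean) / se


def direction(current: Stats, baseline: Stats, min_improvement: float = 0.02) -> str:
    """Classify the change relative to baseline, ignoring tiny deltas."""
    rel = (current.mean - baseline.mean) / abs(baseline.mean)
    if abs(rel) < min_improvement:
        return "no_change"
    return "improvement" if rel > 0 else "regression"
```

With `|t| > 1.96` as the cutoff, a mean shift from 0.80 to 0.90 at a standard deviation of 0.05 over 100 samples is flagged as a significant improvement, while a shift of 0.001 falls below the `min_improvement` floor.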
Reporting and Visualization
The reporting layer generates human-readable reports in multiple formats and triggers notifications when issues are detected. Reports can be customized for different audiences and delivery channels.
Multi-Format Report Generation
1from abc import ABC, abstractmethod
 2from dataclasses import dataclass, field
3from typing import Any, Optional
4from datetime import datetime
5import json
 6import aiohttp
7
8class ReportFormat(ABC):
9 """Abstract report format."""
10
11 @abstractmethod
12 def render(
13 self,
14 results: AggregatedResults,
15 context: PipelineContext
16 ) -> str:
17 """Render report in this format."""
18 pass
19
20 @property
21 @abstractmethod
22 def extension(self) -> str:
23 """File extension for this format."""
24 pass
25
26
27class JSONReportFormat(ReportFormat):
28 """JSON report format."""
29
30 def render(
31 self,
32 results: AggregatedResults,
33 context: PipelineContext
34 ) -> str:
35 report_data = {
36 "run_id": results.run_id,
37 "generated_at": datetime.utcnow().isoformat(),
38 "summary": {
39 "total_evaluations": results.total_evaluations,
40 "agents_evaluated": len(results.by_agent),
41 "anomalies_detected": len(results.anomalies)
42 },
43 "agents": {
44 agent_id: {
45 "total": summary.total_evaluations,
46 "successful": summary.successful,
47 "failed": summary.failed,
48 "timeouts": summary.timeouts,
49 "success_rate": summary.success_rate,
50 "avg_execution_time_ms": summary.avg_execution_time_ms,
51 "metrics": {
52 name: stats.to_dict()
53 for name, stats in summary.metrics.items()
54 }
55 }
56 for agent_id, summary in results.by_agent.items()
57 },
58 "overall_metrics": {
59 name: stats.to_dict()
60 for name, stats in results.overall_metrics.items()
61 },
62 "anomalies": results.anomalies,
63 "comparisons": results.comparisons,
64 "pipeline_metadata": {
65 "started_at": context.started_at.isoformat(),
66 "stages_completed": list(context.stage_results.keys()),
67 "errors": context.errors
68 }
69 }
70
71 return json.dumps(report_data, indent=2)
72
73 @property
74 def extension(self) -> str:
75 return "json"
76
77
78class HTMLReportFormat(ReportFormat):
79 """HTML report format."""
80
81 def render(
82 self,
83 results: AggregatedResults,
84 context: PipelineContext
85 ) -> str:
86 html_parts = [
87 "<!DOCTYPE html>",
88 "<html><head>",
89 "<title>Evaluation Report</title>",
90 "<style>",
91 self._get_styles(),
92 "</style>",
93 "</head><body>",
94 f"<h1>Evaluation Report - {results.run_id[:8]}</h1>",
95 f"<p class='timestamp'>Generated: {datetime.utcnow().isoformat()}</p>"
96 ]
97
98 # Summary section
99 html_parts.append("<section class='summary'>")
100 html_parts.append("<h2>Summary</h2>")
101 html_parts.append(f"<p>Total Evaluations: {results.total_evaluations}</p>")
102 html_parts.append(f"<p>Agents Evaluated: {len(results.by_agent)}</p>")
103 html_parts.append(f"<p>Anomalies Detected: {len(results.anomalies)}</p>")
104 html_parts.append("</section>")
105
106 # Agent results
107 html_parts.append("<section class='agents'>")
108 html_parts.append("<h2>Agent Performance</h2>")
109
110 for agent_id, summary in results.by_agent.items():
111 html_parts.append(f"<div class='agent-card'>")
112 html_parts.append(f"<h3>{agent_id}</h3>")
113 html_parts.append("<table>")
114 html_parts.append("<tr><th>Metric</th><th>Value</th></tr>")
115 html_parts.append(
116 f"<tr><td>Success Rate</td>"
117 f"<td>{summary.success_rate:.1%}</td></tr>"
118 )
119 html_parts.append(
120 f"<tr><td>Avg Execution Time</td>"
121 f"<td>{summary.avg_execution_time_ms:.1f}ms</td></tr>"
122 )
123
124 for metric_name, stats in summary.metrics.items():
125 html_parts.append(
126 f"<tr><td>{metric_name}</td>"
127 f"<td>{stats.mean:.4f} (±{stats.std_dev:.4f})</td></tr>"
128 )
129
130 html_parts.append("</table>")
131 html_parts.append("</div>")
132
133 html_parts.append("</section>")
134
135 # Anomalies section
136 if results.anomalies:
137 html_parts.append("<section class='anomalies'>")
138 html_parts.append("<h2>Anomalies Detected</h2>")
139 html_parts.append("<ul>")
140
141 for anomaly in results.anomalies:
142 html_parts.append(
143 f"<li class='anomaly-{anomaly['type']}'>"
144 f"{anomaly['type']}: Task {anomaly['task_id']} - "
145 f"z-score: {anomaly['z_score']:.2f}</li>"
146 )
147
148 html_parts.append("</ul>")
149 html_parts.append("</section>")
150
151 # Comparisons section
152 if results.comparisons:
153 html_parts.append("<section class='comparisons'>")
154 html_parts.append("<h2>Baseline Comparison</h2>")
155 html_parts.append("<table>")
156 html_parts.append(
157 "<tr><th>Metric</th><th>Change</th><th>Status</th></tr>"
158 )
159
160 for metric, comparison in results.comparisons.items():
161 if "relative_change" in comparison:
162 change = f"{comparison['relative_change']:+.1%}"
163 status_class = (
164 "improvement" if comparison["direction"] == "improvement"
165 else "regression" if comparison["direction"] == "regression"
166 else "neutral"
167 )
168 status = comparison["direction"]
169 else:
170 change = "N/A"
171 status = comparison.get("status", "unknown")
172 status_class = "neutral"
173
174 html_parts.append(
175 f"<tr class='{status_class}'>"
176 f"<td>{metric}</td><td>{change}</td><td>{status}</td></tr>"
177 )
178
179 html_parts.append("</table>")
180 html_parts.append("</section>")
181
182 html_parts.append("</body></html>")
183
184 return "\n".join(html_parts)
186
187 def _get_styles(self) -> str:
188 return """
189 body { font-family: system-ui, sans-serif; margin: 40px; }
190 h1 { color: #333; }
191 .timestamp { color: #666; font-size: 0.9em; }
192 section { margin: 20px 0; padding: 20px; border: 1px solid #ddd; }
193 .agent-card { background: #f9f9f9; padding: 15px; margin: 10px 0; }
194 table { width: 100%; border-collapse: collapse; }
195 th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }
196 th { background: #f0f0f0; }
197 .improvement { background: #d4edda; }
198 .regression { background: #f8d7da; }
199 .neutral { background: #fff; }
200 .anomaly-metric_outlier { color: #dc3545; }
201 .anomaly-slow_execution { color: #fd7e14; }
202 """
203
204 @property
205 def extension(self) -> str:
206 return "html"
207
208
209class MarkdownReportFormat(ReportFormat):
210 """Markdown report format."""
211
212 def render(
213 self,
214 results: AggregatedResults,
215 context: PipelineContext
216 ) -> str:
217 lines = [
218 f"# Evaluation Report",
219 f"",
220 f"**Run ID:** {results.run_id}",
221 f"**Generated:** {datetime.utcnow().isoformat()}",
222 f"",
223 f"## Summary",
224 f"",
225 f"- Total Evaluations: {results.total_evaluations}",
226 f"- Agents Evaluated: {len(results.by_agent)}",
227 f"- Anomalies Detected: {len(results.anomalies)}",
228 f"",
229 f"## Agent Performance",
230 f""
231 ]
232
233 for agent_id, summary in results.by_agent.items():
234 lines.append(f"### {agent_id}")
235 lines.append(f"")
236 lines.append(f"| Metric | Value |")
237 lines.append(f"|--------|-------|")
238 lines.append(f"| Success Rate | {summary.success_rate:.1%} |")
239 lines.append(
240 f"| Avg Execution Time | {summary.avg_execution_time_ms:.1f}ms |"
241 )
242
243 for metric_name, stats in summary.metrics.items():
244 lines.append(
245 f"| {metric_name} | {stats.mean:.4f} (±{stats.std_dev:.4f}) |"
246 )
247
248 lines.append(f"")
249
250 if results.anomalies:
251 lines.append(f"## Anomalies")
252 lines.append(f"")
253
254 for anomaly in results.anomalies:
255 lines.append(
256 f"- **{anomaly['type']}**: Task {anomaly['task_id']} "
257 f"(z-score: {anomaly['z_score']:.2f})"
258 )
259
260 lines.append(f"")
261
262 return "\n".join(lines)
264
265 @property
266 def extension(self) -> str:
267 return "md"
268
269
270@dataclass
271class NotificationConfig:
272 """Configuration for notifications."""
273 channel: str
274 webhook_url: Optional[str] = None
275 recipients: list[str] = field(default_factory=list)
276 notify_on_anomalies: bool = True
277 notify_on_regressions: bool = True
278 min_regression_threshold: float = 0.1
279
280
281class NotificationSender:
282 """Sends notifications based on results."""
283
284 def __init__(self, configs: list[NotificationConfig]):
285 self.configs = configs
286
287 async def send(
288 self,
289 results: AggregatedResults,
290 context: PipelineContext
291 ) -> list[dict[str, Any]]:
292 """Send notifications based on results."""
293 notifications_sent = []
294
295 for config in self.configs:
296 should_notify = False
297 reasons = []
298
299 # Check for anomalies
300 if config.notify_on_anomalies and results.anomalies:
301 should_notify = True
302 reasons.append(f"{len(results.anomalies)} anomalies detected")
303
304 # Check for regressions
305 if config.notify_on_regressions:
306 regressions = [
307 (metric, comp)
308 for metric, comp in results.comparisons.items()
309 if comp.get("direction") == "regression" and
310 abs(comp.get("relative_change", 0)) >= config.min_regression_threshold
311 ]
312
313 if regressions:
314 should_notify = True
315 reasons.append(f"{len(regressions)} significant regressions")
316
317 if should_notify:
318 notification = await self._send_notification(
319 config,
320 results,
321 reasons
322 )
323 notifications_sent.append(notification)
324
325 return notifications_sent
326
327 async def _send_notification(
328 self,
329 config: NotificationConfig,
330 results: AggregatedResults,
331 reasons: list[str]
332 ) -> dict[str, Any]:
333 """Send a single notification."""
334 message = self._format_message(results, reasons)
335
336 if config.channel == "slack" and config.webhook_url:
337 async with aiohttp.ClientSession() as session:
338 await session.post(
339 config.webhook_url,
340 json={"text": message}
341 )
342
343 return {
344 "channel": config.channel,
345 "reasons": reasons,
346 "sent_at": datetime.utcnow().isoformat()
347 }
348
349 def _format_message(
350 self,
351 results: AggregatedResults,
352 reasons: list[str]
353 ) -> str:
354 """Format notification message."""
355 lines = [
356 f"⚠️ Evaluation Alert - Run {results.run_id[:8]}",
357 "",
358 "Issues detected:",
359 ]
360
361 for reason in reasons:
362 lines.append(f"• {reason}")
363
364 lines.append("")
365 lines.append(
366 f"Total evaluations: {results.total_evaluations}"
367 )
368
369 return "\n".join(lines)
371
372
373class ReportingComponent(PipelineComponent[dict[str, Any]]):
374 """Pipeline component for report generation."""
375
376 def __init__(
377 self,
378 formats: list[ReportFormat],
379 output_dir: str = "./reports",
380 notification_sender: Optional[NotificationSender] = None
381 ):
382 super().__init__("reporting")
383 self.formats = formats
384 self.output_dir = output_dir
385 self.notification_sender = notification_sender
386 self.stage = PipelineStage.REPORTING
387
388 async def process(
389 self,
390 input_data: AggregatedResults,
391 context: PipelineContext
392 ) -> dict[str, Any]:
393 """Generate reports and send notifications."""
394 import os
395 import aiofiles
396
397 os.makedirs(self.output_dir, exist_ok=True)
398
399 reports_generated = []
400
401 for format_handler in self.formats:
402 content = format_handler.render(input_data, context)
403 filename = f"report_{input_data.run_id}.{format_handler.extension}"
404 filepath = os.path.join(self.output_dir, filename)
405
406 async with aiofiles.open(filepath, "w") as f:
407 await f.write(content)
408
409 reports_generated.append({
410 "format": format_handler.extension,
411 "path": filepath,
412 "size_bytes": len(content)
413 })
414
415 # Send notifications
416 notifications = []
417 if self.notification_sender:
418 notifications = await self.notification_sender.send(
419 input_data,
420 context
421 )
422
423 return {
424 "reports": reports_generated,
425 "notifications": notifications
426 }
The reporting system supports multiple output formats and integrates with notification channels to alert stakeholders when issues are detected.
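The decision of when to notify is independent of the delivery channel and is worth factoring out so the alerting policy can be unit-tested without a live webhook. A small sketch of that gating logic (function and parameter names are illustrative, mirroring the checks in `NotificationSender`):

```python
from typing import Any


def should_notify(
    comparisons: dict[str, dict[str, Any]],
    anomaly_count: int,
    min_regression: float = 0.1,
) -> tuple[bool, list[str]]:
    """Return (notify?, human-readable reasons) for a set of results."""
    reasons: list[str] = []

    # Any detected anomaly warrants a notification.
    if anomaly_count:
        reasons.append(f"{anomaly_count} anomalies detected")

    # Regressions only count once they exceed the configured threshold.
    regressions = [
        metric
        for metric, comp in comparisons.items()
        if comp.get("direction") == "regression"
        and abs(comp.get("relative_change", 0)) >= min_regression
    ]
    if regressions:
        reasons.append(f"{len(regressions)} significant regressions")

    return bool(reasons), reasons
```

Keeping the decision pure means the same policy can drive Slack, email, or paging backends without duplication.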
Pipeline Orchestration
The orchestration layer manages pipeline execution, handling scheduling, dependency management, and recovery from failures. It ensures reliable execution even in distributed environments.
Orchestration and Scheduling
1from dataclasses import dataclass, field
2from typing import Any, Optional, Callable
3from enum import Enum
4from datetime import datetime, timedelta
5import asyncio
6import logging
 7import uuid
8
9class ScheduleType(Enum):
10 """Types of evaluation schedules."""
11 CRON = "cron"
12 INTERVAL = "interval"
13 EVENT_TRIGGERED = "event_triggered"
14 MANUAL = "manual"
15
16
17@dataclass
18class PipelineSchedule:
19 """Schedule configuration for pipeline runs."""
20 schedule_type: ScheduleType
21 cron_expression: Optional[str] = None
22 interval_seconds: Optional[int] = None
23 event_types: list[str] = field(default_factory=list)
24 enabled: bool = True
25
26 def next_run_time(
27 self,
28 from_time: Optional[datetime] = None
29 ) -> Optional[datetime]:
30 """Calculate next scheduled run time."""
31 from_time = from_time or datetime.utcnow()
32
33 if self.schedule_type == ScheduleType.INTERVAL:
34 return from_time + timedelta(seconds=self.interval_seconds)
35
36 if self.schedule_type == ScheduleType.CRON:
37 # Use croniter for cron parsing
38 from croniter import croniter
39 cron = croniter(self.cron_expression, from_time)
40 return cron.get_next(datetime)
41
42 return None
43
44
45@dataclass
46class PipelineRun:
47 """Represents a single pipeline execution."""
48 run_id: str
49 schedule_id: Optional[str]
50 status: str
51 started_at: datetime
52 completed_at: Optional[datetime] = None
53 context: Optional[PipelineContext] = None
54 error: Optional[str] = None
55
56
57class PipelineOrchestrator:
58 """Orchestrates pipeline execution."""
59
60 def __init__(
61 self,
62 pipeline: EvaluationPipeline,
63 max_concurrent_runs: int = 1,
64 history_size: int = 100
65 ):
66 self.pipeline = pipeline
67 self.max_concurrent_runs = max_concurrent_runs
68 self.history_size = history_size
69
70 self._schedules: dict[str, PipelineSchedule] = {}
71 self._run_history: list[PipelineRun] = []
72 self._active_runs: dict[str, PipelineRun] = {}
73 self._running = False
74 self._scheduler_task: Optional[asyncio.Task] = None
75 self._semaphore = asyncio.Semaphore(max_concurrent_runs)
76
77 self.logger = logging.getLogger(__name__)
78
79 def add_schedule(
80 self,
81 schedule_id: str,
82 schedule: PipelineSchedule
83 ) -> None:
84 """Add a schedule for pipeline runs."""
85 self._schedules[schedule_id] = schedule
86
87 def remove_schedule(self, schedule_id: str) -> None:
88 """Remove a schedule."""
89 self._schedules.pop(schedule_id, None)
90
91 async def trigger_run(
92 self,
93 input_data: Any,
94 schedule_id: Optional[str] = None,
95 metadata: Optional[dict[str, Any]] = None
96 ) -> PipelineRun:
97 """Trigger a pipeline run."""
98 run_id = str(uuid.uuid4())
99
100 run = PipelineRun(
101 run_id=run_id,
102 schedule_id=schedule_id,
103 status="pending",
104 started_at=datetime.utcnow()
105 )
106
107 self._active_runs[run_id] = run
108
109 try:
110 async with self._semaphore:
111 run.status = "running"
112
113 context = PipelineContext(
114 run_id=run_id,
115 metadata=metadata or {}
116 )
117
118 context = await self.pipeline.run(input_data, context)
119
120 run.context = context
121 run.status = "completed" if not context.errors else "failed"
122 run.completed_at = datetime.utcnow()
123
124 except Exception as e:
125 run.status = "error"
126 run.error = str(e)
127 run.completed_at = datetime.utcnow()
128 self.logger.error(f"Pipeline run {run_id} failed: {e}")
129
130 finally:
131 self._active_runs.pop(run_id, None)
132 self._add_to_history(run)
133
134 return run
135
136 def _add_to_history(self, run: PipelineRun) -> None:
137 """Add run to history, maintaining size limit."""
138 self._run_history.append(run)
139
140 if len(self._run_history) > self.history_size:
141 self._run_history = self._run_history[-self.history_size:]
142
143 async def start_scheduler(self) -> None:
144 """Start the schedule manager."""
145 self._running = True
146 self._scheduler_task = asyncio.create_task(self._scheduler_loop())
147
148 async def stop_scheduler(self) -> None:
149 """Stop the schedule manager."""
150 self._running = False
151
152 if self._scheduler_task:
153 self._scheduler_task.cancel()
154 try:
155 await self._scheduler_task
156 except asyncio.CancelledError:
157 pass
158
159 async def _scheduler_loop(self) -> None:
160 """Main scheduler loop."""
161 next_runs: dict[str, datetime] = {}
162
163 # Initialize next run times
164 for schedule_id, schedule in self._schedules.items():
165 if schedule.enabled:
166 next_time = schedule.next_run_time()
167 if next_time:
168 next_runs[schedule_id] = next_time
169
170 while self._running:
171 now = datetime.utcnow()
172
173 # Check for due schedules
174 for schedule_id, next_time in list(next_runs.items()):
175 if next_time <= now:
176 schedule = self._schedules.get(schedule_id)
177
178 if schedule and schedule.enabled:
179 # Trigger run in background
180 asyncio.create_task(
181 self.trigger_run(
182 input_data=None, # Will use data source
183 schedule_id=schedule_id,
184 metadata={"triggered_by": "scheduler"}
185 )
186 )
187
188 # Calculate next run
189 next_time = schedule.next_run_time(now)
190 if next_time:
191 next_runs[schedule_id] = next_time
192 else:
193 next_runs.pop(schedule_id)
194
195 # Sleep until next check
196 await asyncio.sleep(1)
197
198 def get_active_runs(self) -> list[PipelineRun]:
199 """Get currently active runs."""
200 return list(self._active_runs.values())
201
202 def get_run_history(
203 self,
204 limit: int = 10,
205 status: Optional[str] = None
206 ) -> list[PipelineRun]:
207 """Get run history."""
208 runs = self._run_history
209
210 if status:
211 runs = [r for r in runs if r.status == status]
212
213 return runs[-limit:]
214
215
216class EventDispatcher:
217 """Dispatches events to trigger pipeline runs."""
218
219 def __init__(self, orchestrator: PipelineOrchestrator):
220 self.orchestrator = orchestrator
221 self._handlers: dict[str, list[Callable]] = {}
222
223 def register_handler(
224 self,
225 event_type: str,
226 handler: Callable
227 ) -> None:
228 """Register an event handler."""
229 if event_type not in self._handlers:
230 self._handlers[event_type] = []
231 self._handlers[event_type].append(handler)
232
233 async def dispatch(
234 self,
235 event_type: str,
236 event_data: dict[str, Any]
237 ) -> None:
238 """Dispatch an event."""
239 # Check for triggered schedules
240 for schedule_id, schedule in self.orchestrator._schedules.items():
241 if (
242 schedule.schedule_type == ScheduleType.EVENT_TRIGGERED and
243 event_type in schedule.event_types and
244 schedule.enabled
245 ):
246 asyncio.create_task(
247 self.orchestrator.trigger_run(
248 input_data=event_data.get("data"),
249 schedule_id=schedule_id,
250 metadata={
251 "triggered_by": "event",
252 "event_type": event_type,
253 "event_data": event_data
254 }
255 )
256 )
257
258 # Run custom handlers
259 handlers = self._handlers.get(event_type, [])
260 for handler in handlers:
261 try:
262 if asyncio.iscoroutinefunction(handler):
263 await handler(event_data)
264 else:
265 handler(event_data)
266 except Exception as e:
267 logging.error(f"Event handler error: {e}")
The orchestrator manages scheduled runs, concurrent execution limits, and event-triggered evaluations. It maintains run history for debugging and provides visibility into active pipeline executions.
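One property to be aware of: `PipelineSchedule.next_run_time` computes the next run relative to the current time, so a slow run pushes every subsequent interval run later. Where drift matters, an alternative is to anchor runs to a fixed grid. A sketch of such a helper (hypothetical, not part of the classes above):

```python
from datetime import datetime, timedelta


def next_grid_run(anchor: datetime, interval_seconds: int, now: datetime) -> datetime:
    """Next run strictly after `now` on the grid anchor + N * interval."""
    if now < anchor:
        return anchor
    # Count how many full intervals have elapsed, then step to the next one.
    elapsed = (now - anchor).total_seconds()
    intervals = int(elapsed // interval_seconds) + 1
    return anchor + timedelta(seconds=intervals * interval_seconds)
```

With an hourly grid anchored at midnight, a run that finishes at 00:30 still schedules the next one for 01:00 rather than 01:30.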
Deployment and Operations
Deploying the evaluation pipeline requires careful consideration of infrastructure, monitoring, and operational procedures. This section covers production deployment patterns.
Infrastructure and API Layer
1from fastapi import FastAPI, HTTPException, BackgroundTasks
2from pydantic import BaseModel
3from typing import Any, Optional
4import uvicorn
5
6
7# API Models
8class TriggerRunRequest(BaseModel):
9 schedule_id: Optional[str] = None
10 input_data: Optional[dict[str, Any]] = None
11 metadata: Optional[dict[str, Any]] = None
12
13
14class RunStatusResponse(BaseModel):
15 run_id: str
16 status: str
17 started_at: str
18 completed_at: Optional[str]
19 error: Optional[str]
20
21
22class ScheduleRequest(BaseModel):
23 schedule_type: str
24 cron_expression: Optional[str] = None
25 interval_seconds: Optional[int] = None
26 event_types: list[str] = []
27 enabled: bool = True
28
29
30# API Application
31def create_api(orchestrator: PipelineOrchestrator) -> FastAPI:
32 """Create FastAPI application for pipeline management."""
33
34 app = FastAPI(
35 title="Evaluation Pipeline API",
36 description="API for managing evaluation pipeline runs",
37 version="1.0.0"
38 )
39
40 @app.post("/runs/trigger", response_model=RunStatusResponse)
41 async def trigger_run(
42 request: TriggerRunRequest,
43 background_tasks: BackgroundTasks
44 ):
45 """Trigger a new pipeline run."""
46 run = await orchestrator.trigger_run(
47 input_data=request.input_data,
48 schedule_id=request.schedule_id,
49 metadata=request.metadata
50 )
51
52 return RunStatusResponse(
53 run_id=run.run_id,
54 status=run.status,
55 started_at=run.started_at.isoformat(),
56 completed_at=run.completed_at.isoformat() if run.completed_at else None,
57 error=run.error
58 )
59
 60 @app.get("/runs/active")
 61 async def get_active_runs():
 62 """Get currently active runs. Registered before /runs/{run_id}, since FastAPI matches routes in declaration order."""
 63 runs = orchestrator.get_active_runs()
 64
 65 return {
 66 "runs": [
 67 RunStatusResponse(
 68 run_id=r.run_id,
 69 status=r.status,
 70 started_at=r.started_at.isoformat(),
 71 completed_at=None,
 72 error=None
 73 )
 74 for r in runs
 75 ]
 76 }
 77
 78 @app.get("/runs/{run_id}", response_model=RunStatusResponse)
 79 async def get_run_status(run_id: str):
 80 """Get status of a specific run."""
 81 # Check active runs
 82 if run_id in orchestrator._active_runs:
 83 run = orchestrator._active_runs[run_id]
 84 else:
 85 # Check history
 86 runs = [r for r in orchestrator._run_history if r.run_id == run_id]
 87 if not runs:
 88 raise HTTPException(status_code=404, detail="Run not found")
 89 run = runs[0]
 90
 91 return RunStatusResponse(
 92 run_id=run.run_id,
 93 status=run.status,
 94 started_at=run.started_at.isoformat(),
 95 completed_at=run.completed_at.isoformat() if run.completed_at else None,
 96 error=run.error
 97 )
 98
 99 @app.get("/runs")
100 async def list_runs(
101 limit: int = 10,
102 status: Optional[str] = None
103 ):
104 """List recent pipeline runs."""
105 runs = orchestrator.get_run_history(limit=limit, status=status)
106
107 return {
108 "runs": [
109 RunStatusResponse(
110 run_id=r.run_id,
111 status=r.status,
112 started_at=r.started_at.isoformat(),
113 completed_at=r.completed_at.isoformat() if r.completed_at else None,
114 error=r.error
115 )
116 for r in runs
117 ],
118 "active_count": len(orchestrator._active_runs)
119 }
120
121 @app.post("/schedules/{schedule_id}")
122 async def create_schedule(schedule_id: str, request: ScheduleRequest):
123 """Create or update a schedule."""
124 schedule = PipelineSchedule(
125 schedule_type=ScheduleType(request.schedule_type),
126 cron_expression=request.cron_expression,
127 interval_seconds=request.interval_seconds,
128 event_types=request.event_types,
129 enabled=request.enabled
130 )
131
132 orchestrator.add_schedule(schedule_id, schedule)
133
134 return {"status": "created", "schedule_id": schedule_id}
135
136 @app.delete("/schedules/{schedule_id}")
137 async def delete_schedule(schedule_id: str):
138 """Delete a schedule."""
139 orchestrator.remove_schedule(schedule_id)
140 return {"status": "deleted", "schedule_id": schedule_id}
141
142 @app.get("/schedules")
143 async def list_schedules():
144 """List all schedules."""
145 return {
146 "schedules": [
147 {
148 "id": sid,
149 "type": s.schedule_type.value,
150 "enabled": s.enabled,
151 "next_run": s.next_run_time().isoformat() if s.next_run_time() else None
152 }
153 for sid, s in orchestrator._schedules.items()
154 ]
155 }
156
157 @app.get("/health")
158 async def health_check():
159 """Health check endpoint."""
160 return {
161 "status": "healthy",
162 "active_runs": len(orchestrator._active_runs),
163 "schedules_count": len(orchestrator._schedules)
164 }
165
166 return app
167
168
169# Docker Deployment Configuration
170DOCKERFILE = """
171FROM python:3.11-slim
172
173WORKDIR /app
174
175COPY requirements.txt .
176RUN pip install --no-cache-dir -r requirements.txt
177
178COPY . .
179
180EXPOSE 8000
181
182CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
183"""
184
185DOCKER_COMPOSE = """
186version: '3.8'
187
188services:
189 evaluation-pipeline:
190 build: .
191 ports:
192 - "8000:8000"
193 environment:
194 - DATABASE_URL=postgresql://user:pass@db:5432/evaluations
195 - REDIS_URL=redis://redis:6379
196 - S3_BUCKET=evaluation-results
197 depends_on:
198 - db
199 - redis
200 deploy:
201 replicas: 2
202 resources:
203 limits:
204 cpus: '2'
205 memory: 4G
206
207 db:
208 image: postgres:15
209 environment:
210 - POSTGRES_USER=user
211 - POSTGRES_PASSWORD=pass
212 - POSTGRES_DB=evaluations
213 volumes:
214 - postgres_data:/var/lib/postgresql/data
215
216 redis:
217 image: redis:7-alpine
218 volumes:
219 - redis_data:/data
220
221volumes:
222 postgres_data:
223 redis_data:
224"""
225
226KUBERNETES_DEPLOYMENT = """
227apiVersion: apps/v1
228kind: Deployment
229metadata:
230 name: evaluation-pipeline
231 labels:
232 app: evaluation-pipeline
233spec:
234 replicas: 3
235 selector:
236 matchLabels:
237 app: evaluation-pipeline
238 template:
239 metadata:
240 labels:
241 app: evaluation-pipeline
242 spec:
243 containers:
244 - name: pipeline
245 image: evaluation-pipeline:latest
246 ports:
247 - containerPort: 8000
248 resources:
249 requests:
250 memory: "2Gi"
251 cpu: "1"
252 limits:
253 memory: "4Gi"
254 cpu: "2"
255 env:
256 - name: DATABASE_URL
257 valueFrom:
258 secretKeyRef:
259 name: pipeline-secrets
260 key: database-url
261 livenessProbe:
262 httpGet:
263 path: /health
264 port: 8000
265 initialDelaySeconds: 30
266 periodSeconds: 10
267 readinessProbe:
268 httpGet:
269 path: /health
270 port: 8000
271 initialDelaySeconds: 5
272 periodSeconds: 5
273---
274apiVersion: v1
275kind: Service
276metadata:
277 name: evaluation-pipeline
278spec:
279 selector:
280 app: evaluation-pipeline
281 ports:
282 - port: 80
283 targetPort: 8000
284 type: LoadBalancer
285"""
The API layer provides RESTful endpoints for managing pipeline runs, schedules, and monitoring. Containerized deployment with Docker or Kubernetes enables horizontal scaling and high availability.
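One subtlety worth noting: FastAPI (via Starlette) matches routes in registration order, so a literal path such as `/runs/active` must be declared before a parameterized sibling like `/runs/{run_id}`, or `active` is captured as a `run_id`. A stdlib-only sketch of first-match resolution (handler names are hypothetical) makes the rule concrete:

```python
import re

# Routes are tried in registration order; the first pattern that matches wins.
ROUTES = [
    (re.compile(r"^/runs/active$"), "get_active_runs"),
    (re.compile(r"^/runs/(?P<run_id>[^/]+)$"), "get_run_status"),
]


def resolve(path: str) -> str:
    """Return the handler name for the first route matching `path`."""
    for pattern, handler in ROUTES:
        if pattern.match(path):
            return handler
    return "not_found"
```

Reversing the two entries would send `/runs/active` to `get_run_status` with `run_id="active"`, which is exactly the failure mode the registration order avoids.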
Complete Implementation
Bringing all components together, here is a complete implementation that demonstrates how to configure and run the evaluation pipeline.
Full Pipeline Assembly
1import asyncio
2from typing import Any
3
4
5# Example Evaluators
6class AccuracyEvaluator(Evaluator):
7 """Evaluates answer accuracy."""
8
9 async def evaluate(
10 self,
11 task: EvaluationTask,
12 agent_output: dict[str, Any]
13 ) -> dict[str, float]:
14 if not task.expected_output:
15 return {}
16
17 expected = task.expected_output.get("answer", "")
18 actual = agent_output.get("answer", "")
19
20 # Simple exact match for demonstration
21 accuracy = 1.0 if expected == actual else 0.0
22
23 # Partial match score
24 expected_words = set(expected.lower().split())
25 actual_words = set(actual.lower().split())
26
27 if expected_words:
28 overlap = len(expected_words & actual_words)
29 partial_score = overlap / len(expected_words)
30 else:
31 partial_score = 0.0
32
33 return {
34 "accuracy": accuracy,
35 "partial_match": partial_score
36 }
37
38
39class LatencyEvaluator(Evaluator):
40 """Evaluates response latency."""
41
42 def __init__(self, target_latency_ms: float = 1000.0):
43 self.target_latency_ms = target_latency_ms
44
45 async def evaluate(
46 self,
47 task: EvaluationTask,
48 agent_output: dict[str, Any]
49 ) -> dict[str, float]:
50 latency_ms = agent_output.get("latency_ms", 0)
51
52 # Score based on target latency
53 if latency_ms <= self.target_latency_ms:
54 latency_score = 1.0
55 else:
56 latency_score = self.target_latency_ms / latency_ms
57
58 return {
59 "latency_score": latency_score,
60 "latency_ms": latency_ms
61 }
62
63
64class SafetyEvaluator(Evaluator):
65 """Evaluates output safety."""
66
 67 def __init__(self, blocked_terms: list[str] | None = None):
68 self.blocked_terms = blocked_terms or []
69
70 async def evaluate(
71 self,
72 task: EvaluationTask,
73 agent_output: dict[str, Any]
74 ) -> dict[str, float]:
75 output_text = str(agent_output.get("answer", "")).lower()
76
77 # Check for blocked terms
78 violations = sum(
79 1 for term in self.blocked_terms
80 if term.lower() in output_text
81 )
82
83 safety_score = 1.0 if violations == 0 else max(0, 1.0 - violations * 0.2)
84
85 return {
86 "safety_score": safety_score,
87 "safety_violations": float(violations)
88 }
89
90
91# Example Agent Registry
92async def example_qa_agent(input_data: dict[str, Any]) -> dict[str, Any]:
93 """Example QA agent for testing."""
94 import time
95
96 start = time.perf_counter()
97
98 # Simulate processing
99 await asyncio.sleep(0.1)
100
101 question = input_data.get("question", "")
102
103 # Simple rule-based response for demo
104 if "capital" in question.lower():
105 answer = "Paris is the capital of France."
106 elif "largest" in question.lower():
107 answer = "The largest ocean is the Pacific Ocean."
108 else:
109 answer = "I don't have enough information to answer that question."
110
111 latency = (time.perf_counter() - start) * 1000
112
113 return {
114 "answer": answer,
115 "latency_ms": latency,
116 "confidence": 0.85
117 }
118
119
120async def example_summarization_agent(input_data: dict[str, Any]) -> dict[str, Any]:
121 """Example summarization agent."""
122 import time
123
124 start = time.perf_counter()
125 await asyncio.sleep(0.2)
126
127 text = input_data.get("text", "")
128
129 # Simple extraction for demo
130 sentences = [s.strip() for s in text.split(".") if s.strip()]
131 summary = ". ".join(sentences[:2]) + "." if sentences else ""
132
133 latency = (time.perf_counter() - start) * 1000
134
135 return {
136 "answer": summary,
137 "latency_ms": latency,
138 "word_count": len(summary.split())
139 }
140
141
142async def build_and_run_pipeline():
143 """Build and run the complete evaluation pipeline."""
144
145 # Configuration
146 config = PipelineConfig(
147 batch_size=50,
148 max_concurrent_evaluations=10,
149 timeout_seconds=300,
150 retry_count=3,
151 enable_parallel_evaluation=True
152 )
153
154 # Create agent executor
155 agent_registry = {
156 "qa_agent": example_qa_agent,
157 "summarization_agent": example_summarization_agent
158 }
159
160 agent_executor = AgentExecutor(
161 agent_registry=agent_registry,
162 default_timeout=60.0
163 )
164
165 # Create composite evaluator
166 evaluator = CompositeEvaluator([
167 AccuracyEvaluator(),
168 LatencyEvaluator(target_latency_ms=500.0),
169 SafetyEvaluator(blocked_terms=["harmful", "dangerous"])
170 ])
171
172 # Build pipeline
173 pipeline = EvaluationPipeline(config)
174
175 # Register components
176 # Ingestion - using database source (mocked for demo)
177 ingestion = DataIngestionComponent(
178 sources=[], # Would add actual sources
179 batch_size=config.batch_size
180 )
181 pipeline.register_component(PipelineStage.INGESTION, ingestion)
182
183 # Evaluation engine
184 evaluation_engine = EvaluationEngineComponent(
185 agent_executor=agent_executor,
186 evaluator=evaluator,
187 num_workers=config.max_concurrent_evaluations
188 )
189 pipeline.register_component(PipelineStage.EVALUATION, evaluation_engine)
190
191 # Results processing
192 results_processor = ResultsProcessingComponent(
193 detect_anomalies=True
194 )
195 pipeline.register_component(PipelineStage.AGGREGATION, results_processor)
196
197 # Reporting
198 reporting = ReportingComponent(
199 formats=[
200 JSONReportFormat(),
201 HTMLReportFormat(),
202 MarkdownReportFormat()
203 ],
204 output_dir="./evaluation_reports"
205 )
206 pipeline.register_component(PipelineStage.REPORTING, reporting)
207
208 # Add hooks for monitoring
209 pipeline.add_hook("pre_stage", lambda stage, ctx:
210 print(f"Starting stage: {stage.value}")
211 )
212
213 pipeline.add_hook("post_stage", lambda stage, ctx:
214 print(f"Completed stage: {stage.value}")
215 )
216
217 pipeline.add_hook("on_error", lambda stage, err, ctx:
218 print(f"Error in {stage.value}: {err}")
219 )
220
221 # Create orchestrator
222 orchestrator = PipelineOrchestrator(
223 pipeline=pipeline,
224 max_concurrent_runs=2
225 )
226
227 # Add schedules
228 orchestrator.add_schedule(
229 "hourly_evaluation",
230 PipelineSchedule(
231 schedule_type=ScheduleType.INTERVAL,
232 interval_seconds=3600,
233 enabled=True
234 )
235 )
236
237 orchestrator.add_schedule(
238 "nightly_comprehensive",
239 PipelineSchedule(
240 schedule_type=ScheduleType.CRON,
241 cron_expression="0 2 * * *", # 2 AM daily
242 enabled=True
243 )
244 )
245
246 # Create API
247 app = create_api(orchestrator)
248
249 # Example: Trigger a manual run with sample tasks
250 sample_tasks = [
251 EvaluationTask(
252 task_id="task_001",
253 agent_id="qa_agent",
254 input_data={"question": "What is the capital of France?"},
255 expected_output={"answer": "Paris is the capital of France."},
256 priority=1
257 ),
258 EvaluationTask(
259 task_id="task_002",
260 agent_id="qa_agent",
261 input_data={"question": "What is the largest ocean?"},
262 expected_output={"answer": "The largest ocean is the Pacific Ocean."},
263 priority=1
264 ),
265 EvaluationTask(
266 task_id="task_003",
267 agent_id="summarization_agent",
268 input_data={
269 "text": "AI systems are transforming industries. "
270 "Machine learning enables new capabilities. "
271 "The future holds great promise."
272 },
273 expected_output=None,
274 priority=0
275 )
276 ]
277
278 # Run pipeline directly with sample tasks
279 context = PipelineContext()
280
281 # Skip ingestion for demo - pass tasks directly to evaluation
282 results = await evaluation_engine.execute(sample_tasks, context)
283
284 # Process results
285 aggregated = await results_processor.execute(results, context)
286
287 # Generate reports
288 reports = await reporting.execute(aggregated, context)
289
290 print("\n" + "=" * 60)
291 print("EVALUATION COMPLETE")
292 print("=" * 60)
293 print(f"Run ID: {context.run_id}")
294 print(f"Total tasks: {len(sample_tasks)}")
295 print(f"Successful: {context.metadata.get('evaluation_stats', {}).get('successful', 0)}")
296 print(f"Reports generated: {len(reports.get('reports', []))}")
297
298 for report in reports.get("reports", []):
299 print(f" - {report['format']}: {report['path']}")
300
301 return orchestrator, app
302
303
304# Main entry point
305if __name__ == "__main__":
306 # Run the pipeline
307 orchestrator, app = asyncio.run(build_and_run_pipeline())
308
309 # Start the API server
310 # uvicorn.run(app, host="0.0.0.0", port=8000)

This complete implementation demonstrates assembling all pipeline components into a working system. The modular design allows easy customization and extension for specific evaluation requirements.
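The `CompositeEvaluator` used above is defined earlier in the chapter. If you are assembling this listing standalone, a minimal sketch is shown below; the concurrent execution and merge-by-update behavior are assumptions, so adapt the merge strategy if your evaluators share metric names:

```python
import asyncio
from typing import Any


class CompositeEvaluator:
    """Runs several evaluators and merges their metric dictionaries."""

    def __init__(self, evaluators: list) -> None:
        self.evaluators = evaluators

    async def evaluate(self, task: Any, agent_output: dict[str, Any]) -> dict[str, float]:
        # Run all evaluators concurrently; each returns a dict of named scores.
        results = await asyncio.gather(
            *(ev.evaluate(task, agent_output) for ev in self.evaluators)
        )
        merged: dict[str, float] = {}
        for result in results:
            merged.update(result)  # later evaluators win on name collisions
        return merged
```

Because the individual evaluators use distinct metric names (`accuracy`, `latency_score`, `safety_score`), a plain dictionary merge is sufficient here.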
Production Checklist: Before deploying to production, ensure you have configured proper database connections, set up monitoring and alerting, implemented authentication for the API, and tested failover scenarios for each pipeline component.
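One way to exercise that last point is to wrap a component's `execute` coroutine in a retry-with-fallback helper and drive it with a deliberately failing stub. The helper below is a hypothetical sketch, not part of the pipeline API; the retry count and backoff values are assumptions to tune per component:

```python
import asyncio
from typing import Any, Awaitable, Callable


async def execute_with_fallback(
    primary: Callable[[Any], Awaitable[Any]],
    fallback: Callable[[Any], Awaitable[Any]],
    payload: Any,
    retries: int = 2,
    backoff_seconds: float = 0.05,
) -> Any:
    """Try the primary component a few times, then fall back."""
    for attempt in range(retries):
        try:
            return await primary(payload)
        except Exception:
            # Linear backoff before the next attempt.
            await asyncio.sleep(backoff_seconds * (attempt + 1))
    return await fallback(payload)
```

In a failover test, pass a stub that always raises as `primary` and assert that the fallback result is returned; in a happy-path test, assert the fallback is never reached.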
Summary
In this capstone section, we built a comprehensive evaluation pipeline that integrates all the concepts from Chapter 20. The pipeline provides a production-ready foundation for assessing agent performance at scale.
Key Components Covered
- Pipeline Architecture - Modular design with distinct stages for ingestion, evaluation, aggregation, reporting, and archival
- Data Ingestion - Multi-source data loading with database, S3, and API integrations plus automatic deduplication
- Evaluation Engine - Parallel execution with worker pools, retry logic, timeout handling, and composite evaluators
- Results Processing - Statistical aggregation, anomaly detection, and baseline comparison for actionable insights
- Reporting - Multi-format report generation with JSON, HTML, and Markdown outputs plus notification integration
- Orchestration - Schedule management, event-triggered runs, and concurrent execution control
- Deployment - REST API, containerization patterns, and Kubernetes deployment configurations
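The anomaly detection mentioned for results processing can be as simple as flagging metric values that sit far from the batch mean. A minimal z-score sketch is shown below; the 3-sigma threshold is an assumption you should tune for your metric distributions:

```python
import statistics


def flag_anomalies(values: list[float], threshold: float = 3.0) -> list[int]:
    """Return indices of values more than `threshold` standard deviations from the mean."""
    if len(values) < 2:
        return []  # not enough data to estimate spread
    mean = statistics.fmean(values)
    stdev = statistics.stdev(values)
    if stdev == 0:
        return []  # all values identical; nothing is anomalous
    return [i for i, v in enumerate(values) if abs(v - mean) / stdev > threshold]
```

Flagged indices can then be mapped back to task IDs so the report highlights which evaluations drove the outliers.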
Best Practices Summary
| Practice | Benefit |
|---|---|
| Modular component design | Easy testing, maintenance, and scaling |
| Async processing throughout | High throughput and resource efficiency |
| Comprehensive error handling | Graceful degradation and debugging |
| Statistical aggregation | Meaningful insights from raw data |
| Multi-format reporting | Flexibility for different stakeholders |
| Event-driven architecture | Responsive to production changes |
| Containerized deployment | Consistent environments and scaling |
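The statistical aggregation row in the table above typically means reducing per-task metric values to batch-level summaries before reporting. A minimal sketch, using a nearest-rank percentile (one of several common p95 definitions):

```python
import math


def aggregate_metric(values: list[float]) -> dict[str, float]:
    """Summarize one metric across tasks: mean and 95th percentile."""
    ordered = sorted(values)
    mean = sum(ordered) / len(ordered)
    # Nearest-rank p95: smallest value with at least 95% of samples at or below it.
    rank = math.ceil(0.95 * len(ordered)) - 1
    return {"mean": mean, "p95": ordered[rank]}
```

Running this per metric name (accuracy, latency, safety) yields the compact summary tables that the reporting stage renders into JSON, HTML, and Markdown.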
The evaluation pipeline serves as the backbone of a data-driven agent development process. By consistently measuring performance across multiple dimensions, teams can make informed decisions about architecture, optimization, and deployment strategies.
Chapter Complete: You've now completed Chapter 20 on Evaluation and Benchmarking. You understand how to define meaningful metrics, design comprehensive benchmarks, build testing frameworks, run experiments, implement continuous evaluation, and deploy production evaluation pipelines. These capabilities are essential for building agents that improve over time and maintain quality in production.