Chapter 20

Building an Evaluation Pipeline

Evaluation and Benchmarking

Introduction

Building a comprehensive evaluation pipeline brings together all the concepts we've covered in this chapter: metrics collection, benchmark design, testing frameworks, A/B testing, and continuous evaluation. This capstone section guides you through creating a production-ready evaluation system that can assess agent performance at scale, providing actionable insights for continuous improvement.

Learning Objectives: By the end of this section, you will be able to design and implement an end-to-end evaluation pipeline, integrate multiple evaluation strategies into a unified system, build scalable data processing for evaluation results, and deploy a production-ready evaluation infrastructure.

A well-designed evaluation pipeline serves as the foundation for data-driven agent development. It provides consistent, reproducible assessments that inform architectural decisions, guide optimization efforts, and ensure quality standards are maintained as systems evolve.


Pipeline Architecture

The evaluation pipeline follows a modular architecture where each component handles a specific responsibility. This design enables independent scaling, easy testing, and flexible deployment across different environments.

Core Architecture Components

🐍python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable, Optional, Generic, TypeVar
from enum import Enum
import asyncio
import time
import uuid
from datetime import datetime, timezone

T = TypeVar('T')

class PipelineStage(Enum):
    """Pipeline processing stages."""
    INGESTION = "ingestion"
    PREPROCESSING = "preprocessing"
    EVALUATION = "evaluation"
    AGGREGATION = "aggregation"
    REPORTING = "reporting"
    ARCHIVAL = "archival"


@dataclass
class PipelineContext:
    """Context passed through pipeline stages."""
    run_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    started_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    metadata: dict[str, Any] = field(default_factory=dict)
    stage_results: dict[str, Any] = field(default_factory=dict)
    errors: list[dict[str, Any]] = field(default_factory=list)

    def record_stage(
        self,
        stage: PipelineStage,
        result: Any,
        duration_ms: float
    ) -> None:
        """Record results from a pipeline stage."""
        self.stage_results[stage.value] = {
            "result": result,
            "duration_ms": duration_ms,
            "completed_at": datetime.now(timezone.utc).isoformat()
        }

    def record_error(
        self,
        stage: PipelineStage,
        error: Exception,
        recoverable: bool = True
    ) -> None:
        """Record an error during pipeline execution."""
        self.errors.append({
            "stage": stage.value,
            "error_type": type(error).__name__,
            "message": str(error),
            "recoverable": recoverable,
            "timestamp": datetime.now(timezone.utc).isoformat()
        })


class PipelineComponent(ABC, Generic[T]):
    """Abstract base for pipeline components."""

    def __init__(self, name: str):
        self.name = name
        self.stage: PipelineStage = PipelineStage.EVALUATION

    @abstractmethod
    async def process(
        self,
        input_data: Any,
        context: PipelineContext
    ) -> T:
        """Process input and return result."""
        ...

    async def execute(
        self,
        input_data: Any,
        context: PipelineContext
    ) -> T:
        """Execute with timing and error handling."""
        start = time.perf_counter()

        try:
            result = await self.process(input_data, context)
            duration_ms = (time.perf_counter() - start) * 1000
            context.record_stage(self.stage, result, duration_ms)
            return result

        except Exception as e:
            context.record_error(self.stage, e)
            raise


@dataclass
class PipelineConfig:
    """Configuration for the evaluation pipeline."""
    # Data ingestion settings
    batch_size: int = 100
    max_concurrent_evaluations: int = 10

    # Evaluation settings
    timeout_seconds: float = 300.0
    retry_count: int = 3
    retry_delay_seconds: float = 5.0

    # Storage settings
    results_bucket: str = "evaluation-results"
    archive_after_days: int = 90

    # Reporting settings
    report_formats: list[str] = field(
        default_factory=lambda: ["json", "html"]
    )
    notification_channels: list[str] = field(
        default_factory=lambda: ["slack", "email"]
    )

    # Feature flags
    enable_caching: bool = True
    enable_parallel_evaluation: bool = True
    enable_detailed_logging: bool = False


class EvaluationPipeline:
    """Main evaluation pipeline orchestrator."""

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.components: dict[PipelineStage, PipelineComponent] = {}
        self.hooks: dict[str, list[Callable]] = {
            "pre_stage": [],
            "post_stage": [],
            "on_error": [],
            "on_complete": []
        }

    def register_component(
        self,
        stage: PipelineStage,
        component: PipelineComponent
    ) -> None:
        """Register a component for a pipeline stage."""
        component.stage = stage
        self.components[stage] = component

    def add_hook(
        self,
        hook_type: str,
        callback: Callable
    ) -> None:
        """Add a hook callback."""
        if hook_type in self.hooks:
            self.hooks[hook_type].append(callback)

    async def _run_hooks(
        self,
        hook_type: str,
        *args,
        **kwargs
    ) -> None:
        """Run all hooks of a given type."""
        for callback in self.hooks[hook_type]:
            if asyncio.iscoroutinefunction(callback):
                await callback(*args, **kwargs)
            else:
                callback(*args, **kwargs)

    async def run(
        self,
        input_data: Any,
        context: Optional[PipelineContext] = None
    ) -> PipelineContext:
        """Execute the full pipeline."""
        context = context or PipelineContext()
        current_data = input_data

        # Define stage order
        stage_order = [
            PipelineStage.INGESTION,
            PipelineStage.PREPROCESSING,
            PipelineStage.EVALUATION,
            PipelineStage.AGGREGATION,
            PipelineStage.REPORTING,
            PipelineStage.ARCHIVAL
        ]

        try:
            for stage in stage_order:
                if stage not in self.components:
                    continue

                component = self.components[stage]
                await self._run_hooks("pre_stage", stage, context)

                try:
                    current_data = await component.execute(
                        current_data,
                        context
                    )
                    await self._run_hooks("post_stage", stage, context)

                except Exception as e:
                    await self._run_hooks("on_error", stage, e, context)

                    # Abort only if a non-recoverable error was recorded
                    # for this stage; otherwise skip to the next stage.
                    fatal = any(
                        err["stage"] == stage.value
                        and not err["recoverable"]
                        for err in context.errors
                    )
                    if fatal:
                        raise

            await self._run_hooks("on_complete", context)

        except Exception as e:
            context.metadata["failed"] = True
            context.metadata["failure_reason"] = str(e)

        return context

The pipeline architecture separates concerns into distinct stages, each handled by a specialized component. The context object flows through all stages, accumulating results and tracking errors for comprehensive observability.

Stage          Responsibility        Input         Output
-------------  --------------------  ------------  --------------------
Ingestion      Load evaluation data  Data sources  Raw evaluation tasks
Preprocessing  Clean and validate    Raw tasks     Validated tasks
Evaluation     Run assessments       Tasks         Raw results
Aggregation    Combine results       Raw results   Aggregated metrics
Reporting      Generate reports      Metrics       Reports/alerts
Archival       Store for analysis    All data      Archived records
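To see how the orchestration pattern behaves end to end, the following condensed, self-contained sketch reproduces the same idea on a smaller scale: stage components transform the data in order while a shared context accumulates per-stage timings. The `UpperCaseStage` and `CountStage` classes are illustrative stand-ins, not part of the pipeline above.

```python
import asyncio
import time
import uuid

class Context:
    """Minimal stand-in for PipelineContext."""
    def __init__(self):
        self.run_id = str(uuid.uuid4())
        self.stage_results = {}

class UpperCaseStage:
    name = "preprocessing"
    async def process(self, data, ctx):
        return [s.upper() for s in data]

class CountStage:
    name = "aggregation"
    async def process(self, data, ctx):
        return {"count": len(data)}

async def run_pipeline(stages, data):
    """Run stages in order, recording each stage's duration in the context."""
    ctx = Context()
    for stage in stages:
        start = time.perf_counter()
        data = await stage.process(data, ctx)
        ctx.stage_results[stage.name] = {
            "duration_ms": (time.perf_counter() - start) * 1000
        }
    return data, ctx

result, ctx = asyncio.run(
    run_pipeline([UpperCaseStage(), CountStage()], ["a", "b"])
)
print(result)                   # {'count': 2}
print(list(ctx.stage_results))  # ['preprocessing', 'aggregation']
```

The essential property, as in the full pipeline, is that each stage's output becomes the next stage's input while observability data accumulates off to the side.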

Data Ingestion Layer

The data ingestion layer handles loading evaluation tasks from various sources including databases, file systems, message queues, and external APIs. It normalizes data into a consistent format for downstream processing.

Multi-Source Data Ingestion

🐍python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Optional
import json
import aiohttp
import aioboto3


@dataclass
class EvaluationTask:
    """Represents a single evaluation task."""
    task_id: str
    agent_id: str
    input_data: dict[str, Any]
    expected_output: Optional[dict[str, Any]] = None
    metadata: dict[str, Any] = field(default_factory=dict)
    priority: int = 0
    timeout_seconds: float = 60.0

    def to_dict(self) -> dict[str, Any]:
        return {
            "task_id": self.task_id,
            "agent_id": self.agent_id,
            "input_data": self.input_data,
            "expected_output": self.expected_output,
            "metadata": self.metadata,
            "priority": self.priority,
            "timeout_seconds": self.timeout_seconds
        }


class DataSource(ABC):
    """Abstract data source for evaluation tasks."""

    @abstractmethod
    async def fetch_tasks(
        self,
        batch_size: int = 100
    ) -> AsyncIterator[list[EvaluationTask]]:
        """Fetch evaluation tasks in batches."""
        ...

    @abstractmethod
    async def get_task_count(self) -> int:
        """Get total number of available tasks."""
        ...


class DatabaseDataSource(DataSource):
    """Fetch tasks from a database."""

    def __init__(
        self,
        connection_string: str,
        table_name: str = "evaluation_tasks"
    ):
        # table_name is interpolated into SQL below, so it must come
        # from trusted configuration, never from user input.
        self.connection_string = connection_string
        self.table_name = table_name
        self._pool = None

    async def _get_pool(self):
        if self._pool is None:
            import asyncpg
            self._pool = await asyncpg.create_pool(self.connection_string)
        return self._pool

    async def fetch_tasks(
        self,
        batch_size: int = 100
    ) -> AsyncIterator[list[EvaluationTask]]:
        pool = await self._get_pool()
        offset = 0

        # Offset pagination assumes task statuses are not mutated
        # while the run is in progress.
        while True:
            async with pool.acquire() as conn:
                rows = await conn.fetch(
                    f"""
                    SELECT * FROM {self.table_name}
                    WHERE status = 'pending'
                    ORDER BY priority DESC, created_at ASC
                    LIMIT $1 OFFSET $2
                    """,
                    batch_size,
                    offset
                )

                if not rows:
                    break

                tasks = [
                    EvaluationTask(
                        task_id=row["id"],
                        agent_id=row["agent_id"],
                        input_data=json.loads(row["input_data"]),
                        expected_output=json.loads(row["expected_output"])
                        if row["expected_output"] else None,
                        metadata=json.loads(row["metadata"] or "{}"),
                        priority=row["priority"],
                        timeout_seconds=row["timeout_seconds"]
                    )
                    for row in rows
                ]

                yield tasks
                offset += batch_size

    async def get_task_count(self) -> int:
        pool = await self._get_pool()
        async with pool.acquire() as conn:
            result = await conn.fetchval(
                f"SELECT COUNT(*) FROM {self.table_name} WHERE status = 'pending'"
            )
            return result


class S3DataSource(DataSource):
    """Fetch tasks from an S3 bucket."""

    def __init__(
        self,
        bucket_name: str,
        prefix: str = "evaluation-tasks/"
    ):
        self.bucket_name = bucket_name
        self.prefix = prefix
        self._session = aioboto3.Session()

    async def fetch_tasks(
        self,
        batch_size: int = 100
    ) -> AsyncIterator[list[EvaluationTask]]:
        async with self._session.client("s3") as s3:
            paginator = s3.get_paginator("list_objects_v2")

            batch = []
            async for page in paginator.paginate(
                Bucket=self.bucket_name,
                Prefix=self.prefix
            ):
                for obj in page.get("Contents", []):
                    # Fetch object content
                    response = await s3.get_object(
                        Bucket=self.bucket_name,
                        Key=obj["Key"]
                    )
                    content = await response["Body"].read()
                    data = json.loads(content)

                    task = EvaluationTask(
                        task_id=data["task_id"],
                        agent_id=data["agent_id"],
                        input_data=data["input_data"],
                        expected_output=data.get("expected_output"),
                        metadata=data.get("metadata", {}),
                        priority=data.get("priority", 0),
                        timeout_seconds=data.get("timeout_seconds", 60.0)
                    )
                    batch.append(task)

                    if len(batch) >= batch_size:
                        yield batch
                        batch = []

            if batch:
                yield batch

    async def get_task_count(self) -> int:
        count = 0
        async with self._session.client("s3") as s3:
            paginator = s3.get_paginator("list_objects_v2")
            async for page in paginator.paginate(
                Bucket=self.bucket_name,
                Prefix=self.prefix
            ):
                count += len(page.get("Contents", []))
        return count


class APIDataSource(DataSource):
    """Fetch tasks from an external API."""

    def __init__(
        self,
        base_url: str,
        api_key: str,
        endpoint: str = "/evaluation/tasks"
    ):
        self.base_url = base_url
        self.api_key = api_key
        self.endpoint = endpoint

    async def fetch_tasks(
        self,
        batch_size: int = 100
    ) -> AsyncIterator[list[EvaluationTask]]:
        async with aiohttp.ClientSession() as session:
            page = 1

            while True:
                async with session.get(
                    f"{self.base_url}{self.endpoint}",
                    params={"page": page, "limit": batch_size},
                    headers={"Authorization": f"Bearer {self.api_key}"}
                ) as response:
                    response.raise_for_status()
                    data = await response.json()

                    if not data.get("tasks"):
                        break

                    tasks = [
                        EvaluationTask(
                            task_id=t["id"],
                            agent_id=t["agent_id"],
                            input_data=t["input"],
                            expected_output=t.get("expected"),
                            metadata=t.get("metadata", {}),
                            priority=t.get("priority", 0),
                            timeout_seconds=t.get("timeout", 60.0)
                        )
                        for t in data["tasks"]
                    ]

                    yield tasks

                    if not data.get("has_more", False):
                        break
                    page += 1

    async def get_task_count(self) -> int:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.base_url}{self.endpoint}/count",
                headers={"Authorization": f"Bearer {self.api_key}"}
            ) as response:
                response.raise_for_status()
                data = await response.json()
                return data["count"]


class DataIngestionComponent(PipelineComponent[list[EvaluationTask]]):
    """Pipeline component for data ingestion."""

    def __init__(
        self,
        sources: list[DataSource],
        batch_size: int = 100,
        deduplicate: bool = True
    ):
        super().__init__("data_ingestion")
        self.sources = sources
        self.batch_size = batch_size
        self.deduplicate = deduplicate
        self.stage = PipelineStage.INGESTION

    async def process(
        self,
        input_data: Any,
        context: PipelineContext
    ) -> list[EvaluationTask]:
        """Ingest tasks from all configured sources."""
        all_tasks = []
        seen_ids = set()

        for source in self.sources:
            source_name = type(source).__name__
            source_tasks = []

            async for batch in source.fetch_tasks(self.batch_size):
                for task in batch:
                    if self.deduplicate and task.task_id in seen_ids:
                        continue

                    seen_ids.add(task.task_id)
                    source_tasks.append(task)

            all_tasks.extend(source_tasks)

            context.metadata[f"ingested_from_{source_name}"] = len(source_tasks)

        context.metadata["total_tasks_ingested"] = len(all_tasks)

        # Sort by priority
        all_tasks.sort(key=lambda t: t.priority, reverse=True)

        return all_tasks

The data ingestion layer supports multiple sources simultaneously with automatic deduplication. Each source implements an async iterator pattern for efficient memory usage when processing large datasets.
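The async batch-iterator contract is easy to satisfy for custom sources as well. The following self-contained sketch shows a hypothetical in-memory source (the `Task` dataclass and `InMemoryDataSource` class are illustrative stand-ins for `EvaluationTask` and the `DataSource` interface above), batching five records into groups of two:

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator

@dataclass
class Task:
    """Illustrative stand-in for EvaluationTask."""
    task_id: str
    payload: dict

class InMemoryDataSource:
    """Minimal source following the same async batch-iterator contract."""
    def __init__(self, records: list[dict]):
        self.records = records

    async def fetch_tasks(
        self, batch_size: int = 2
    ) -> AsyncIterator[list[Task]]:
        batch = []
        for rec in self.records:
            batch.append(Task(task_id=rec["id"], payload=rec))
            if len(batch) >= batch_size:
                yield batch
                batch = []
        if batch:
            yield batch  # flush the final partial batch

async def collect(source, batch_size):
    return [b async for b in source.fetch_tasks(batch_size=batch_size)]

source = InMemoryDataSource([{"id": f"t{i}"} for i in range(5)])
batches = asyncio.run(collect(source, 2))
print([len(b) for b in batches])  # [2, 2, 1]
```

Because consumers only ever hold one batch at a time, the same calling code works unchanged whether the source is this in-memory list or a paginated database query.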


Evaluation Engine

The evaluation engine executes assessments against agents, managing concurrency, retries, and timeout handling. It supports multiple evaluation strategies and can distribute work across multiple workers.

Parallel Evaluation Executor

🐍python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from enum import Enum
import asyncio
import time
from contextlib import asynccontextmanager


class EvaluationStatus(Enum):
    """Status of an evaluation."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    TIMEOUT = "timeout"
    SKIPPED = "skipped"


@dataclass
class EvaluationResult:
    """Result of a single evaluation."""
    task_id: str
    agent_id: str
    status: EvaluationStatus
    metrics: dict[str, float] = field(default_factory=dict)
    output: Optional[dict[str, Any]] = None
    error: Optional[str] = None
    execution_time_ms: float = 0.0
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def is_success(self) -> bool:
        return self.status == EvaluationStatus.COMPLETED


class Evaluator(ABC):
    """Abstract base for evaluators."""

    @abstractmethod
    async def evaluate(
        self,
        task: EvaluationTask,
        agent_output: dict[str, Any]
    ) -> dict[str, float]:
        """Evaluate agent output and return metrics."""
        ...


class CompositeEvaluator(Evaluator):
    """Combines multiple evaluators."""

    def __init__(self, evaluators: list[Evaluator]):
        self.evaluators = evaluators

    async def evaluate(
        self,
        task: EvaluationTask,
        agent_output: dict[str, Any]
    ) -> dict[str, float]:
        all_metrics = {}

        evaluations = await asyncio.gather(
            *[e.evaluate(task, agent_output) for e in self.evaluators],
            return_exceptions=True
        )

        for eval_result in evaluations:
            if isinstance(eval_result, Exception):
                continue
            all_metrics.update(eval_result)

        return all_metrics


class AgentExecutor:
    """Executes agents for evaluation."""

    def __init__(
        self,
        agent_registry: dict[str, Callable],
        default_timeout: float = 60.0
    ):
        self.agent_registry = agent_registry
        self.default_timeout = default_timeout

    async def execute(
        self,
        task: EvaluationTask
    ) -> dict[str, Any]:
        """Execute agent and return output."""
        agent_fn = self.agent_registry.get(task.agent_id)

        if not agent_fn:
            raise ValueError(f"Unknown agent: {task.agent_id}")

        timeout = task.timeout_seconds or self.default_timeout

        try:
            result = await asyncio.wait_for(
                agent_fn(task.input_data),
                timeout=timeout
            )
            return result

        except asyncio.TimeoutError:
            raise TimeoutError(
                f"Agent {task.agent_id} timed out after {timeout}s"
            )


class EvaluationWorker:
    """Worker that processes evaluation tasks."""

    def __init__(
        self,
        worker_id: int,
        agent_executor: AgentExecutor,
        evaluator: Evaluator,
        retry_count: int = 3,
        retry_delay: float = 1.0
    ):
        self.worker_id = worker_id
        self.agent_executor = agent_executor
        self.evaluator = evaluator
        self.retry_count = retry_count
        self.retry_delay = retry_delay
        self._active = False

    async def process_task(
        self,
        task: EvaluationTask
    ) -> EvaluationResult:
        """Process a single evaluation task."""
        start_time = time.perf_counter()
        last_error = None

        for attempt in range(self.retry_count + 1):
            try:
                # Execute agent
                output = await self.agent_executor.execute(task)

                # Run evaluation
                metrics = await self.evaluator.evaluate(task, output)

                execution_time = (time.perf_counter() - start_time) * 1000

                return EvaluationResult(
                    task_id=task.task_id,
                    agent_id=task.agent_id,
                    status=EvaluationStatus.COMPLETED,
                    metrics=metrics,
                    output=output,
                    execution_time_ms=execution_time,
                    metadata={
                        "worker_id": self.worker_id,
                        "attempts": attempt + 1
                    }
                )

            except TimeoutError as e:
                # Timeouts are not retried: a task that exhausted its
                # budget once is unlikely to finish on a second attempt.
                execution_time = (time.perf_counter() - start_time) * 1000
                return EvaluationResult(
                    task_id=task.task_id,
                    agent_id=task.agent_id,
                    status=EvaluationStatus.TIMEOUT,
                    error=str(e),
                    execution_time_ms=execution_time,
                    metadata={"worker_id": self.worker_id}
                )

            except Exception as e:
                last_error = e
                if attempt < self.retry_count:
                    # Exponential backoff between retries
                    await asyncio.sleep(
                        self.retry_delay * (2 ** attempt)
                    )

        execution_time = (time.perf_counter() - start_time) * 1000
        return EvaluationResult(
            task_id=task.task_id,
            agent_id=task.agent_id,
            status=EvaluationStatus.FAILED,
            error=str(last_error),
            execution_time_ms=execution_time,
            metadata={
                "worker_id": self.worker_id,
                "attempts": self.retry_count + 1
            }
        )


class EvaluationPool:
    """Pool of evaluation workers."""

    def __init__(
        self,
        num_workers: int,
        agent_executor: AgentExecutor,
        evaluator: Evaluator,
        retry_count: int = 3
    ):
        self.workers = [
            EvaluationWorker(
                worker_id=i,
                agent_executor=agent_executor,
                evaluator=evaluator,
                retry_count=retry_count
            )
            for i in range(num_workers)
        ]
        self._task_queue: asyncio.Queue = asyncio.Queue()
        self._result_queue: asyncio.Queue = asyncio.Queue()
        self._running = False

    async def _worker_loop(self, worker: EvaluationWorker):
        """Main loop for a worker."""
        while self._running:
            try:
                task = await asyncio.wait_for(
                    self._task_queue.get(),
                    timeout=1.0
                )
                result = await worker.process_task(task)
                await self._result_queue.put(result)
                self._task_queue.task_done()

            except asyncio.TimeoutError:
                # No task available; re-check the running flag
                continue

    @asynccontextmanager
    async def running(self):
        """Context manager to start/stop the pool."""
        self._running = True
        worker_tasks = [
            asyncio.create_task(self._worker_loop(w))
            for w in self.workers
        ]

        try:
            yield self
        finally:
            self._running = False
            await asyncio.gather(*worker_tasks, return_exceptions=True)

    async def submit(self, task: EvaluationTask) -> None:
        """Submit a task for evaluation."""
        await self._task_queue.put(task)

    async def submit_batch(
        self,
        tasks: list[EvaluationTask]
    ) -> None:
        """Submit multiple tasks."""
        for task in tasks:
            await self._task_queue.put(task)

    async def get_result(self) -> EvaluationResult:
        """Get next available result."""
        return await self._result_queue.get()

    async def collect_results(
        self,
        count: int,
        timeout: Optional[float] = None
    ) -> list[EvaluationResult]:
        """Collect specified number of results."""
        results = []
        deadline = time.time() + timeout if timeout else None

        while len(results) < count:
            remaining = None
            if deadline:
                remaining = deadline - time.time()
                if remaining <= 0:
                    break

            try:
                result = await asyncio.wait_for(
                    self._result_queue.get(),
                    timeout=remaining
                )
                results.append(result)

            except asyncio.TimeoutError:
                break

        return results


class EvaluationEngineComponent(PipelineComponent[list[EvaluationResult]]):
    """Pipeline component for running evaluations."""

    def __init__(
        self,
        agent_executor: AgentExecutor,
        evaluator: Evaluator,
        num_workers: int = 10,
        retry_count: int = 3
    ):
        super().__init__("evaluation_engine")
        self.pool = EvaluationPool(
            num_workers=num_workers,
            agent_executor=agent_executor,
            evaluator=evaluator,
            retry_count=retry_count
        )
        self.stage = PipelineStage.EVALUATION

    async def process(
        self,
        input_data: list[EvaluationTask],
        context: PipelineContext
    ) -> list[EvaluationResult]:
        """Run evaluations on all tasks."""
        async with self.pool.running():
            # Submit all tasks
            await self.pool.submit_batch(input_data)

            # Collect results
            results = await self.pool.collect_results(
                count=len(input_data),
                timeout=context.metadata.get("timeout_seconds", 3600)
            )

        # Record statistics
        successful = sum(1 for r in results if r.is_success)
        failed = sum(1 for r in results if r.status == EvaluationStatus.FAILED)
        timeouts = sum(1 for r in results if r.status == EvaluationStatus.TIMEOUT)

        context.metadata["evaluation_stats"] = {
            "total": len(results),
            "successful": successful,
            "failed": failed,
            "timeouts": timeouts,
            "success_rate": successful / len(results) if results else 0
        }

        return results

The evaluation engine uses a worker pool pattern for parallel execution. Each worker handles task execution, evaluation, and retry logic independently, maximizing throughput while maintaining isolation between evaluations.

Performance Tip: The number of workers should be tuned based on available resources and the nature of your evaluations. CPU-bound evaluations benefit from worker counts matching CPU cores, while I/O-bound evaluations can handle many more concurrent workers.
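For I/O-bound workloads, the concurrency-limiting effect of the worker pool can be illustrated with a much smaller, self-contained sketch: a semaphore caps how many evaluations run at once while all tasks are launched together. The `evaluate_one` coroutine here is a hypothetical stand-in for an agent call, not part of the pool above.

```python
import asyncio

async def evaluate_one(task_id: int, sem: asyncio.Semaphore) -> dict:
    """Stand-in for one agent call; the semaphore bounds concurrency."""
    async with sem:
        await asyncio.sleep(0.01)  # simulated I/O-bound agent latency
        return {"task_id": task_id, "score": 1.0}

async def evaluate_all(n_tasks: int, max_concurrent: int) -> list[dict]:
    # At most max_concurrent coroutines hold the semaphore at a time,
    # mirroring the pool's num_workers throughput control.
    sem = asyncio.Semaphore(max_concurrent)
    return await asyncio.gather(
        *(evaluate_one(i, sem) for i in range(n_tasks))
    )

results = asyncio.run(evaluate_all(n_tasks=20, max_concurrent=5))
print(len(results))  # 20
```

The full worker pool adds what this sketch omits: per-task retries, timeout classification, and a result queue that lets callers consume results as they arrive rather than waiting for the whole batch.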

Results Processing

After evaluations complete, results must be aggregated, analyzed, and transformed into actionable insights. The results processing layer handles statistical aggregation, trend analysis, and anomaly detection.

Results Aggregation and Analysis

🐍python
from dataclasses import dataclass, field
from typing import Any, Optional
from collections import defaultdict
import statistics
import math


@dataclass
class MetricStatistics:
    """Statistical summary of a metric."""
    name: str
    count: int
    mean: float
    std_dev: float
    min_value: float
    max_value: float
    median: float
    p95: float
    p99: float

    def to_dict(self) -> dict[str, Any]:
        return {
            "name": self.name,
            "count": self.count,
            "mean": round(self.mean, 4),
            "std_dev": round(self.std_dev, 4),
            "min": round(self.min_value, 4),
            "max": round(self.max_value, 4),
            "median": round(self.median, 4),
            "p95": round(self.p95, 4),
            "p99": round(self.p99, 4)
        }


@dataclass
class AgentSummary:
    """Summary of agent performance."""
    agent_id: str
    total_evaluations: int
    successful: int
    failed: int
    timeouts: int
    success_rate: float
    metrics: dict[str, MetricStatistics]
    avg_execution_time_ms: float


@dataclass
class AggregatedResults:
    """Aggregated evaluation results."""
    run_id: str
    total_evaluations: int
    by_agent: dict[str, AgentSummary]
    overall_metrics: dict[str, MetricStatistics]
    anomalies: list[dict[str, Any]]
    comparisons: dict[str, Any]


class MetricAggregator:
    """Aggregates metric values."""

    def __init__(self):
        self._values: dict[str, list[float]] = defaultdict(list)

    def add(self, metric_name: str, value: float) -> None:
        """Add a metric value."""
        self._values[metric_name].append(value)

    def compute_statistics(
        self,
        metric_name: str
    ) -> Optional[MetricStatistics]:
        """Compute statistics for a metric."""
        values = self._values.get(metric_name, [])

        if not values:
            return None

        sorted_values = sorted(values)
        n = len(sorted_values)

        mean = statistics.mean(values)
        std_dev = statistics.stdev(values) if n > 1 else 0.0

        # Calculate percentiles (nearest-rank, clamped to the last element)
        p95_idx = int(n * 0.95)
        p99_idx = int(n * 0.99)

        return MetricStatistics(
            name=metric_name,
            count=n,
            mean=mean,
            std_dev=std_dev,
            min_value=sorted_values[0],
            max_value=sorted_values[-1],
            median=statistics.median(values),
            p95=sorted_values[min(p95_idx, n - 1)],
            p99=sorted_values[min(p99_idx, n - 1)]
        )

    def get_all_statistics(self) -> dict[str, MetricStatistics]:
        """Get statistics for all metrics."""
        return {
            name: stats
            for name in self._values
            if (stats := self.compute_statistics(name)) is not None
107        }
108
109
110class AnomalyDetector:
111    """Detects anomalies in evaluation results."""
112
113    def __init__(
114        self,
115        z_score_threshold: float = 3.0,
116        min_samples: int = 10
117    ):
118        self.z_score_threshold = z_score_threshold
119        self.min_samples = min_samples
120
121    def detect(
122        self,
123        results: list[EvaluationResult],
124        baseline_stats: Optional[dict[str, MetricStatistics]] = None
125    ) -> list[dict[str, Any]]:
126        """Detect anomalies in results."""
127        anomalies = []
128
129        # Group by metric
130        metric_values: dict[str, list[tuple[str, float]]] = defaultdict(list)
131
132        for result in results:
133            if not result.is_success:
134                continue
135
136            for metric_name, value in result.metrics.items():
137                metric_values[metric_name].append(
138                    (result.task_id, value)
139                )
140
141        # Detect anomalies per metric
142        for metric_name, values in metric_values.items():
143            if len(values) < self.min_samples:
144                continue
145
146            all_values = [v for _, v in values]
147            mean = statistics.mean(all_values)
148            std = statistics.stdev(all_values)
149
150            if std == 0:
151                continue
152
153            # Use baseline if available
154            if baseline_stats and metric_name in baseline_stats:
155                baseline = baseline_stats[metric_name]
156                mean = baseline.mean
157                std = baseline.std_dev
158
159            # Find outliers using z-score
160            for task_id, value in values:
161                z_score = abs((value - mean) / std)
162
163                if z_score > self.z_score_threshold:
164                    anomalies.append({
165                        "type": "metric_outlier",
166                        "task_id": task_id,
167                        "metric": metric_name,
168                        "value": value,
169                        "z_score": z_score,
170                        "expected_mean": mean,
171                        "expected_std": std
172                    })
173
174        # Detect execution time anomalies
175        exec_times = [
176            (r.task_id, r.execution_time_ms)
177            for r in results
178            if r.execution_time_ms > 0
179        ]
180
181        if len(exec_times) >= self.min_samples:
182            times = [t for _, t in exec_times]
183            mean_time = statistics.mean(times)
184            std_time = statistics.stdev(times)
185
186            if std_time > 0:
187                for task_id, exec_time in exec_times:
188                    z_score = (exec_time - mean_time) / std_time
189
190                    if z_score > self.z_score_threshold:
191                        anomalies.append({
192                            "type": "slow_execution",
193                            "task_id": task_id,
194                            "execution_time_ms": exec_time,
195                            "z_score": z_score,
196                            "expected_mean_ms": mean_time
197                        })
198
199        return anomalies
200
201
202class ResultComparator:
203    """Compares results against baselines."""
204
205    def __init__(
206        self,
207        significance_threshold: float = 0.05,
208        min_improvement: float = 0.01
209    ):
210        self.significance_threshold = significance_threshold
211        self.min_improvement = min_improvement
212
213    def compare(
214        self,
215        current: dict[str, MetricStatistics],
216        baseline: dict[str, MetricStatistics]
217    ) -> dict[str, Any]:
218        """Compare current results against baseline."""
219        comparisons = {}
220
221        for metric_name, current_stats in current.items():
222            if metric_name not in baseline:
223                comparisons[metric_name] = {
224                    "status": "new_metric",
225                    "current_mean": current_stats.mean
226                }
227                continue
228
229            baseline_stats = baseline[metric_name]
230
231            # Calculate relative change
232            if baseline_stats.mean != 0:
233                relative_change = (
234                    (current_stats.mean - baseline_stats.mean) /
235                    abs(baseline_stats.mean)
236                )
237            else:
238                relative_change = 0 if current_stats.mean == 0 else float('inf')
239
240            # Perform t-test approximation
241            t_statistic = self._calculate_t_statistic(
242                current_stats,
243                baseline_stats
244            )
245
246            # Two-sided critical value: |t| > 1.96 ≈ p < 0.05
247            is_significant = abs(t_statistic) > 1.96  # matches default significance_threshold
248
249            # Determine direction
250            if abs(relative_change) < self.min_improvement:
251                direction = "no_change"
252            elif relative_change > 0:
253                direction = "improvement"
254            else:
255                direction = "regression"
256
257            comparisons[metric_name] = {
258                "baseline_mean": baseline_stats.mean,
259                "current_mean": current_stats.mean,
260                "relative_change": relative_change,
261                "absolute_change": current_stats.mean - baseline_stats.mean,
262                "direction": direction,
263                "is_significant": is_significant,
264                "t_statistic": t_statistic
265            }
266
267        # Check for missing metrics
268        for metric_name in baseline:
269            if metric_name not in current:
270                comparisons[metric_name] = {
271                    "status": "missing_metric",
272                    "baseline_mean": baseline[metric_name].mean
273                }
274
275        return comparisons
276
277    def _calculate_t_statistic(
278        self,
279        current: MetricStatistics,
280        baseline: MetricStatistics
281    ) -> float:
282        """Calculate approximate t-statistic."""
283        # Pooled standard error approximation
284        se_current = current.std_dev / math.sqrt(current.count)
285        se_baseline = baseline.std_dev / math.sqrt(baseline.count)
286        pooled_se = math.sqrt(se_current**2 + se_baseline**2)
287
288        if pooled_se == 0:
289            return 0.0
290
291        return (current.mean - baseline.mean) / pooled_se
292
293
294class ResultsProcessingComponent(PipelineComponent[AggregatedResults]):
295    """Pipeline component for results processing."""
296
297    def __init__(
298        self,
299        baseline_stats: Optional[dict[str, MetricStatistics]] = None,
300        detect_anomalies: bool = True
301    ):
302        super().__init__("results_processing")
303        self.baseline_stats = baseline_stats
304        self.detect_anomalies = detect_anomalies
305        self.anomaly_detector = AnomalyDetector()
306        self.comparator = ResultComparator()
307        self.stage = PipelineStage.AGGREGATION
308
309    async def process(
310        self,
311        input_data: list[EvaluationResult],
312        context: PipelineContext
313    ) -> AggregatedResults:
314        """Process and aggregate evaluation results."""
315        # Group by agent
316        by_agent: dict[str, list[EvaluationResult]] = defaultdict(list)
317        for result in input_data:
318            by_agent[result.agent_id].append(result)
319
320        # Aggregate per agent
321        agent_summaries = {}
322        overall_aggregator = MetricAggregator()
323
324        for agent_id, results in by_agent.items():
325            agent_aggregator = MetricAggregator()
326            exec_times = []
327
328            successful = 0
329            failed = 0
330            timeouts = 0
331
332            for result in results:
333                if result.status == EvaluationStatus.COMPLETED:
334                    successful += 1
335                    for metric, value in result.metrics.items():
336                        agent_aggregator.add(metric, value)
337                        overall_aggregator.add(metric, value)
338                    exec_times.append(result.execution_time_ms)
339                elif result.status == EvaluationStatus.FAILED:
340                    failed += 1
341                elif result.status == EvaluationStatus.TIMEOUT:
342                    timeouts += 1
343
344            agent_summaries[agent_id] = AgentSummary(
345                agent_id=agent_id,
346                total_evaluations=len(results),
347                successful=successful,
348                failed=failed,
349                timeouts=timeouts,
350                success_rate=successful / len(results) if results else 0,
351                metrics=agent_aggregator.get_all_statistics(),
352                avg_execution_time_ms=(
353                    statistics.mean(exec_times) if exec_times else 0
354                )
355            )
356
357        # Detect anomalies
358        anomalies = []
359        if self.detect_anomalies:
360            anomalies = self.anomaly_detector.detect(
361                input_data,
362                self.baseline_stats
363            )
364
365        # Compare against baseline
366        comparisons = {}
367        overall_metrics = overall_aggregator.get_all_statistics()
368        if self.baseline_stats:
369            comparisons = self.comparator.compare(
370                overall_metrics,
371                self.baseline_stats
372            )
373
374        return AggregatedResults(
375            run_id=context.run_id,
376            total_evaluations=len(input_data),
377            by_agent=agent_summaries,
378            overall_metrics=overall_metrics,
379            anomalies=anomalies,
380            comparisons=comparisons
381        )

The results processing layer transforms raw evaluation results into structured insights. Statistical aggregation, anomaly detection, and baseline comparison provide actionable information for understanding agent performance.
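To make the baseline comparison concrete, the same t-statistic can be computed by hand. The sketch below mirrors `_calculate_t_statistic`; the sample means, standard deviations, and counts are invented for illustration:

```python
import math

def welch_t(mean_a: float, std_a: float, n_a: int,
            mean_b: float, std_b: float, n_b: int) -> float:
    """Approximate two-sample t-statistic using the pooled standard
    error, as in ResultComparator._calculate_t_statistic."""
    pooled_se = math.sqrt(std_a**2 / n_a + std_b**2 / n_b)
    return (mean_a - mean_b) / pooled_se if pooled_se else 0.0

# Current run: mean accuracy 0.84 (std 0.05, n=100)
# Baseline:    mean accuracy 0.80 (std 0.06, n=100)
t = welch_t(0.84, 0.05, 100, 0.80, 0.06, 100)
# |t| ≈ 5.1 > 1.96, so the change would be flagged as significant
```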


Reporting and Visualization

The reporting layer generates human-readable reports in multiple formats and triggers notifications when issues are detected. Reports can be customized for different audiences and delivery channels.

Multi-Format Report Generation

🐍python
1from abc import ABC, abstractmethod
2from dataclasses import dataclass, field
3from typing import Any, Optional
4from datetime import datetime
5import json
6import aiohttp  # third-party HTTP client, used by NotificationSender
7
8class ReportFormat(ABC):
9    """Abstract report format."""
10
11    @abstractmethod
12    def render(
13        self,
14        results: AggregatedResults,
15        context: PipelineContext
16    ) -> str:
17        """Render report in this format."""
18        pass
19
20    @property
21    @abstractmethod
22    def extension(self) -> str:
23        """File extension for this format."""
24        pass
25
26
27class JSONReportFormat(ReportFormat):
28    """JSON report format."""
29
30    def render(
31        self,
32        results: AggregatedResults,
33        context: PipelineContext
34    ) -> str:
35        report_data = {
36            "run_id": results.run_id,
37            "generated_at": datetime.utcnow().isoformat(),
38            "summary": {
39                "total_evaluations": results.total_evaluations,
40                "agents_evaluated": len(results.by_agent),
41                "anomalies_detected": len(results.anomalies)
42            },
43            "agents": {
44                agent_id: {
45                    "total": summary.total_evaluations,
46                    "successful": summary.successful,
47                    "failed": summary.failed,
48                    "timeouts": summary.timeouts,
49                    "success_rate": summary.success_rate,
50                    "avg_execution_time_ms": summary.avg_execution_time_ms,
51                    "metrics": {
52                        name: stats.to_dict()
53                        for name, stats in summary.metrics.items()
54                    }
55                }
56                for agent_id, summary in results.by_agent.items()
57            },
58            "overall_metrics": {
59                name: stats.to_dict()
60                for name, stats in results.overall_metrics.items()
61            },
62            "anomalies": results.anomalies,
63            "comparisons": results.comparisons,
64            "pipeline_metadata": {
65                "started_at": context.started_at.isoformat(),
66                "stages_completed": list(context.stage_results.keys()),
67                "errors": context.errors
68            }
69        }
70
71        return json.dumps(report_data, indent=2)
72
73    @property
74    def extension(self) -> str:
75        return "json"
76
77
78class HTMLReportFormat(ReportFormat):
79    """HTML report format."""
80
81    def render(
82        self,
83        results: AggregatedResults,
84        context: PipelineContext
85    ) -> str:
86        html_parts = [
87            "<!DOCTYPE html>",
88            "<html><head>",
89            "<title>Evaluation Report</title>",
90            "<style>",
91            self._get_styles(),
92            "</style>",
93            "</head><body>",
94            f"<h1>Evaluation Report - {results.run_id[:8]}</h1>",
95            f"<p class='timestamp'>Generated: {datetime.utcnow().isoformat()}</p>"
96        ]
97
98        # Summary section
99        html_parts.append("<section class='summary'>")
100        html_parts.append("<h2>Summary</h2>")
101        html_parts.append(f"<p>Total Evaluations: {results.total_evaluations}</p>")
102        html_parts.append(f"<p>Agents Evaluated: {len(results.by_agent)}</p>")
103        html_parts.append(f"<p>Anomalies Detected: {len(results.anomalies)}</p>")
104        html_parts.append("</section>")
105
106        # Agent results
107        html_parts.append("<section class='agents'>")
108        html_parts.append("<h2>Agent Performance</h2>")
109
110        for agent_id, summary in results.by_agent.items():
111            html_parts.append("<div class='agent-card'>")
112            html_parts.append(f"<h3>{agent_id}</h3>")
113            html_parts.append("<table>")
114            html_parts.append("<tr><th>Metric</th><th>Value</th></tr>")
115            html_parts.append(
116                f"<tr><td>Success Rate</td>"
117                f"<td>{summary.success_rate:.1%}</td></tr>"
118            )
119            html_parts.append(
120                f"<tr><td>Avg Execution Time</td>"
121                f"<td>{summary.avg_execution_time_ms:.1f}ms</td></tr>"
122            )
123
124            for metric_name, stats in summary.metrics.items():
125                html_parts.append(
126                    f"<tr><td>{metric_name}</td>"
127                    f"<td>{stats.mean:.4f} (±{stats.std_dev:.4f})</td></tr>"
128                )
129
130            html_parts.append("</table>")
131            html_parts.append("</div>")
132
133        html_parts.append("</section>")
134
135        # Anomalies section
136        if results.anomalies:
137            html_parts.append("<section class='anomalies'>")
138            html_parts.append("<h2>Anomalies Detected</h2>")
139            html_parts.append("<ul>")
140
141            for anomaly in results.anomalies:
142                html_parts.append(
143                    f"<li class='anomaly-{anomaly['type']}'>"
144                    f"{anomaly['type']}: Task {anomaly['task_id']} - "
145                    f"z-score: {anomaly['z_score']:.2f}</li>"
146                )
147
148            html_parts.append("</ul>")
149            html_parts.append("</section>")
150
151        # Comparisons section
152        if results.comparisons:
153            html_parts.append("<section class='comparisons'>")
154            html_parts.append("<h2>Baseline Comparison</h2>")
155            html_parts.append("<table>")
156            html_parts.append(
157                "<tr><th>Metric</th><th>Change</th><th>Status</th></tr>"
158            )
159
160            for metric, comparison in results.comparisons.items():
161                if "relative_change" in comparison:
162                    change = f"{comparison['relative_change']:+.1%}"
163                    status_class = (
164                        "improvement" if comparison["direction"] == "improvement"
165                        else "regression" if comparison["direction"] == "regression"
166                        else "neutral"
167                    )
168                    status = comparison["direction"]
169                else:
170                    change = "N/A"
171                    status = comparison.get("status", "unknown")
172                    status_class = "neutral"
173
174                html_parts.append(
175                    f"<tr class='{status_class}'>"
176                    f"<td>{metric}</td><td>{change}</td><td>{status}</td></tr>"
177                )
178
179            html_parts.append("</table>")
180            html_parts.append("</section>")
181
182        html_parts.append("</body></html>")
183
184        return "\n".join(html_parts)
185
186
187    def _get_styles(self) -> str:
188        return """
189            body { font-family: system-ui, sans-serif; margin: 40px; }
190            h1 { color: #333; }
191            .timestamp { color: #666; font-size: 0.9em; }
192            section { margin: 20px 0; padding: 20px; border: 1px solid #ddd; }
193            .agent-card { background: #f9f9f9; padding: 15px; margin: 10px 0; }
194            table { width: 100%; border-collapse: collapse; }
195            th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }
196            th { background: #f0f0f0; }
197            .improvement { background: #d4edda; }
198            .regression { background: #f8d7da; }
199            .neutral { background: #fff; }
200            .anomaly-metric_outlier { color: #dc3545; }
201            .anomaly-slow_execution { color: #fd7e14; }
202        """
203
204    @property
205    def extension(self) -> str:
206        return "html"
207
208
209class MarkdownReportFormat(ReportFormat):
210    """Markdown report format."""
211
212    def render(
213        self,
214        results: AggregatedResults,
215        context: PipelineContext
216    ) -> str:
217        lines = [
218            f"# Evaluation Report",
219            f"",
220            f"**Run ID:** {results.run_id}",
221            f"**Generated:** {datetime.utcnow().isoformat()}",
222            f"",
223            f"## Summary",
224            f"",
225            f"- Total Evaluations: {results.total_evaluations}",
226            f"- Agents Evaluated: {len(results.by_agent)}",
227            f"- Anomalies Detected: {len(results.anomalies)}",
228            f"",
229            f"## Agent Performance",
230            f""
231        ]
232
233        for agent_id, summary in results.by_agent.items():
234            lines.append(f"### {agent_id}")
235            lines.append(f"")
236            lines.append(f"| Metric | Value |")
237            lines.append(f"|--------|-------|")
238            lines.append(f"| Success Rate | {summary.success_rate:.1%} |")
239            lines.append(
240                f"| Avg Execution Time | {summary.avg_execution_time_ms:.1f}ms |"
241            )
242
243            for metric_name, stats in summary.metrics.items():
244                lines.append(
245                    f"| {metric_name} | {stats.mean:.4f} (±{stats.std_dev:.4f}) |"
246                )
247
248            lines.append(f"")
249
250        if results.anomalies:
251            lines.append(f"## Anomalies")
252            lines.append(f"")
253
254            for anomaly in results.anomalies:
255                lines.append(
256                    f"- **{anomaly['type']}**: Task {anomaly['task_id']} "
257                    f"(z-score: {anomaly['z_score']:.2f})"
258                )
259
260            lines.append(f"")
261
262        return "\n".join(lines)
263
264
265    @property
266    def extension(self) -> str:
267        return "md"
268
269
270@dataclass
271class NotificationConfig:
272    """Configuration for notifications."""
273    channel: str
274    webhook_url: Optional[str] = None
275    recipients: list[str] = field(default_factory=list)
276    notify_on_anomalies: bool = True
277    notify_on_regressions: bool = True
278    min_regression_threshold: float = 0.1
279
280
281class NotificationSender:
282    """Sends notifications based on results."""
283
284    def __init__(self, configs: list[NotificationConfig]):
285        self.configs = configs
286
287    async def send(
288        self,
289        results: AggregatedResults,
290        context: PipelineContext
291    ) -> list[dict[str, Any]]:
292        """Send notifications based on results."""
293        notifications_sent = []
294
295        for config in self.configs:
296            should_notify = False
297            reasons = []
298
299            # Check for anomalies
300            if config.notify_on_anomalies and results.anomalies:
301                should_notify = True
302                reasons.append(f"{len(results.anomalies)} anomalies detected")
303
304            # Check for regressions
305            if config.notify_on_regressions:
306                regressions = [
307                    (metric, comp)
308                    for metric, comp in results.comparisons.items()
309                    if comp.get("direction") == "regression" and
310                    abs(comp.get("relative_change", 0)) >= config.min_regression_threshold
311                ]
312
313                if regressions:
314                    should_notify = True
315                    reasons.append(f"{len(regressions)} significant regressions")
316
317            if should_notify:
318                notification = await self._send_notification(
319                    config,
320                    results,
321                    reasons
322                )
323                notifications_sent.append(notification)
324
325        return notifications_sent
326
327    async def _send_notification(
328        self,
329        config: NotificationConfig,
330        results: AggregatedResults,
331        reasons: list[str]
332    ) -> dict[str, Any]:
333        """Send a single notification."""
334        message = self._format_message(results, reasons)
335
336        if config.channel == "slack" and config.webhook_url:
337            async with aiohttp.ClientSession() as session:
338                await session.post(
339                    config.webhook_url,
340                    json={"text": message}
341                )
342
343        return {
344            "channel": config.channel,
345            "reasons": reasons,
346            "sent_at": datetime.utcnow().isoformat()
347        }
348
349    def _format_message(
350        self,
351        results: AggregatedResults,
352        reasons: list[str]
353    ) -> str:
354        """Format notification message."""
355        lines = [
356            f"⚠️ Evaluation Alert - Run {results.run_id[:8]}",
357            "",
358            "Issues detected:",
359        ]
360
361        for reason in reasons:
362            lines.append(f"• {reason}")
363
364        lines.append("")
365        lines.append(
366            f"Total evaluations: {results.total_evaluations}"
367        )
368
369        return "\n".join(lines)
370
371
372
373class ReportingComponent(PipelineComponent[dict[str, Any]]):
374    """Pipeline component for report generation."""
375
376    def __init__(
377        self,
378        formats: list[ReportFormat],
379        output_dir: str = "./reports",
380        notification_sender: Optional[NotificationSender] = None
381    ):
382        super().__init__("reporting")
383        self.formats = formats
384        self.output_dir = output_dir
385        self.notification_sender = notification_sender
386        self.stage = PipelineStage.REPORTING
387
388    async def process(
389        self,
390        input_data: AggregatedResults,
391        context: PipelineContext
392    ) -> dict[str, Any]:
393        """Generate reports and send notifications."""
394        import os
395        import aiofiles
396
397        os.makedirs(self.output_dir, exist_ok=True)
398
399        reports_generated = []
400
401        for format_handler in self.formats:
402            content = format_handler.render(input_data, context)
403            filename = f"report_{input_data.run_id}.{format_handler.extension}"
404            filepath = os.path.join(self.output_dir, filename)
405
406            async with aiofiles.open(filepath, "w") as f:
407                await f.write(content)
408
409            reports_generated.append({
410                "format": format_handler.extension,
411                "path": filepath,
412                "size_bytes": len(content)
413            })
414
415        # Send notifications
416        notifications = []
417        if self.notification_sender:
418            notifications = await self.notification_sender.send(
419                input_data,
420                context
421            )
422
423        return {
424            "reports": reports_generated,
425            "notifications": notifications
426        }

The reporting system supports multiple output formats and integrates with notification channels to alert stakeholders when issues are detected. Reports can be customized based on audience needs and delivery requirements.
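The regression gate inside `NotificationSender` can also be exercised in isolation. A minimal sketch operating on the same comparison dictionaries that `ResultComparator` produces (the sample values are invented):

```python
def regressed_metrics(comparisons: dict, min_regression: float = 0.1) -> list[str]:
    """Return the metrics whose regression exceeds the notification threshold."""
    return [
        metric
        for metric, comp in comparisons.items()
        if comp.get("direction") == "regression"
        and abs(comp.get("relative_change", 0)) >= min_regression
    ]

comparisons = {
    "accuracy": {"direction": "regression", "relative_change": -0.15},
    "latency_score": {"direction": "regression", "relative_change": -0.03},
    "coverage": {"direction": "improvement", "relative_change": 0.08},
}
print(regressed_metrics(comparisons))  # ['accuracy'] — only the 15% drop crosses the 10% bar
```

Keeping the gate as a pure function of the comparison dictionaries makes the alerting behavior easy to unit test without sending real notifications.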


Pipeline Orchestration

The orchestration layer manages pipeline execution, handling scheduling, dependency management, and recovery from failures. It ensures reliable execution even in distributed environments.

Orchestration and Scheduling

🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional, Callable
3from enum import Enum
4from datetime import datetime, timedelta
5import asyncio
6import logging
7
8
9class ScheduleType(Enum):
10    """Types of evaluation schedules."""
11    CRON = "cron"
12    INTERVAL = "interval"
13    EVENT_TRIGGERED = "event_triggered"
14    MANUAL = "manual"
15
16
17@dataclass
18class PipelineSchedule:
19    """Schedule configuration for pipeline runs."""
20    schedule_type: ScheduleType
21    cron_expression: Optional[str] = None
22    interval_seconds: Optional[int] = None
23    event_types: list[str] = field(default_factory=list)
24    enabled: bool = True
25
26    def next_run_time(
27        self,
28        from_time: Optional[datetime] = None
29    ) -> Optional[datetime]:
30        """Calculate next scheduled run time."""
31        from_time = from_time or datetime.utcnow()
32
33        if self.schedule_type == ScheduleType.INTERVAL:
34            return from_time + timedelta(seconds=self.interval_seconds)
35
36        if self.schedule_type == ScheduleType.CRON:
37            # Use croniter for cron parsing
38            from croniter import croniter
39            cron = croniter(self.cron_expression, from_time)
40            return cron.get_next(datetime)
41
42        return None
43
44
45@dataclass
46class PipelineRun:
47    """Represents a single pipeline execution."""
48    run_id: str
49    schedule_id: Optional[str]
50    status: str
51    started_at: datetime
52    completed_at: Optional[datetime] = None
53    context: Optional[PipelineContext] = None
54    error: Optional[str] = None
55
56
57class PipelineOrchestrator:
58    """Orchestrates pipeline execution."""
59
60    def __init__(
61        self,
62        pipeline: EvaluationPipeline,
63        max_concurrent_runs: int = 1,
64        history_size: int = 100
65    ):
66        self.pipeline = pipeline
67        self.max_concurrent_runs = max_concurrent_runs
68        self.history_size = history_size
69
70        self._schedules: dict[str, PipelineSchedule] = {}
71        self._run_history: list[PipelineRun] = []
72        self._active_runs: dict[str, PipelineRun] = {}
73        self._running = False
74        self._scheduler_task: Optional[asyncio.Task] = None
75        self._semaphore = asyncio.Semaphore(max_concurrent_runs)
76
77        self.logger = logging.getLogger(__name__)
78
79    def add_schedule(
80        self,
81        schedule_id: str,
82        schedule: PipelineSchedule
83    ) -> None:
84        """Add a schedule for pipeline runs."""
85        self._schedules[schedule_id] = schedule
86
87    def remove_schedule(self, schedule_id: str) -> None:
88        """Remove a schedule."""
89        self._schedules.pop(schedule_id, None)
90
91    async def trigger_run(
92        self,
93        input_data: Any,
94        schedule_id: Optional[str] = None,
95        metadata: Optional[dict[str, Any]] = None
96    ) -> PipelineRun:
97        """Trigger a pipeline run."""
98        run_id = str(uuid.uuid4())
99
100        run = PipelineRun(
101            run_id=run_id,
102            schedule_id=schedule_id,
103            status="pending",
104            started_at=datetime.utcnow()
105        )
106
107        self._active_runs[run_id] = run
108
109        try:
110            async with self._semaphore:
111                run.status = "running"
112
113                context = PipelineContext(
114                    run_id=run_id,
115                    metadata=metadata or {}
116                )
117
118                context = await self.pipeline.run(input_data, context)
119
120                run.context = context
121                run.status = "completed" if not context.errors else "failed"
122                run.completed_at = datetime.utcnow()
123
124        except Exception as e:
125            run.status = "error"
126            run.error = str(e)
127            run.completed_at = datetime.utcnow()
128            self.logger.error(f"Pipeline run {run_id} failed: {e}")
129
130        finally:
131            self._active_runs.pop(run_id, None)
132            self._add_to_history(run)
133
134        return run
135
136    def _add_to_history(self, run: PipelineRun) -> None:
137        """Add run to history, maintaining size limit."""
138        self._run_history.append(run)
139
140        if len(self._run_history) > self.history_size:
141            self._run_history = self._run_history[-self.history_size:]
142
143    async def start_scheduler(self) -> None:
144        """Start the schedule manager."""
145        self._running = True
146        self._scheduler_task = asyncio.create_task(self._scheduler_loop())
147
148    async def stop_scheduler(self) -> None:
149        """Stop the schedule manager."""
150        self._running = False
151
152        if self._scheduler_task:
153            self._scheduler_task.cancel()
154            try:
155                await self._scheduler_task
156            except asyncio.CancelledError:
157                pass
158
159    async def _scheduler_loop(self) -> None:
160        """Main scheduler loop."""
161        next_runs: dict[str, datetime] = {}
162
163        # Initialize next run times
164        for schedule_id, schedule in self._schedules.items():
165            if schedule.enabled:
166                next_time = schedule.next_run_time()
167                if next_time:
168                    next_runs[schedule_id] = next_time
169
170        while self._running:
171            now = datetime.utcnow()
172
173            # Check for due schedules
174            for schedule_id, next_time in list(next_runs.items()):
175                if next_time <= now:
176                    schedule = self._schedules.get(schedule_id)
177
178                    if schedule and schedule.enabled:
179                        # Trigger run in background
180                        asyncio.create_task(
181                            self.trigger_run(
182                                input_data=None,  # Will use data source
183                                schedule_id=schedule_id,
184                                metadata={"triggered_by": "scheduler"}
185                            )
186                        )
187
188                        # Calculate next run
189                        next_time = schedule.next_run_time(now)
190                        if next_time:
191                            next_runs[schedule_id] = next_time
192                        else:
193                            next_runs.pop(schedule_id)
194
195            # Sleep until next check
196            await asyncio.sleep(1)
197
198    def get_active_runs(self) -> list[PipelineRun]:
199        """Get currently active runs."""
200        return list(self._active_runs.values())
201
202    def get_run_history(
203        self,
204        limit: int = 10,
205        status: Optional[str] = None
206    ) -> list[PipelineRun]:
207        """Get run history."""
208        runs = self._run_history
209
210        if status:
211            runs = [r for r in runs if r.status == status]
212
213        return runs[-limit:]
214
215
216class EventDispatcher:
217    """Dispatches events to trigger pipeline runs."""
218
219    def __init__(self, orchestrator: PipelineOrchestrator):
220        self.orchestrator = orchestrator
221        self._handlers: dict[str, list[Callable]] = {}
222
223    def register_handler(
224        self,
225        event_type: str,
226        handler: Callable
227    ) -> None:
228        """Register an event handler."""
229        if event_type not in self._handlers:
230            self._handlers[event_type] = []
231        self._handlers[event_type].append(handler)
232
233    async def dispatch(
234        self,
235        event_type: str,
236        event_data: dict[str, Any]
237    ) -> None:
238        """Dispatch an event."""
239        # Check for triggered schedules
240        for schedule_id, schedule in self.orchestrator._schedules.items():
241            if (
242                schedule.schedule_type == ScheduleType.EVENT_TRIGGERED and
243                event_type in schedule.event_types and
244                schedule.enabled
245            ):
246                asyncio.create_task(
247                    self.orchestrator.trigger_run(
248                        input_data=event_data.get("data"),
249                        schedule_id=schedule_id,
250                        metadata={
251                            "triggered_by": "event",
252                            "event_type": event_type,
253                            "event_data": event_data
254                        }
255                    )
256                )
257
258        # Run custom handlers
259        handlers = self._handlers.get(event_type, [])
260        for handler in handlers:
261            try:
262                if asyncio.iscoroutinefunction(handler):
263                    await handler(event_data)
264                else:
265                    handler(event_data)
266            except Exception as e:
267                logging.error(f"Event handler error: {e}")

The orchestrator manages scheduled runs, concurrent execution limits, and event-triggered evaluations. It maintains run history for debugging and provides visibility into active pipeline executions.
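The `PipelineSchedule` and `ScheduleType` types the orchestrator relies on are defined earlier in the chapter. As a rough sketch of the contract the scheduler loop depends on, an interval-only reduction might look like the following (the class name and fields here are illustrative assumptions, not the chapter's exact definition):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class IntervalOnlySchedule:
    """Hypothetical interval-only reduction of PipelineSchedule."""
    interval_seconds: int
    enabled: bool = True

    def next_run_time(self, after: Optional[datetime] = None) -> Optional[datetime]:
        # The scheduler loop calls this with no argument at startup and
        # with `now` after each triggered run. Disabled schedules return
        # None, which is how entries fall out of the next_runs map.
        if not self.enabled:
            return None
        base = after if after is not None else datetime.utcnow()
        return base + timedelta(seconds=self.interval_seconds)
```

The full `PipelineSchedule` additionally handles cron expressions and event-triggered types; the key invariant is that `next_run_time` returns `None` when no further run should be scheduled.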


Deployment and Operations

Deploying the evaluation pipeline requires careful consideration of infrastructure, monitoring, and operational procedures. This section covers production deployment patterns.

Infrastructure and API Layer

🐍python
1from fastapi import FastAPI, HTTPException, BackgroundTasks
2from pydantic import BaseModel
3from typing import Any, Optional
4import uvicorn
5
6
7# API Models
8class TriggerRunRequest(BaseModel):
9    schedule_id: Optional[str] = None
10    input_data: Optional[dict[str, Any]] = None
11    metadata: Optional[dict[str, Any]] = None
12
13
14class RunStatusResponse(BaseModel):
15    run_id: str
16    status: str
17    started_at: str
18    completed_at: Optional[str]
19    error: Optional[str]
20
21
22class ScheduleRequest(BaseModel):
23    schedule_type: str
24    cron_expression: Optional[str] = None
25    interval_seconds: Optional[int] = None
26    event_types: list[str] = []
27    enabled: bool = True
28
29
30# API Application
31def create_api(orchestrator: PipelineOrchestrator) -> FastAPI:
32    """Create FastAPI application for pipeline management."""
33
34    app = FastAPI(
35        title="Evaluation Pipeline API",
36        description="API for managing evaluation pipeline runs",
37        version="1.0.0"
38    )
39
40    @app.post("/runs/trigger", response_model=RunStatusResponse)
41    async def trigger_run(
42        request: TriggerRunRequest,
43        background_tasks: BackgroundTasks
44    ):
45        """Trigger a new pipeline run."""
46        run = await orchestrator.trigger_run(
47            input_data=request.input_data,
48            schedule_id=request.schedule_id,
49            metadata=request.metadata
50        )
51
52        return RunStatusResponse(
53            run_id=run.run_id,
54            status=run.status,
55            started_at=run.started_at.isoformat(),
56            completed_at=run.completed_at.isoformat() if run.completed_at else None,
57            error=run.error
58        )
59
60    @app.get("/runs/active")
61    async def get_active_runs():
62        """Get currently active runs."""
63        runs = orchestrator.get_active_runs()
64
65        return {
66            "runs": [
67                RunStatusResponse(
68                    run_id=r.run_id,
69                    status=r.status,
70                    started_at=r.started_at.isoformat(),
71                    completed_at=None,
72                    error=None
73                )
74                for r in runs
75            ]
76        }
77
78    # Declared after /runs/active so the literal path matches before the parameter
79    @app.get("/runs/{run_id}", response_model=RunStatusResponse)
80    async def get_run_status(run_id: str):
81        """Get status of a specific run."""
82        # Check active runs first, then history
83        if run_id in orchestrator._active_runs:
84            run = orchestrator._active_runs[run_id]
85        else:
86            runs = [r for r in orchestrator._run_history if r.run_id == run_id]
87            if not runs:
88                raise HTTPException(status_code=404, detail="Run not found")
89            run = runs[0]
90
91        return RunStatusResponse(
92            run_id=run.run_id,
93            status=run.status,
94            started_at=run.started_at.isoformat(),
95            completed_at=run.completed_at.isoformat() if run.completed_at else None,
96            error=run.error
97        )
98
99    @app.get("/runs")
100    async def list_runs(
101        limit: int = 10,
102        status: Optional[str] = None
103    ):
104        """List recent pipeline runs."""
105        runs = orchestrator.get_run_history(limit=limit, status=status)
106
107        return {
108            "runs": [
109                RunStatusResponse(
110                    run_id=r.run_id,
111                    status=r.status,
112                    started_at=r.started_at.isoformat(),
113                    completed_at=r.completed_at.isoformat() if r.completed_at else None,
114                    error=r.error
115                )
116                for r in runs
117            ],
118            "active_count": len(orchestrator._active_runs)
119        }
120
121    @app.post("/schedules/{schedule_id}")
122    async def create_schedule(schedule_id: str, request: ScheduleRequest):
123        """Create or update a schedule."""
124        schedule = PipelineSchedule(
125            schedule_type=ScheduleType(request.schedule_type),
126            cron_expression=request.cron_expression,
127            interval_seconds=request.interval_seconds,
128            event_types=request.event_types,
129            enabled=request.enabled
130        )
131
132        orchestrator.add_schedule(schedule_id, schedule)
133
134        return {"status": "created", "schedule_id": schedule_id}
135
136    @app.delete("/schedules/{schedule_id}")
137    async def delete_schedule(schedule_id: str):
138        """Delete a schedule."""
139        orchestrator.remove_schedule(schedule_id)
140        return {"status": "deleted", "schedule_id": schedule_id}
141
142    @app.get("/schedules")
143    async def list_schedules():
144        """List all schedules."""
145        return {
146            "schedules": [
147                {
148                    "id": sid,
149                    "type": s.schedule_type.value,
150                    "enabled": s.enabled,
151                    "next_run": s.next_run_time().isoformat() if s.next_run_time() else None
152                }
153                for sid, s in orchestrator._schedules.items()
154            ]
155        }
156
157    @app.get("/health")
158    async def health_check():
159        """Health check endpoint."""
160        return {
161            "status": "healthy",
162            "active_runs": len(orchestrator._active_runs),
163            "schedules_count": len(orchestrator._schedules)
164        }
165
166    return app
167
168
169# Docker Deployment Configuration
170DOCKERFILE = """
171FROM python:3.11-slim
172
173WORKDIR /app
174
175COPY requirements.txt .
176RUN pip install --no-cache-dir -r requirements.txt
177
178COPY . .
179
180EXPOSE 8000
181
182CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
183"""
184
185DOCKER_COMPOSE = """
186version: '3.8'
187
188services:
189  evaluation-pipeline:
190    build: .
191    ports:
192      - "8000:8000"
193    environment:
194      - DATABASE_URL=postgresql://user:pass@db:5432/evaluations
195      - REDIS_URL=redis://redis:6379
196      - S3_BUCKET=evaluation-results
197    depends_on:
198      - db
199      - redis
200    deploy:
201      replicas: 2
202      resources:
203        limits:
204          cpus: '2'
205          memory: 4G
206
207  db:
208    image: postgres:15
209    environment:
210      - POSTGRES_USER=user
211      - POSTGRES_PASSWORD=pass
212      - POSTGRES_DB=evaluations
213    volumes:
214      - postgres_data:/var/lib/postgresql/data
215
216  redis:
217    image: redis:7-alpine
218    volumes:
219      - redis_data:/data
220
221volumes:
222  postgres_data:
223  redis_data:
224"""
225
226KUBERNETES_DEPLOYMENT = """
227apiVersion: apps/v1
228kind: Deployment
229metadata:
230  name: evaluation-pipeline
231  labels:
232    app: evaluation-pipeline
233spec:
234  replicas: 3
235  selector:
236    matchLabels:
237      app: evaluation-pipeline
238  template:
239    metadata:
240      labels:
241        app: evaluation-pipeline
242    spec:
243      containers:
244      - name: pipeline
245        image: evaluation-pipeline:latest
246        ports:
247        - containerPort: 8000
248        resources:
249          requests:
250            memory: "2Gi"
251            cpu: "1"
252          limits:
253            memory: "4Gi"
254            cpu: "2"
255        env:
256        - name: DATABASE_URL
257          valueFrom:
258            secretKeyRef:
259              name: pipeline-secrets
260              key: database-url
261        livenessProbe:
262          httpGet:
263            path: /health
264            port: 8000
265          initialDelaySeconds: 30
266          periodSeconds: 10
267        readinessProbe:
268          httpGet:
269            path: /health
270            port: 8000
271          initialDelaySeconds: 5
272          periodSeconds: 5
273---
274apiVersion: v1
275kind: Service
276metadata:
277  name: evaluation-pipeline
278spec:
279  selector:
280    app: evaluation-pipeline
281  ports:
282  - port: 80
283    targetPort: 8000
284  type: LoadBalancer
285"""

The API layer provides RESTful endpoints for managing pipeline runs, schedules, and monitoring. Containerized deployment with Docker or Kubernetes enables horizontal scaling and high availability.
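The Kubernetes Deployment above fixes the replica count at 3; in production you would typically pair it with an autoscaler so capacity tracks evaluation load. One possible HorizontalPodAutoscaler targeting the same Deployment (the thresholds are illustrative, not prescriptive):

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: evaluation-pipeline
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: evaluation-pipeline
  minReplicas: 2
  maxReplicas: 6
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
```

Because the orchestrator already enforces a per-process concurrency limit via its semaphore, horizontal scaling and in-process limits work together: each replica bounds its own load while the autoscaler adds replicas under sustained pressure.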


Complete Implementation

Bringing all components together, here is a complete implementation that demonstrates how to configure and run the evaluation pipeline.

Full Pipeline Assembly

🐍python
1import asyncio
2from typing import Any, Optional
3
4
5# Example Evaluators
6class AccuracyEvaluator(Evaluator):
7    """Evaluates answer accuracy."""
8
9    async def evaluate(
10        self,
11        task: EvaluationTask,
12        agent_output: dict[str, Any]
13    ) -> dict[str, float]:
14        if not task.expected_output:
15            return {}
16
17        expected = task.expected_output.get("answer", "")
18        actual = agent_output.get("answer", "")
19
20        # Simple exact match for demonstration
21        accuracy = 1.0 if expected == actual else 0.0
22
23        # Partial match score
24        expected_words = set(expected.lower().split())
25        actual_words = set(actual.lower().split())
26
27        if expected_words:
28            overlap = len(expected_words & actual_words)
29            partial_score = overlap / len(expected_words)
30        else:
31            partial_score = 0.0
32
33        return {
34            "accuracy": accuracy,
35            "partial_match": partial_score
36        }
37
38
39class LatencyEvaluator(Evaluator):
40    """Evaluates response latency."""
41
42    def __init__(self, target_latency_ms: float = 1000.0):
43        self.target_latency_ms = target_latency_ms
44
45    async def evaluate(
46        self,
47        task: EvaluationTask,
48        agent_output: dict[str, Any]
49    ) -> dict[str, float]:
50        latency_ms = agent_output.get("latency_ms", 0)
51
52        # Score based on target latency
53        if latency_ms <= self.target_latency_ms:
54            latency_score = 1.0
55        else:
56            latency_score = self.target_latency_ms / latency_ms
57
58        return {
59            "latency_score": latency_score,
60            "latency_ms": latency_ms
61        }
62
63
64class SafetyEvaluator(Evaluator):
65    """Evaluates output safety."""
66
67    def __init__(self, blocked_terms: Optional[list[str]] = None):
68        self.blocked_terms = blocked_terms or []
69
70    async def evaluate(
71        self,
72        task: EvaluationTask,
73        agent_output: dict[str, Any]
74    ) -> dict[str, float]:
75        output_text = str(agent_output.get("answer", "")).lower()
76
77        # Check for blocked terms
78        violations = sum(
79            1 for term in self.blocked_terms
80            if term.lower() in output_text
81        )
82
83        safety_score = 1.0 if violations == 0 else max(0, 1.0 - violations * 0.2)
84
85        return {
86            "safety_score": safety_score,
87            "safety_violations": float(violations)
88        }
89
90
91# Example Agent Registry
92async def example_qa_agent(input_data: dict[str, Any]) -> dict[str, Any]:
93    """Example QA agent for testing."""
94    import time
95
96    start = time.perf_counter()
97
98    # Simulate processing
99    await asyncio.sleep(0.1)
100
101    question = input_data.get("question", "")
102
103    # Simple rule-based response for demo
104    if "capital" in question.lower():
105        answer = "Paris is the capital of France."
106    elif "largest" in question.lower():
107        answer = "The largest ocean is the Pacific Ocean."
108    else:
109        answer = "I don't have enough information to answer that question."
110
111    latency = (time.perf_counter() - start) * 1000
112
113    return {
114        "answer": answer,
115        "latency_ms": latency,
116        "confidence": 0.85
117    }
118
119
120async def example_summarization_agent(input_data: dict[str, Any]) -> dict[str, Any]:
121    """Example summarization agent."""
122    import time
123
124    start = time.perf_counter()
125    await asyncio.sleep(0.2)
126
127    text = input_data.get("text", "")
128
129    # Simple extraction for demo
130    sentences = text.split(".")
131    summary = ". ".join(sentences[:2]) + "." if sentences else ""
132
133    latency = (time.perf_counter() - start) * 1000
134
135    return {
136        "answer": summary,
137        "latency_ms": latency,
138        "word_count": len(summary.split())
139    }
140
141
142async def build_and_run_pipeline():
143    """Build and run the complete evaluation pipeline."""
144
145    # Configuration
146    config = PipelineConfig(
147        batch_size=50,
148        max_concurrent_evaluations=10,
149        timeout_seconds=300,
150        retry_count=3,
151        enable_parallel_evaluation=True
152    )
153
154    # Create agent executor
155    agent_registry = {
156        "qa_agent": example_qa_agent,
157        "summarization_agent": example_summarization_agent
158    }
159
160    agent_executor = AgentExecutor(
161        agent_registry=agent_registry,
162        default_timeout=60.0
163    )
164
165    # Create composite evaluator
166    evaluator = CompositeEvaluator([
167        AccuracyEvaluator(),
168        LatencyEvaluator(target_latency_ms=500.0),
169        SafetyEvaluator(blocked_terms=["harmful", "dangerous"])
170    ])
171
172    # Build pipeline
173    pipeline = EvaluationPipeline(config)
174
175    # Register components
176    # Ingestion - using database source (mocked for demo)
177    ingestion = DataIngestionComponent(
178        sources=[],  # Would add actual sources
179        batch_size=config.batch_size
180    )
181    pipeline.register_component(PipelineStage.INGESTION, ingestion)
182
183    # Evaluation engine
184    evaluation_engine = EvaluationEngineComponent(
185        agent_executor=agent_executor,
186        evaluator=evaluator,
187        num_workers=config.max_concurrent_evaluations
188    )
189    pipeline.register_component(PipelineStage.EVALUATION, evaluation_engine)
190
191    # Results processing
192    results_processor = ResultsProcessingComponent(
193        detect_anomalies=True
194    )
195    pipeline.register_component(PipelineStage.AGGREGATION, results_processor)
196
197    # Reporting
198    reporting = ReportingComponent(
199        formats=[
200            JSONReportFormat(),
201            HTMLReportFormat(),
202            MarkdownReportFormat()
203        ],
204        output_dir="./evaluation_reports"
205    )
206    pipeline.register_component(PipelineStage.REPORTING, reporting)
207
208    # Add hooks for monitoring
209    pipeline.add_hook("pre_stage", lambda stage, ctx:
210        print(f"Starting stage: {stage.value}")
211    )
212
213    pipeline.add_hook("post_stage", lambda stage, ctx:
214        print(f"Completed stage: {stage.value}")
215    )
216
217    pipeline.add_hook("on_error", lambda stage, err, ctx:
218        print(f"Error in {stage.value}: {err}")
219    )
220
221    # Create orchestrator
222    orchestrator = PipelineOrchestrator(
223        pipeline=pipeline,
224        max_concurrent_runs=2
225    )
226
227    # Add schedules
228    orchestrator.add_schedule(
229        "hourly_evaluation",
230        PipelineSchedule(
231            schedule_type=ScheduleType.INTERVAL,
232            interval_seconds=3600,
233            enabled=True
234        )
235    )
236
237    orchestrator.add_schedule(
238        "nightly_comprehensive",
239        PipelineSchedule(
240            schedule_type=ScheduleType.CRON,
241            cron_expression="0 2 * * *",  # 2 AM daily
242            enabled=True
243        )
244    )
245
246    # Create API
247    app = create_api(orchestrator)
248
249    # Example: Trigger a manual run with sample tasks
250    sample_tasks = [
251        EvaluationTask(
252            task_id="task_001",
253            agent_id="qa_agent",
254            input_data={"question": "What is the capital of France?"},
255            expected_output={"answer": "Paris is the capital of France."},
256            priority=1
257        ),
258        EvaluationTask(
259            task_id="task_002",
260            agent_id="qa_agent",
261            input_data={"question": "What is the largest ocean?"},
262            expected_output={"answer": "The largest ocean is the Pacific Ocean."},
263            priority=1
264        ),
265        EvaluationTask(
266            task_id="task_003",
267            agent_id="summarization_agent",
268            input_data={
269                "text": "AI systems are transforming industries. "
270                       "Machine learning enables new capabilities. "
271                       "The future holds great promise."
272            },
273            expected_output=None,
274            priority=0
275        )
276    ]
277
278    # Run pipeline directly with sample tasks
279    context = PipelineContext()
280
281    # Skip ingestion for demo - pass tasks directly to evaluation
282    results = await evaluation_engine.execute(sample_tasks, context)
283
284    # Process results
285    aggregated = await results_processor.execute(results, context)
286
287    # Generate reports
288    reports = await reporting.execute(aggregated, context)
289
290    print("\n" + "=" * 60)
291    print("EVALUATION COMPLETE")
292    print("=" * 60)
293    print(f"Run ID: {context.run_id}")
294    print(f"Total tasks: {len(sample_tasks)}")
295    print(f"Successful: {context.metadata.get('evaluation_stats', dict()).get('successful', 0)}")
296    print(f"Reports generated: {len(reports.get('reports', []))}")
297
298    for report in reports.get("reports", []):
299        print(f"  - {report['format']}: {report['path']}")
300
301    return orchestrator, app
302
303
304# Main entry point
305if __name__ == "__main__":
306    # Run the pipeline
307    orchestrator, app = asyncio.run(build_and_run_pipeline())
308
309    # Start the API server
310    # uvicorn.run(app, host="0.0.0.0", port=8000)

This complete implementation demonstrates assembling all pipeline components into a working system. The modular design allows easy customization and extension for specific evaluation requirements.
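As an example of that extensibility, a custom evaluator only needs to implement the `evaluate` coroutine and return a metric dictionary. The sketch below assumes the `Evaluator` interface shown in this chapter (reproduced here as a minimal stub); the `ConcisenessEvaluator` itself is a hypothetical addition, not part of the chapter's code:

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any


class Evaluator(ABC):
    """Stand-in stub for the chapter's Evaluator base class (assumed interface)."""

    @abstractmethod
    async def evaluate(self, task: Any, agent_output: dict[str, Any]) -> dict[str, float]:
        ...


class ConcisenessEvaluator(Evaluator):
    """Hypothetical custom evaluator: full score at or under a word budget,
    decaying proportionally beyond it."""

    def __init__(self, max_words: int = 50):
        self.max_words = max_words

    async def evaluate(self, task: Any, agent_output: dict[str, Any]) -> dict[str, float]:
        words = len(str(agent_output.get("answer", "")).split())
        score = 1.0 if words <= self.max_words else self.max_words / words
        return {"conciseness": score, "word_count": float(words)}


# A ten-word answer against a five-word budget scores 5/10 = 0.5
scores = asyncio.run(
    ConcisenessEvaluator(max_words=5).evaluate(
        None, {"answer": "one two three four five six seven eight nine ten"}
    )
)
```

Dropping such an evaluator into the `CompositeEvaluator` list is all that is needed to have its metrics flow through aggregation and reporting.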

Production Checklist: Before deploying to production, ensure you have configured proper database connections, set up monitoring and alerting, implemented authentication for the API, and tested failover scenarios for each pipeline component.
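For the authentication item, one minimal building block is a constant-time API-key check. The helper below is a framework-agnostic sketch (the function and header names are our choices); in the FastAPI app it could be wrapped in a dependency that raises `HTTPException(status_code=401)` when it returns `False`:

```python
import hmac


def api_key_valid(headers: dict[str, str], expected_key: str) -> bool:
    """Compare the x-api-key header against the configured key in constant
    time, so string-comparison timing does not leak key prefixes."""
    provided = headers.get("x-api-key", "")
    return hmac.compare_digest(provided.encode(), expected_key.encode())
```

In practice the expected key would come from a secret store or environment variable rather than application code, matching the secret-handling pattern already used in the Kubernetes manifest above.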

Summary

In this capstone section, we built a comprehensive evaluation pipeline that integrates all the concepts from Chapter 20. The pipeline provides a production-ready foundation for assessing agent performance at scale.

Key Components Covered

  • Pipeline Architecture - Modular design with distinct stages for ingestion, evaluation, aggregation, reporting, and archival
  • Data Ingestion - Multi-source data loading with database, S3, and API integrations plus automatic deduplication
  • Evaluation Engine - Parallel execution with worker pools, retry logic, timeout handling, and composite evaluators
  • Results Processing - Statistical aggregation, anomaly detection, and baseline comparison for actionable insights
  • Reporting - Multi-format report generation with JSON, HTML, and Markdown outputs plus notification integration
  • Orchestration - Schedule management, event-triggered runs, and concurrent execution control
  • Deployment - REST API, containerization patterns, and Kubernetes deployment configurations

Best Practices Summary

| Practice | Benefit |
| --- | --- |
| Modular component design | Easy testing, maintenance, and scaling |
| Async processing throughout | High throughput and resource efficiency |
| Comprehensive error handling | Graceful degradation and debugging |
| Statistical aggregation | Meaningful insights from raw data |
| Multi-format reporting | Flexibility for different stakeholders |
| Event-driven architecture | Responsive to production changes |
| Containerized deployment | Consistent environments and scaling |

The evaluation pipeline serves as the backbone of a data-driven agent development process. By consistently measuring performance across multiple dimensions, teams can make informed decisions about architecture, optimization, and deployment strategies.

Chapter Complete: You've now completed Chapter 20 on Evaluation and Benchmarking. You understand how to define meaningful metrics, design comprehensive benchmarks, build testing frameworks, run experiments, implement continuous evaluation, and deploy production evaluation pipelines. These capabilities are essential for building agents that improve over time and maintain quality in production.