Chapter 20

Testing Frameworks

Evaluation and Benchmarking

Introduction

Testing frameworks provide the infrastructure for executing benchmarks efficiently and reliably. This section covers how to build a robust testing framework that handles parallel execution, test isolation, and comprehensive result collection.

Framework Goals: A good testing framework makes it easy to run benchmarks reproducibly, scale execution across resources, and collect detailed results for analysis.

Agent testing presents unique challenges compared to traditional software testing: tests may take minutes to complete, require external API calls, and produce non-deterministic outputs. Your framework must handle all of these.
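These constraints shape everything that follows: a long-running, flaky agent call needs a timeout and, often, a retry. As a minimal self-contained sketch of that pattern (the `call_with_timeout` helper is illustrative, not part of the framework below):

```python
import asyncio

async def call_with_timeout(coro_factory, timeout_s: float, retries: int = 1):
    """Run a flaky async call under a timeout, retrying on failure.

    coro_factory is a zero-argument callable returning a fresh coroutine,
    since a coroutine object cannot be awaited twice.
    """
    last_error = None
    for _ in range(retries + 1):
        try:
            return await asyncio.wait_for(coro_factory(), timeout=timeout_s)
        except (asyncio.TimeoutError, ConnectionError) as e:
            last_error = e
    raise last_error

# Demo: a call that fails once, then succeeds on the retry
attempts = {"n": 0}

async def flaky():
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise ConnectionError("transient API error")
    return "ok"

result = asyncio.run(call_with_timeout(flaky, timeout_s=1.0, retries=2))
print(result)  # prints "ok"
```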


Test Runner Architecture

The test runner coordinates benchmark execution, manages resources, and collects results. Here's a comprehensive implementation:

```python
"""
Test runner framework for agent benchmarks.

This module provides the core infrastructure for executing
benchmark tests against AI agents.
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
import asyncio
import traceback


class TestStatus(Enum):
    """Status of a test execution."""
    PENDING = "pending"
    RUNNING = "running"
    PASSED = "passed"
    FAILED = "failed"
    ERROR = "error"
    TIMEOUT = "timeout"
    SKIPPED = "skipped"


@dataclass
class TestResult:
    """Result of a single test execution."""
    test_id: str
    status: TestStatus
    score: float
    start_time: datetime
    end_time: datetime
    output: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    metrics: Dict[str, float] = field(default_factory=dict)

    @property
    def duration_seconds(self) -> float:
        return (self.end_time - self.start_time).total_seconds()

    def to_dict(self) -> Dict[str, Any]:
        return {
            "test_id": self.test_id,
            "status": self.status.value,
            "score": self.score,
            "duration_seconds": self.duration_seconds,
            "start_time": self.start_time.isoformat(),
            "end_time": self.end_time.isoformat(),
            "output": self.output,
            "error": self.error,
            "metrics": self.metrics
        }


@dataclass
class TestRunConfig:
    """Configuration for a test run."""
    max_parallel: int = 4
    timeout_seconds: int = 300
    retry_count: int = 0
    fail_fast: bool = False
    capture_output: bool = True
    tags_filter: Optional[List[str]] = None
    categories_filter: Optional[List[str]] = None
    difficulty_filter: Optional[List[str]] = None


class AgentAdapter(ABC):
    """Abstract adapter for connecting to agents."""

    @abstractmethod
    async def execute(
        self,
        task: Dict[str, Any],
        timeout: int
    ) -> Dict[str, Any]:
        """Execute a task and return the result."""
        pass

    @abstractmethod
    async def reset(self):
        """Reset agent state between tests."""
        pass

    @abstractmethod
    def get_metrics(self) -> Dict[str, float]:
        """Get execution metrics from the agent."""
        pass


class TestRunner:
    """Main test runner for benchmark execution."""

    def __init__(
        self,
        agent: AgentAdapter,
        config: TestRunConfig,
        evaluators: Optional[Dict[str, Callable]] = None
    ):
        self.agent = agent
        self.config = config
        self.evaluators = evaluators or {}
        self.results: List[TestResult] = []
        self._running = False
        self._cancelled = False

    async def run(
        self,
        tasks: List[Dict[str, Any]]
    ) -> List[TestResult]:
        """Run all benchmark tasks."""

        self._running = True
        self._cancelled = False
        self.results = []

        # Filter tasks if configured
        filtered_tasks = self._filter_tasks(tasks)

        # Create semaphore for parallelism control
        semaphore = asyncio.Semaphore(self.config.max_parallel)

        # Run tasks
        async def run_with_semaphore(task):
            async with semaphore:
                if self._cancelled:
                    return self._create_skipped_result(task)
                return await self._run_single_task(task)

        # Execute all tasks
        coros = [run_with_semaphore(task) for task in filtered_tasks]
        self.results = await asyncio.gather(*coros)

        self._running = False
        return self.results

    def cancel(self):
        """Cancel the running test suite."""
        self._cancelled = True

    def _filter_tasks(
        self,
        tasks: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Apply configured filters to tasks."""

        filtered = tasks

        if self.config.tags_filter:
            tag_set = set(self.config.tags_filter)
            filtered = [
                t for t in filtered
                if set(t.get("tags", [])) & tag_set
            ]

        if self.config.categories_filter:
            cat_set = set(self.config.categories_filter)
            filtered = [
                t for t in filtered
                if t.get("category") in cat_set
            ]

        if self.config.difficulty_filter:
            diff_set = set(self.config.difficulty_filter)
            filtered = [
                t for t in filtered
                if t.get("difficulty") in diff_set
            ]

        return filtered

    async def _run_single_task(
        self,
        task: Dict[str, Any]
    ) -> TestResult:
        """Execute a single test task."""

        task_id = task.get("id", "unknown")
        start_time = datetime.utcnow()
        timeout = task.get("timeout_seconds", self.config.timeout_seconds)

        # Reset agent before each test
        await self.agent.reset()

        attempt = 0
        last_error = None

        while attempt <= self.config.retry_count:
            try:
                # Execute with timeout
                output = await asyncio.wait_for(
                    self.agent.execute(task.get("input_data", {}), timeout),
                    timeout=timeout
                )

                # Evaluate result
                score = self._evaluate(task, output)
                status = TestStatus.PASSED if score >= 0.7 else TestStatus.FAILED

                # Get metrics from agent
                metrics = self.agent.get_metrics()

                return TestResult(
                    test_id=task_id,
                    status=status,
                    score=score,
                    start_time=start_time,
                    end_time=datetime.utcnow(),
                    output=output if self.config.capture_output else None,
                    metrics=metrics
                )

            except asyncio.TimeoutError:
                last_error = f"Timeout after {timeout}s"
                if attempt == self.config.retry_count:
                    return TestResult(
                        test_id=task_id,
                        status=TestStatus.TIMEOUT,
                        score=0.0,
                        start_time=start_time,
                        end_time=datetime.utcnow(),
                        error=last_error
                    )

            except Exception as e:
                last_error = f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()}"
                if attempt == self.config.retry_count:
                    if self.config.fail_fast:
                        self.cancel()

                    return TestResult(
                        test_id=task_id,
                        status=TestStatus.ERROR,
                        score=0.0,
                        start_time=start_time,
                        end_time=datetime.utcnow(),
                        error=last_error
                    )

            attempt += 1
            await asyncio.sleep(1)  # Brief pause before retry

        # Should not reach here
        return self._create_error_result(task_id, start_time, last_error)

    def _evaluate(
        self,
        task: Dict[str, Any],
        output: Dict[str, Any]
    ) -> float:
        """Evaluate task output."""

        category = task.get("category", "default")
        evaluator = self.evaluators.get(category)

        if evaluator:
            return evaluator(task.get("input_data", {}), output)

        # Default evaluation: exact match against the expected output
        expected = task.get("expected_output")
        if expected:
            return 1.0 if output == expected else 0.0

        # No evaluation possible
        return 0.5

    def _create_skipped_result(
        self,
        task: Dict[str, Any]
    ) -> TestResult:
        """Create a skipped result."""
        now = datetime.utcnow()
        return TestResult(
            test_id=task.get("id", "unknown"),
            status=TestStatus.SKIPPED,
            score=0.0,
            start_time=now,
            end_time=now
        )

    def _create_error_result(
        self,
        task_id: str,
        start_time: datetime,
        error: str
    ) -> TestResult:
        """Create an error result."""
        return TestResult(
            test_id=task_id,
            status=TestStatus.ERROR,
            score=0.0,
            start_time=start_time,
            end_time=datetime.utcnow(),
            error=error
        )
```
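The runner's concurrency control comes down to the semaphore pattern, which can be exercised in isolation to confirm the bound actually holds (`bounded_gather` here is a standalone sketch, not part of the framework):

```python
import asyncio

async def bounded_gather(coro_factories, max_parallel: int):
    """Run coroutines with at most max_parallel in flight at once."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def run(factory):
        async with semaphore:
            return await factory()

    # gather preserves the order of its arguments
    return await asyncio.gather(*(run(f) for f in coro_factories))

# Track peak concurrency to confirm the bound holds
state = {"current": 0, "peak": 0}

def make_task(i):
    async def task():
        state["current"] += 1
        state["peak"] = max(state["peak"], state["current"])
        await asyncio.sleep(0.01)
        state["current"] -= 1
        return i
    return task

results = asyncio.run(
    bounded_gather([make_task(i) for i in range(8)], max_parallel=2)
)
print(results, state["peak"])  # peak never exceeds 2
```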

Parallel Execution

Parallel execution dramatically reduces benchmark runtime. Here's how to implement efficient parallelization while managing resources:

```python
"""
Parallel execution infrastructure for benchmarks.
"""

import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List

# TestResult, TestStatus, AgentAdapter, TestRunner, and TestRunConfig
# are defined in the test runner module shown above.


@dataclass
class WorkerConfig:
    """Configuration for worker processes."""
    worker_count: int = 4
    use_processes: bool = False  # True for CPU-bound, False for IO-bound
    max_queue_size: int = 100
    worker_timeout: int = 60


class WorkerPool:
    """Pool of workers for parallel test execution."""

    def __init__(self, config: WorkerConfig, agent_factory: Callable):
        self.config = config
        self.agent_factory = agent_factory
        self._executor = None
        self._agents: List[Any] = []

    async def start(self):
        """Initialize worker pool."""
        # The executor is available for offloading blocking work from workers
        if self.config.use_processes:
            self._executor = ProcessPoolExecutor(
                max_workers=self.config.worker_count
            )
        else:
            self._executor = ThreadPoolExecutor(
                max_workers=self.config.worker_count
            )

        # Pre-create agents for each worker
        self._agents = [
            await self.agent_factory()
            for _ in range(self.config.worker_count)
        ]

    async def stop(self):
        """Shutdown worker pool."""
        if self._executor:
            self._executor.shutdown(wait=True)

    async def execute_batch(
        self,
        tasks: List[Dict[str, Any]],
        evaluator: Callable
    ) -> List[TestResult]:
        """Execute a batch of tasks in parallel."""

        # Distribute tasks across workers
        batches = self._distribute_tasks(tasks)

        # Create coroutines for each batch
        async def run_batch(worker_id: int, batch: List[Dict[str, Any]]):
            agent = self._agents[worker_id]
            results = []

            for task in batch:
                result = await self._run_task(agent, task, evaluator)
                results.append(result)

            return results

        # Run all batches concurrently
        batch_coros = [
            run_batch(i, batch)
            for i, batch in enumerate(batches)
        ]

        batch_results = await asyncio.gather(*batch_coros)

        # Flatten results
        return [r for batch in batch_results for r in batch]

    def _distribute_tasks(
        self,
        tasks: List[Dict[str, Any]]
    ) -> List[List[Dict[str, Any]]]:
        """Distribute tasks evenly across workers (round-robin)."""

        batches: List[List[Dict[str, Any]]] = [
            [] for _ in range(self.config.worker_count)
        ]

        for i, task in enumerate(tasks):
            batches[i % self.config.worker_count].append(task)

        return batches

    async def _run_task(
        self,
        agent: Any,
        task: Dict[str, Any],
        evaluator: Callable
    ) -> TestResult:
        """Run a single task on a specific agent."""

        task_id = task.get("id", "unknown")
        start_time = datetime.utcnow()
        timeout = task.get("timeout_seconds", 300)

        try:
            await agent.reset()

            output = await asyncio.wait_for(
                agent.execute(task.get("input_data", {}), timeout),
                timeout=timeout
            )

            score = evaluator(task.get("input_data", {}), output)

            return TestResult(
                test_id=task_id,
                status=TestStatus.PASSED if score >= 0.7 else TestStatus.FAILED,
                score=score,
                start_time=start_time,
                end_time=datetime.utcnow(),
                output=output,
                metrics=agent.get_metrics()
            )

        except asyncio.TimeoutError:
            return TestResult(
                test_id=task_id,
                status=TestStatus.TIMEOUT,
                score=0.0,
                start_time=start_time,
                end_time=datetime.utcnow(),
                error=f"Timeout after {timeout}s"
            )

        except Exception as e:
            return TestResult(
                test_id=task_id,
                status=TestStatus.ERROR,
                score=0.0,
                start_time=start_time,
                end_time=datetime.utcnow(),
                error=str(e)
            )


class DistributedRunner:
    """Run benchmarks across multiple machines."""

    def __init__(
        self,
        coordinator_url: str,
        worker_id: str
    ):
        self.coordinator_url = coordinator_url
        self.worker_id = worker_id

    async def register(self):
        """Register this worker with the coordinator."""
        # In production, use actual HTTP/gRPC calls
        pass

    async def fetch_tasks(self) -> List[Dict[str, Any]]:
        """Fetch assigned tasks from coordinator."""
        pass

    async def submit_results(self, results: List[TestResult]):
        """Submit results back to coordinator."""
        pass

    async def run(self, agent: AgentAdapter, evaluator: Callable):
        """Main worker loop."""

        await self.register()

        while True:
            tasks = await self.fetch_tasks()

            if not tasks:
                await asyncio.sleep(5)
                continue

            runner = TestRunner(
                agent=agent,
                config=TestRunConfig(max_parallel=2)
            )

            results = await runner.run(tasks)
            await self.submit_results(results)


class LoadBalancer:
    """Balances task distribution across workers."""

    def __init__(self, workers: List[str]):
        self.workers = workers
        self.worker_loads: Dict[str, int] = {w: 0 for w in workers}
        self.worker_performance: Dict[str, float] = {w: 1.0 for w in workers}

    def assign_task(self, task: Dict[str, Any]) -> str:
        """Assign a task to the best available worker."""

        # Calculate weighted load (lower is better)
        weighted_loads = {
            w: self.worker_loads[w] / self.worker_performance[w]
            for w in self.workers
        }

        # Select worker with lowest weighted load
        best_worker = min(weighted_loads, key=weighted_loads.get)
        self.worker_loads[best_worker] += 1

        return best_worker

    def complete_task(self, worker: str, duration: float):
        """Record task completion."""
        self.worker_loads[worker] = max(0, self.worker_loads[worker] - 1)

        # Update performance estimate (exponential moving average)
        alpha = 0.1
        self.worker_performance[worker] = (
            alpha * (1.0 / max(duration, 0.1)) +
            (1 - alpha) * self.worker_performance[worker]
        )
```
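To see the load balancer's math in action, the same assignment and EMA formulas can be simulated standalone with plain dicts (a walk-through sketch, not framework code):

```python
# Two workers: "fast" completes tasks in 1s, "slow" in 10s
workers = ["fast", "slow"]
loads = {w: 0 for w in workers}
perf = {w: 1.0 for w in workers}

def assign():
    """Pick the worker with the lowest load weighted by performance."""
    weighted = {w: loads[w] / perf[w] for w in workers}
    best = min(weighted, key=weighted.get)
    loads[best] += 1
    return best

def complete(worker, duration, alpha=0.1):
    """EMA update: faster completions raise the performance estimate."""
    loads[worker] = max(0, loads[worker] - 1)
    perf[worker] = alpha * (1.0 / max(duration, 0.1)) + (1 - alpha) * perf[worker]

# After a few rounds, the slow worker's estimate decays below the fast one's
for _ in range(5):
    complete("fast", 1.0)
    complete("slow", 10.0)

# With equal queue depth, the fast worker now attracts the next task
loads = {"fast": 1, "slow": 1}
next_worker = assign()
print(next_worker, round(perf["slow"], 3))
```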

Test Isolation

Test isolation ensures that tests don't affect each other. This is crucial for reliable benchmarking:

```python
"""
Test isolation mechanisms.
"""

from abc import ABC, abstractmethod
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
import os
import shutil
import tempfile

# TestResult, TestStatus, and AgentAdapter are defined in the
# test runner module shown above.


class IsolationStrategy(ABC):
    """Abstract base for isolation strategies."""

    @abstractmethod
    async def setup(self):
        """Set up isolated environment."""
        pass

    @abstractmethod
    async def teardown(self):
        """Clean up isolated environment."""
        pass

    @abstractmethod
    async def get_context(self) -> Dict[str, Any]:
        """Get the isolated context."""
        pass


class StateIsolation(IsolationStrategy):
    """Isolates agent state between tests.

    Assumes the adapter also exposes get_state()/set_state(),
    beyond the minimal AgentAdapter interface.
    """

    def __init__(self, agent: AgentAdapter):
        self.agent = agent
        self._original_state: Optional[Dict[str, Any]] = None

    async def setup(self):
        """Capture and reset agent state."""
        # Capture original state
        self._original_state = await self.agent.get_state()

        # Reset to clean state
        await self.agent.reset()

    async def teardown(self):
        """Restore original state."""
        if self._original_state:
            await self.agent.set_state(self._original_state)

    async def get_context(self) -> Dict[str, Any]:
        return {"state_isolated": True}


class EnvironmentIsolation(IsolationStrategy):
    """Isolates environment variables."""

    def __init__(self, env_overrides: Optional[Dict[str, str]] = None):
        self.env_overrides = env_overrides or {}
        self._original_env: Dict[str, Optional[str]] = {}

    async def setup(self):
        """Set isolated environment variables."""
        for key, value in self.env_overrides.items():
            self._original_env[key] = os.environ.get(key)
            os.environ[key] = value

    async def teardown(self):
        """Restore original environment."""
        for key, original in self._original_env.items():
            if original is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = original

    async def get_context(self) -> Dict[str, Any]:
        return {"env_isolated": True, "overrides": list(self.env_overrides.keys())}


class FileSystemIsolation(IsolationStrategy):
    """Isolates file system access."""

    def __init__(self):
        self._temp_dir: Optional[str] = None
        self._original_cwd: Optional[str] = None

    async def setup(self):
        """Create temporary directory."""
        self._temp_dir = tempfile.mkdtemp(prefix="agent_test_")
        self._original_cwd = os.getcwd()
        os.chdir(self._temp_dir)

    async def teardown(self):
        """Clean up temporary directory."""
        if self._original_cwd:
            os.chdir(self._original_cwd)

        if self._temp_dir:
            shutil.rmtree(self._temp_dir, ignore_errors=True)

    async def get_context(self) -> Dict[str, Any]:
        return {"fs_isolated": True, "temp_dir": self._temp_dir}


class NetworkIsolation(IsolationStrategy):
    """Isolates network access with mocking."""

    def __init__(self, mock_responses: Optional[Dict[str, Any]] = None):
        self.mock_responses = mock_responses or {}
        self._original_fetch = None

    async def setup(self):
        """Set up network mocking."""
        # In production, use libraries like responses or aioresponses
        pass

    async def teardown(self):
        """Remove network mocking."""
        pass

    async def get_context(self) -> Dict[str, Any]:
        return {"network_isolated": True, "mocked_urls": list(self.mock_responses.keys())}


class CompositeIsolation(IsolationStrategy):
    """Combines multiple isolation strategies."""

    def __init__(self, strategies: List[IsolationStrategy]):
        self.strategies = strategies

    async def setup(self):
        """Set up all isolation strategies."""
        for strategy in self.strategies:
            await strategy.setup()

    async def teardown(self):
        """Tear down all strategies in reverse order."""
        for strategy in reversed(self.strategies):
            await strategy.teardown()

    async def get_context(self) -> Dict[str, Any]:
        context = {}
        for strategy in self.strategies:
            context.update(await strategy.get_context())
        return context


@asynccontextmanager
async def isolated_test(
    agent: AgentAdapter,
    isolation_strategies: Optional[List[IsolationStrategy]] = None
):
    """Context manager for running isolated tests."""

    if isolation_strategies is None:
        isolation_strategies = [StateIsolation(agent)]

    composite = CompositeIsolation(isolation_strategies)

    try:
        await composite.setup()
        context = await composite.get_context()
        yield context
    finally:
        await composite.teardown()


class TestSandbox:
    """Sandboxed environment for test execution."""

    def __init__(
        self,
        agent: AgentAdapter,
        isolations: Optional[List[IsolationStrategy]] = None
    ):
        self.agent = agent
        self.isolations = isolations or []
        self._active = False

    async def __aenter__(self):
        """Enter the sandbox."""
        for isolation in self.isolations:
            await isolation.setup()
        self._active = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit the sandbox."""
        for isolation in reversed(self.isolations):
            try:
                await isolation.teardown()
            except Exception:
                pass  # Don't fail on cleanup errors
        self._active = False

    async def run_test(
        self,
        task: Dict[str, Any],
        evaluator: Callable
    ) -> TestResult:
        """Run a test within the sandbox."""

        if not self._active:
            raise RuntimeError("Sandbox not active")

        start_time = datetime.utcnow()
        task_id = task.get("id", "unknown")

        try:
            await self.agent.reset()

            output = await self.agent.execute(
                task.get("input_data", {}),
                task.get("timeout_seconds", 300)
            )

            score = evaluator(task.get("input_data", {}), output)

            return TestResult(
                test_id=task_id,
                status=TestStatus.PASSED if score >= 0.7 else TestStatus.FAILED,
                score=score,
                start_time=start_time,
                end_time=datetime.utcnow(),
                output=output
            )

        except Exception as e:
            return TestResult(
                test_id=task_id,
                status=TestStatus.ERROR,
                score=0.0,
                start_time=start_time,
                end_time=datetime.utcnow(),
                error=str(e)
            )
```
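The save-override-restore discipline behind EnvironmentIsolation can be verified with a standalone round-trip (written without the framework classes so it runs on its own; the `AGENT_TEST_MODE` variable name is just for illustration):

```python
import os

# Start from a known-clean slate for the demo variable
os.environ.pop("AGENT_TEST_MODE", None)

overrides = {"AGENT_TEST_MODE": "isolated"}
saved = {}

# setup: remember originals, then apply overrides
for key, value in overrides.items():
    saved[key] = os.environ.get(key)
    os.environ[key] = value

inside = os.environ["AGENT_TEST_MODE"]  # visible during the test

# teardown: restore exactly what was there before
for key, original in saved.items():
    if original is None:
        os.environ.pop(key, None)
    else:
        os.environ[key] = original

still_set = "AGENT_TEST_MODE" in os.environ
print(inside, still_set)  # prints: isolated False
```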

Result Collection

Comprehensive result collection enables deep analysis and debugging. Here's how to build a robust result collection system:

🐍python
1"""
2Result collection and reporting.
3"""
4
5from dataclasses import dataclass, field
6from datetime import datetime
7from pathlib import Path
8from typing import Any, Dict, List, Optional
9import json
10import statistics
11
12
13@dataclass
14class TestSuiteResult:
15    """Complete result of a test suite run."""
16    suite_name: str
17    agent_id: str
18    run_id: str
19    start_time: datetime
20    end_time: datetime
21    results: List[TestResult]
22    config: Dict[str, Any] = field(default_factory=dict)
23    metadata: Dict[str, Any] = field(default_factory=dict)
24
25    @property
26    def duration_seconds(self) -> float:
27        return (self.end_time - self.start_time).total_seconds()
28
29    @property
30    def pass_count(self) -> int:
31        return sum(1 for r in self.results if r.status == TestStatus.PASSED)
32
33    @property
34    def fail_count(self) -> int:
35        return sum(1 for r in self.results if r.status == TestStatus.FAILED)
36
37    @property
38    def error_count(self) -> int:
39        return sum(1 for r in self.results if r.status == TestStatus.ERROR)
40
41    @property
42    def timeout_count(self) -> int:
43        return sum(1 for r in self.results if r.status == TestStatus.TIMEOUT)
44
45    @property
46    def pass_rate(self) -> float:
47        total = len(self.results)
48        return self.pass_count / total if total > 0 else 0.0
49
50    @property
51    def average_score(self) -> float:
52        scores = [r.score for r in self.results]
53        return statistics.mean(scores) if scores else 0.0
54
55    @property
56    def average_duration(self) -> float:
57        durations = [r.duration_seconds for r in self.results]
58        return statistics.mean(durations) if durations else 0.0
59
60    def get_summary(self) -> Dict[str, Any]:
61        """Get summary statistics."""
62        return {
63            "suite_name": self.suite_name,
64            "agent_id": self.agent_id,
65            "run_id": self.run_id,
66            "total_tests": len(self.results),
67            "passed": self.pass_count,
68            "failed": self.fail_count,
69            "errors": self.error_count,
70            "timeouts": self.timeout_count,
71            "pass_rate": round(self.pass_rate, 4),
72            "average_score": round(self.average_score, 4),
73            "average_duration": round(self.average_duration, 2),
74            "total_duration": round(self.duration_seconds, 2)
75        }
76
77    def get_failed_tests(self) -> List[TestResult]:
78        """Get all failed tests."""
79        return [
80            r for r in self.results
81            if r.status in (TestStatus.FAILED, TestStatus.ERROR, TestStatus.TIMEOUT)
82        ]
83
84    def to_dict(self) -> Dict[str, Any]:
85        return {
86            "summary": self.get_summary(),
87            "results": [r.to_dict() for r in self.results],
88            "config": self.config,
89            "metadata": self.metadata
90        }
91
92
93class ResultCollector:
94    """Collects and aggregates test results."""
95
96    def __init__(self, output_dir: Path = None):
97        self.output_dir = output_dir or Path("test_results")
98        self.results: List[TestResult] = []
99        self.start_time: Optional[datetime] = None
100        self._lock = asyncio.Lock()
101
102    async def start_run(self, suite_name: str, agent_id: str) -> str:
103        """Start a new test run."""
104        self.start_time = datetime.utcnow()
105        self.results = []
106
107        run_id = f"{suite_name}_{agent_id}_{self.start_time.strftime('%Y%m%d_%H%M%S')}"
108
109        # Create output directory
110        run_dir = self.output_dir / run_id
111        run_dir.mkdir(parents=True, exist_ok=True)
112
113        return run_id
114
115    async def record_result(self, result: TestResult):
116        """Record a single test result."""
117        async with self._lock:
118            self.results.append(result)
119
120            # Write incremental result
121            if self.output_dir:
122                result_file = self.output_dir / "current" / f"{result.test_id}.json"
123                result_file.parent.mkdir(parents=True, exist_ok=True)
124                with open(result_file, "w") as f:
125                    json.dump(result.to_dict(), f, indent=2)
126
127    async def finish_run(
128        self,
129        run_id: str,
130        suite_name: str,
131        agent_id: str,
132        config: Dict[str, Any] = None,
133        metadata: Dict[str, Any] = None
134    ) -> TestSuiteResult:
135        """Finish the test run and generate final report."""
136
137        suite_result = TestSuiteResult(
138            suite_name=suite_name,
139            agent_id=agent_id,
140            run_id=run_id,
141            start_time=self.start_time or datetime.utcnow(),
142            end_time=datetime.utcnow(),
143            results=self.results.copy(),
144            config=config or {},
145            metadata=metadata or {}
146        )
147
148        # Save full results
149        if self.output_dir:
150            run_dir = self.output_dir / run_id
151            run_dir.mkdir(parents=True, exist_ok=True)
152
153            with open(run_dir / "results.json", "w") as f:
154                json.dump(suite_result.to_dict(), f, indent=2)
155
156            # Save summary
157            with open(run_dir / "summary.json", "w") as f:
158                json.dump(suite_result.get_summary(), f, indent=2)
159
160        return suite_result
161
162
163class ResultReporter:
164    """Generates reports from test results."""
165
166    def __init__(self, suite_result: TestSuiteResult):
167        self.result = suite_result
168
169    def generate_markdown(self) -> str:
170        """Generate markdown report."""
171
172        summary = self.result.get_summary()
173
174        md = f"""# Test Results: {self.result.suite_name}
175
176## Summary
177
178| Metric | Value |
179|--------|-------|
180| Agent | {summary['agent_id']} |
181| Total Tests | {summary['total_tests']} |
182| Passed | {summary['passed']} |
183| Failed | {summary['failed']} |
184| Errors | {summary['errors']} |
185| Pass Rate | {summary['pass_rate']*100:.1f}% |
186| Average Score | {summary['average_score']:.3f} |
187| Total Duration | {summary['total_duration']:.1f}s |
188
189## Failed Tests
190
191"""
192        failed = self.result.get_failed_tests()
193
194        if failed:
195            for test in failed:
196                md += f"### {test.test_id}\n"
197                md += f"- Status: {test.status.value}\n"
198                md += f"- Score: {test.score:.3f}\n"
199                if test.error:
200                    md += f"- Error: {test.error[:200]}\n"
201                md += "\n"
202        else:
203            md += "No failed tests!\n"
204
205        return md
206
207    def generate_junit_xml(self) -> str:
208        """Generate JUnit XML for CI integration."""
209        from xml.sax.saxutils import quoteattr  # escape and quote attribute values
210
211        xml_parts = [
212            '<?xml version="1.0" encoding="UTF-8"?>',
213            f'<testsuite name={quoteattr(self.result.suite_name)} '
214            f'tests="{len(self.result.results)}" '
215            f'failures="{self.result.fail_count}" '
216            f'errors="{self.result.error_count}" '
217            f'time="{self.result.duration_seconds:.3f}">'
218        ]
219
220        for test in self.result.results:
221            xml_parts.append(
222                f'  <testcase name={quoteattr(test.test_id)} '
223                f'time="{test.duration_seconds:.3f}">'
224            )
225
226            if test.status == TestStatus.FAILED:
227                xml_parts.append(
228                    f'    <failure message="Score: {test.score:.3f}"/>'
229                )
230            elif test.status == TestStatus.ERROR:
231                xml_parts.append(
232                    f'    <error message={quoteattr(test.error or "Unknown error")}/>'
233                )
234            elif test.status == TestStatus.TIMEOUT:
235                xml_parts.append(
236                    '    <error message="Test timed out"/>'
237                )
238
239            xml_parts.append('  </testcase>')
240
241        xml_parts.append('</testsuite>')
242
243        return '\n'.join(xml_parts)

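One subtlety in JUnit output: test IDs and error messages may contain characters that are reserved in XML (`<`, `&`, quotes), which would produce malformed reports that CI systems reject. A minimal standalone sketch of attribute escaping with the standard library's `xml.sax.saxutils.quoteattr` (the error message here is invented for illustration):

```python
from xml.sax.saxutils import quoteattr

# quoteattr escapes &, <, and >, then wraps the value in quotes,
# choosing a quote character that does not clash with the content.
msg = 'Expected <ok> but got "error" & timeout'
attr = quoteattr(msg)
element = f'<error message={attr}/>'
print(element)
```

Because `quoteattr` supplies its own surrounding quotes, the attribute is written as `message={attr}` rather than `message="{...}"`.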
Testing Patterns

Common testing patterns help structure your benchmarks effectively:

🐍python
1"""
2Common testing patterns for agent benchmarks.
3"""
4import json
5from pathlib import Path
6from typing import Any, Callable, Dict, List, Optional, Tuple
7
8# Pattern 1: Parameterized Testing
9class ParameterizedTest:
10    """Run the same test with different parameters."""
11
12    def __init__(
13        self,
14        base_task: Dict[str, Any],
15        parameter_sets: List[Dict[str, Any]]
16    ):
17        self.base_task = base_task
18        self.parameter_sets = parameter_sets
19
20    def generate_tasks(self) -> List[Dict[str, Any]]:
21        """Generate tasks for all parameter combinations."""
22        tasks = []
23
24        for i, params in enumerate(self.parameter_sets):
25            task = {**self.base_task}
26            task["id"] = f"{self.base_task.get('id', 'test')}_{i}"
27            task["input_data"] = {
28                **self.base_task.get("input_data", {}),
29                **params
30            }
31            tasks.append(task)
32
33        return tasks
34
35
36# Pattern 2: Golden Test
37class GoldenTest:
38    """Compare output against known-good reference."""
39
40    def __init__(
41        self,
42        task: Dict[str, Any],
43        golden_output: Dict[str, Any],
44        comparison_fn: Callable = None
45    ):
46        self.task = task
47        self.golden_output = golden_output
48        self.comparison_fn = comparison_fn or self._default_compare
49
50    def evaluate(self, output: Dict[str, Any]) -> float:
51        """Compare output to golden reference."""
52        return self.comparison_fn(output, self.golden_output)
53
54    def _default_compare(self, output: Dict, golden: Dict) -> float:
55        """Default comparison function."""
56        matches = 0
57        total = 0
58
59        for key in golden:
60            total += 1
61            if key in output and output[key] == golden[key]:
62                matches += 1
63
64        return matches / total if total > 0 else 0.0
65
66
67# Pattern 3: Regression Test
68class RegressionTest:
69    """Ensure previously working functionality still works."""
70
71    def __init__(self, known_issues: List[Dict[str, Any]]):
72        self.known_issues = known_issues
73
74    def generate_tasks(self) -> List[Dict[str, Any]]:
75        """Generate regression test tasks."""
76        return [
77            {
78                "id": f"regression_{issue['id']}",
79                "name": f"Regression: {issue['title']}",
80                "category": "regression",
81                "difficulty": "medium",
82                "input_data": issue["input"],
83                "expected_output": issue["expected_output"],
84                "metadata": {
85                    "original_issue": issue["id"],
86                    "fixed_date": issue.get("fixed_date")
87                }
88            }
89            for issue in self.known_issues
90        ]
91
92
93# Pattern 4: Fuzzing Test
94class FuzzingTest:
95    """Test with randomly generated inputs."""
96
97    def __init__(
98        self,
99        input_generator: Callable[[], Dict[str, Any]],
100        validator: Callable[[Dict[str, Any]], bool],
101        count: int = 100
102    ):
103        self.input_generator = input_generator
104        self.validator = validator
105        self.count = count
106
107    def generate_tasks(self, seed: int = 42) -> List[Dict[str, Any]]:
108        """Generate fuzz test tasks."""
109        import random
110        random.seed(seed)
111
112        return [
113            {
114                "id": f"fuzz_{i}",
115                "name": f"Fuzz test {i}",
116                "category": "edge_case",
117                "difficulty": "hard",
118                "input_data": self.input_generator(),
119                "evaluation_criteria": {"validator": "fuzz_validator"}
120            }
121            for i in range(self.count)
122        ]
123
124
125# Pattern 5: Property-Based Test
126class PropertyTest:
127    """Test that outputs satisfy certain properties."""
128
129    def __init__(
130        self,
131        properties: List[Callable[[Dict[str, Any], Dict[str, Any]], bool]],
132        property_names: List[str] = None
133    ):
134        self.properties = properties
135        self.property_names = property_names or [
136            f"property_{i}" for i in range(len(properties))
137        ]
138
139    def evaluate(
140        self,
141        input_data: Dict[str, Any],
142        output: Dict[str, Any]
143    ) -> Dict[str, bool]:
144        """Check all properties."""
145        return {
146            name: prop(input_data, output)
147            for name, prop in zip(self.property_names, self.properties)
148        }
149
150    def get_score(
151        self,
152        input_data: Dict[str, Any],
153        output: Dict[str, Any]
154    ) -> float:
155        """Get score based on properties satisfied."""
156        results = self.evaluate(input_data, output)
157        return sum(results.values()) / len(results)
158
159
160# Pattern 6: Snapshot Test
161class SnapshotTest:
162    """Compare output against stored snapshots."""
163
164    def __init__(self, snapshot_dir: Path):
165        self.snapshot_dir = snapshot_dir
166
167    def update_snapshot(self, test_id: str, output: Dict[str, Any]):
168        """Update the snapshot for a test."""
169        snapshot_file = self.snapshot_dir / f"{test_id}.json"
170        snapshot_file.parent.mkdir(parents=True, exist_ok=True)
171
172        with open(snapshot_file, "w") as f:
173            json.dump(output, f, indent=2, sort_keys=True)
174
175    def compare_snapshot(
176        self,
177        test_id: str,
178        output: Dict[str, Any]
179    ) -> Tuple[bool, Optional[Dict[str, Any]]]:
180        """Compare output to snapshot."""
181        snapshot_file = self.snapshot_dir / f"{test_id}.json"
182
183        if not snapshot_file.exists():
184            return False, {"error": "No snapshot exists"}
185
186        with open(snapshot_file) as f:
187            snapshot = json.load(f)
188
189        if output == snapshot:
190            return True, None
191
192        # Calculate diff
193        diff = self._diff(snapshot, output)
194        return False, diff
195
196    def _diff(
197        self,
198        expected: Dict[str, Any],
199        actual: Dict[str, Any]
200    ) -> Dict[str, Any]:
201        """Calculate difference between expected and actual."""
202        diff = {
203            "added": {},
204            "removed": {},
205            "changed": {}
206        }
207
208        all_keys = set(expected.keys()) | set(actual.keys())
209
210        for key in all_keys:
211            if key not in expected:
212                diff["added"][key] = actual[key]
213            elif key not in actual:
214                diff["removed"][key] = expected[key]
215            elif expected[key] != actual[key]:
216                diff["changed"][key] = {
217                    "expected": expected[key],
218                    "actual": actual[key]
219                }
220
221        return diff

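To show how a pattern slots into a benchmark, here is a hypothetical usage sketch of `ParameterizedTest` (the `summarize` task and its parameters are invented; the class body is repeated so the snippet runs standalone):

```python
from typing import Any, Dict, List

# Standalone copy of the ParameterizedTest pattern above.
class ParameterizedTest:
    """Run the same test with different parameters."""

    def __init__(
        self,
        base_task: Dict[str, Any],
        parameter_sets: List[Dict[str, Any]]
    ):
        self.base_task = base_task
        self.parameter_sets = parameter_sets

    def generate_tasks(self) -> List[Dict[str, Any]]:
        tasks = []
        for i, params in enumerate(self.parameter_sets):
            task = {**self.base_task}
            task["id"] = f"{self.base_task.get('id', 'test')}_{i}"
            task["input_data"] = {
                **self.base_task.get("input_data", {}),
                **params
            }
            tasks.append(task)
        return tasks

# Hypothetical base task: summarize a document in a given style.
base = {"id": "summarize", "input_data": {"style": "bullet"}}
pt = ParameterizedTest(base, [{"max_words": 50}, {"max_words": 100}])
tasks = pt.generate_tasks()

print([t["id"] for t in tasks])   # -> ['summarize_0', 'summarize_1']
print(tasks[0]["input_data"])     # -> {'style': 'bullet', 'max_words': 50}
```

Each parameter set overrides or extends the base `input_data`, so one task definition fans out into as many concrete tests as you have parameter combinations.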
Summary

This section covered the essential components of a testing framework for AI agents:

| Component | Purpose | Key Features |
|-----------|---------|--------------|
| Test Runner | Execute benchmarks | Timeout handling, retries, filtering |
| Parallel Execution | Scale testing | Worker pools, load balancing |
| Test Isolation | Ensure independence | State, environment, filesystem isolation |
| Result Collection | Gather data | Incremental recording, reports |
| Testing Patterns | Structure tests | Parameterized, golden, fuzzing |

Key Takeaways: A robust testing framework handles parallel execution, ensures test isolation, collects comprehensive results, and supports common testing patterns. Invest in your testing infrastructure early—it pays dividends as your benchmark suite grows.
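Filesystem isolation in particular is cheap to get right with the standard library. A minimal sketch, assuming each test runs inside a throwaway working directory (the helper name `isolated_workdir` is invented here):

```python
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path

@contextmanager
def isolated_workdir():
    """Run a test in a fresh temporary directory, restoring cwd afterwards."""
    original = os.getcwd()
    with tempfile.TemporaryDirectory() as tmp:
        os.chdir(tmp)
        try:
            yield Path(tmp)
        finally:
            os.chdir(original)  # always restore, even if the test raises

before = os.getcwd()
with isolated_workdir() as workdir:
    # Anything the test writes lands in the temp dir, not the repo.
    Path("scratch.txt").write_text("agent output")
    created = sorted(p.name for p in Path(".").iterdir())
print(created)  # -> ['scratch.txt']
```

The cwd is restored before `TemporaryDirectory` deletes the tree, so cleanup never races against a test still "inside" the directory.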

In the next section, we'll explore A/B testing and experimentation for comparing agent versions and configurations.