Introduction
Testing frameworks provide the infrastructure for executing benchmarks efficiently and reliably. This section covers how to build a robust testing framework that handles parallel execution, test isolation, and comprehensive result collection.
Framework Goals: A good testing framework makes it easy to run benchmarks reproducibly, scale execution across resources, and collect detailed results for analysis.
Agent testing presents unique challenges compared to traditional software testing: tests may take minutes to complete, require external API calls, and produce non-deterministic outputs. Your framework must handle all of these.
Test Runner Architecture
The test runner coordinates benchmark execution, manages resources, and collects results. Here's a comprehensive implementation:
1"""
2Test runner framework for agent benchmarks.
3
4This module provides the core infrastructure for executing
5benchmark tests against AI agents.
6"""
7
8from abc import ABC, abstractmethod
9from dataclasses import dataclass, field
10from datetime import datetime, timedelta
11from enum import Enum
12from typing import Any, Callable, Dict, List, Optional, TypeVar
13import asyncio
14import time
15import traceback
16
17
class TestStatus(Enum):
    """Lifecycle states a benchmark test can end in."""

    PENDING = "pending"    # queued, not yet started
    RUNNING = "running"    # currently executing
    PASSED = "passed"      # finished, score met the pass threshold
    FAILED = "failed"      # finished, score below the pass threshold
    ERROR = "error"        # raised an unexpected exception
    TIMEOUT = "timeout"    # exceeded its time budget
    SKIPPED = "skipped"    # never executed (e.g. run cancelled)
27
28
@dataclass
class TestResult:
    """Outcome of a single benchmark test execution.

    Captures terminal status, numeric score, wall-clock timing, and
    optional agent output / error text plus arbitrary numeric metrics.
    """

    test_id: str
    status: TestStatus
    score: float
    start_time: datetime
    end_time: datetime
    output: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    metrics: Dict[str, float] = field(default_factory=dict)

    @property
    def duration_seconds(self) -> float:
        """Wall-clock duration of the test, in seconds."""
        elapsed = self.end_time - self.start_time
        return elapsed.total_seconds()

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-friendly dictionary."""
        payload: Dict[str, Any] = {
            "test_id": self.test_id,
            "status": self.status.value,
            "score": self.score,
            "duration_seconds": self.duration_seconds,
            "start_time": self.start_time.isoformat(),
            "end_time": self.end_time.isoformat(),
            "output": self.output,
            "error": self.error,
            "metrics": self.metrics,
        }
        return payload
57
58
@dataclass
class TestRunConfig:
    """Knobs controlling how a benchmark run executes.

    Covers parallelism, the default per-test time budget, retries,
    early abort, output capture, and optional task filtering.
    """

    max_parallel: int = 4             # concurrent tests allowed
    timeout_seconds: int = 300        # default per-test budget
    retry_count: int = 0              # extra attempts after a failure
    fail_fast: bool = False           # cancel the run on first error
    capture_output: bool = True       # keep agent output in results
    tags_filter: Optional[List[str]] = None
    categories_filter: Optional[List[str]] = None
    difficulty_filter: Optional[List[str]] = None
70
71
class AgentAdapter(ABC):
    """Interface the runner uses to drive an agent under test.

    Concrete adapters wrap a specific agent (API client, local
    process, ...) behind these three operations.
    """

    @abstractmethod
    async def execute(
        self,
        task: Dict[str, Any],
        timeout: int
    ) -> Dict[str, Any]:
        """Run one task payload and return the agent's output."""

    @abstractmethod
    async def reset(self):
        """Clear per-test agent state before the next test."""

    @abstractmethod
    def get_metrics(self) -> Dict[str, float]:
        """Return numeric execution metrics for the last run."""
93
94
95class TestRunner:
96 """Main test runner for benchmark execution."""
97
98 def __init__(
99 self,
100 agent: AgentAdapter,
101 config: TestRunConfig,
102 evaluators: Dict[str, Callable] = None
103 ):
104 self.agent = agent
105 self.config = config
106 self.evaluators = evaluators or {}
107 self.results: List[TestResult] = []
108 self._running = False
109 self._cancelled = False
110
111 async def run(
112 self,
113 tasks: List[Dict[str, Any]]
114 ) -> List[TestResult]:
115 """Run all benchmark tasks."""
116
117 self._running = True
118 self._cancelled = False
119 self.results = []
120
121 # Filter tasks if configured
122 filtered_tasks = self._filter_tasks(tasks)
123
124 # Create semaphore for parallelism control
125 semaphore = asyncio.Semaphore(self.config.max_parallel)
126
127 # Run tasks
128 async def run_with_semaphore(task):
129 async with semaphore:
130 if self._cancelled:
131 return self._create_skipped_result(task)
132 return await self._run_single_task(task)
133
134 # Execute all tasks
135 coros = [run_with_semaphore(task) for task in filtered_tasks]
136 self.results = await asyncio.gather(*coros)
137
138 self._running = False
139 return self.results
140
141 def cancel(self):
142 """Cancel the running test suite."""
143 self._cancelled = True
144
145 def _filter_tasks(
146 self,
147 tasks: List[Dict[str, Any]]
148 ) -> List[Dict[str, Any]]:
149 """Apply configured filters to tasks."""
150
151 filtered = tasks
152
153 if self.config.tags_filter:
154 tag_set = set(self.config.tags_filter)
155 filtered = [
156 t for t in filtered
157 if set(t.get("tags", [])) & tag_set
158 ]
159
160 if self.config.categories_filter:
161 cat_set = set(self.config.categories_filter)
162 filtered = [
163 t for t in filtered
164 if t.get("category") in cat_set
165 ]
166
167 if self.config.difficulty_filter:
168 diff_set = set(self.config.difficulty_filter)
169 filtered = [
170 t for t in filtered
171 if t.get("difficulty") in diff_set
172 ]
173
174 return filtered
175
176 async def _run_single_task(
177 self,
178 task: Dict[str, Any]
179 ) -> TestResult:
180 """Execute a single test task."""
181
182 task_id = task.get("id", "unknown")
183 start_time = datetime.utcnow()
184 timeout = task.get("timeout_seconds", self.config.timeout_seconds)
185
186 # Reset agent before each test
187 await self.agent.reset()
188
189 attempt = 0
190 last_error = None
191
192 while attempt <= self.config.retry_count:
193 try:
194 # Execute with timeout
195 output = await asyncio.wait_for(
196 self.agent.execute(task.get("input_data", {}), timeout),
197 timeout=timeout
198 )
199
200 # Evaluate result
201 score = self._evaluate(task, output)
202 status = TestStatus.PASSED if score >= 0.7 else TestStatus.FAILED
203
204 # Get metrics from agent
205 metrics = self.agent.get_metrics()
206
207 return TestResult(
208 test_id=task_id,
209 status=status,
210 score=score,
211 start_time=start_time,
212 end_time=datetime.utcnow(),
213 output=output if self.config.capture_output else None,
214 metrics=metrics
215 )
216
217 except asyncio.TimeoutError:
218 last_error = f"Timeout after {timeout}s"
219 if attempt == self.config.retry_count:
220 return TestResult(
221 test_id=task_id,
222 status=TestStatus.TIMEOUT,
223 score=0.0,
224 start_time=start_time,
225 end_time=datetime.utcnow(),
226 error=last_error
227 )
228
229 except Exception as e:
230 last_error = f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()}"
231 if attempt == self.config.retry_count:
232 if self.config.fail_fast:
233 self.cancel()
234
235 return TestResult(
236 test_id=task_id,
237 status=TestStatus.ERROR,
238 score=0.0,
239 start_time=start_time,
240 end_time=datetime.utcnow(),
241 error=last_error
242 )
243
244 attempt += 1
245 await asyncio.sleep(1) # Brief pause before retry
246
247 # Should not reach here
248 return self._create_error_result(task_id, start_time, last_error)
249
250 def _evaluate(
251 self,
252 task: Dict[str, Any],
253 output: Dict[str, Any]
254 ) -> float:
255 """Evaluate task output."""
256
257 category = task.get("category", "default")
258 evaluator = self.evaluators.get(category)
259
260 if evaluator:
261 return evaluator(task.get("input_data", {}), output)
262
263 # Default evaluation
264 expected = task.get("expected_output")
265 if expected:
266 return 1.0 if output == expected else 0.0
267
268 # No evaluation possible
269 return 0.5
270
271 def _create_skipped_result(
272 self,
273 task: Dict[str, Any]
274 ) -> TestResult:
275 """Create a skipped result."""
276 now = datetime.utcnow()
277 return TestResult(
278 test_id=task.get("id", "unknown"),
279 status=TestStatus.SKIPPED,
280 score=0.0,
281 start_time=now,
282 end_time=now
283 )
284
285 def _create_error_result(
286 self,
287 task_id: str,
288 start_time: datetime,
289 error: str
290 ) -> TestResult:
291 """Create an error result."""
292 return TestResult(
293 test_id=task_id,
294 status=TestStatus.ERROR,
295 score=0.0,
296 start_time=start_time,
297 end_time=datetime.utcnow(),
298 error=error
299 )Parallel Execution
Parallel execution dramatically reduces benchmark runtime. Here's how to implement efficient parallelization while managing resources:
1"""
2Parallel execution infrastructure for benchmarks.
3"""
4
5import asyncio
6from dataclasses import dataclass
7from typing import Any, Callable, Dict, List, Optional
8from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
9import multiprocessing
10
11
@dataclass
class WorkerConfig:
    """Settings for the worker pool.

    ``use_processes`` selects a process pool (CPU-bound work) over
    the default thread pool (IO-bound work).
    """

    worker_count: int = 4
    use_processes: bool = False
    max_queue_size: int = 100
    worker_timeout: int = 60
19
20
class WorkerPool:
    """Pool of workers for parallel test execution.

    Wraps a thread or process executor plus one pre-built agent per
    worker, and fans a batch of tasks out across those agents.

    NOTE(review): this listing relies on ``datetime``, ``TestResult``
    and ``TestStatus`` being imported by the surrounding module —
    confirm the imports when extracting it standalone.
    """

    def __init__(self, config: WorkerConfig, agent_factory: Callable):
        self.config = config
        self.agent_factory = agent_factory  # async factory: () -> agent
        self._executor = None
        self._agents: List[Any] = []

    async def start(self):
        """Create the executor and one agent per worker."""
        executor_cls = (
            ProcessPoolExecutor if self.config.use_processes
            else ThreadPoolExecutor
        )
        self._executor = executor_cls(max_workers=self.config.worker_count)

        # Pre-create one agent per worker so batches never share agents.
        self._agents = [
            await self.agent_factory()
            for _ in range(self.config.worker_count)
        ]

    async def stop(self):
        """Shut the executor down, waiting for in-flight work."""
        if self._executor:
            self._executor.shutdown(wait=True)

    async def execute_batch(
        self,
        tasks: List[Dict[str, Any]],
        evaluator: Callable
    ) -> List[TestResult]:
        """Execute a batch of tasks concurrently and return all results."""
        batches = self._distribute_tasks(tasks)

        async def drain(worker_id: int, assigned: List[Dict[str, Any]]):
            # Each worker runs its share sequentially on its own agent.
            agent = self._agents[worker_id]
            outcomes = []
            for task in assigned:
                outcomes.append(await self._run_task(agent, task, evaluator))
            return outcomes

        per_worker = await asyncio.gather(
            *(drain(i, batch) for i, batch in enumerate(batches))
        )

        # Flatten the per-worker result lists into one list.
        return [result for worker_results in per_worker for result in worker_results]

    def _distribute_tasks(
        self,
        tasks: List[Dict[str, Any]]
    ) -> List[List[Dict[str, Any]]]:
        """Round-robin tasks into one batch per worker."""
        batches: List[List[Dict[str, Any]]] = [
            [] for _ in range(self.config.worker_count)
        ]
        for index, task in enumerate(tasks):
            batches[index % self.config.worker_count].append(task)
        return batches

    async def _run_task(
        self,
        agent: Any,
        task: Dict[str, Any],
        evaluator: Callable
    ) -> TestResult:
        """Run one task on one agent, mapping failures to result statuses."""
        task_id = task.get("id", "unknown")
        started = datetime.utcnow()
        budget = task.get("timeout_seconds", 300)

        try:
            await agent.reset()

            output = await asyncio.wait_for(
                agent.execute(task.get("input_data", {}), budget),
                timeout=budget
            )

            score = evaluator(task.get("input_data", {}), output)
            passed = score >= 0.7  # framework-wide pass threshold

            return TestResult(
                test_id=task_id,
                status=TestStatus.PASSED if passed else TestStatus.FAILED,
                score=score,
                start_time=started,
                end_time=datetime.utcnow(),
                output=output,
                metrics=agent.get_metrics()
            )

        except asyncio.TimeoutError:
            return TestResult(
                test_id=task_id,
                status=TestStatus.TIMEOUT,
                score=0.0,
                start_time=started,
                end_time=datetime.utcnow(),
                error=f"Timeout after {budget}s"
            )

        except Exception as e:
            return TestResult(
                test_id=task_id,
                status=TestStatus.ERROR,
                score=0.0,
                start_time=started,
                end_time=datetime.utcnow(),
                error=str(e)
            )
150
151
class DistributedRunner:
    """Worker-side loop for running benchmarks across multiple machines.

    Registers with a coordinator, then repeatedly pulls task batches,
    runs them through a local TestRunner, and pushes results back.
    Transport methods are stubs; wire them to HTTP/gRPC in production.
    """

    def __init__(
        self,
        coordinator_url: str,
        worker_id: str
    ):
        self.coordinator_url = coordinator_url
        self.worker_id = worker_id

    async def register(self):
        """Register this worker with the coordinator (stub)."""
        # In production, use actual HTTP/gRPC calls
        pass

    async def fetch_tasks(self) -> List[Dict[str, Any]]:
        """Fetch assigned tasks from the coordinator (stub)."""
        pass

    async def submit_results(self, results: List[TestResult]):
        """Submit results back to the coordinator (stub)."""
        pass

    async def run(self, agent: AgentAdapter, evaluator: Callable):
        """Poll for work forever, running each batch as it arrives.

        NOTE(review): ``evaluator`` is currently unused — the local
        TestRunner falls back to its own evaluation logic.
        """
        await self.register()

        while True:
            tasks = await self.fetch_tasks()

            # Idle-wait when the coordinator has nothing for us.
            if not tasks:
                await asyncio.sleep(5)
                continue

            runner = TestRunner(
                agent=agent,
                config=TestRunConfig(max_parallel=2)
            )

            await self.submit_results(await runner.run(tasks))
195
196
197class LoadBalancer:
198 """Balances task distribution across workers."""
199
200 def __init__(self, workers: List[str]):
201 self.workers = workers
202 self.worker_loads: Dict[str, int] = {w: 0 for w in workers}
203 self.worker_performance: Dict[str, float] = {w: 1.0 for w in workers}
204
205 def assign_task(self, task: Dict[str, Any]) -> str:
206 """Assign a task to the best available worker."""
207
208 # Calculate weighted load (lower is better)
209 weighted_loads = {
210 w: self.worker_loads[w] / self.worker_performance[w]
211 for w in self.workers
212 }
213
214 # Select worker with lowest weighted load
215 best_worker = min(weighted_loads, key=weighted_loads.get)
216 self.worker_loads[best_worker] += 1
217
218 return best_worker
219
220 def complete_task(self, worker: str, duration: float):
221 """Record task completion."""
222 self.worker_loads[worker] = max(0, self.worker_loads[worker] - 1)
223
224 # Update performance estimate (exponential moving average)
225 alpha = 0.1
226 self.worker_performance[worker] = (
227 alpha * (1.0 / max(duration, 0.1)) +
228 (1 - alpha) * self.worker_performance[worker]
229 )Test Isolation
Test isolation ensures that tests don't affect each other. This is crucial for reliable benchmarking:
1"""
2Test isolation mechanisms.
3"""
4
from abc import ABC, abstractmethod
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
import copy
import os
import tempfile
12
13
class IsolationStrategy(ABC):
    """Contract for isolating one test from the next.

    Implementations set up an isolated environment, describe what
    they isolated via a context dict, and undo everything on teardown.
    """

    @abstractmethod
    async def setup(self):
        """Set up the isolated environment."""

    @abstractmethod
    async def teardown(self):
        """Undo everything :meth:`setup` changed."""

    @abstractmethod
    async def get_context(self) -> Dict[str, Any]:
        """Describe the isolation currently in effect."""
31
32
class StateIsolation(IsolationStrategy):
    """Snapshots agent state before a test and restores it after.

    NOTE(review): relies on ``get_state``/``set_state``, which are not
    part of the abstract AgentAdapter interface shown earlier — confirm
    the concrete adapter provides them.
    """

    def __init__(self, agent: AgentAdapter):
        self.agent = agent
        self._original_state: Optional[Dict[str, Any]] = None

    async def setup(self):
        """Capture the current agent state, then reset to a clean one."""
        self._original_state = await self.agent.get_state()
        await self.agent.reset()

    async def teardown(self):
        """Put the captured state back, if one was captured."""
        if self._original_state:
            await self.agent.set_state(self._original_state)

    async def get_context(self) -> Dict[str, Any]:
        return {"state_isolated": True}
55
56
class EnvironmentIsolation(IsolationStrategy):
    """Applies environment-variable overrides for the duration of a test."""

    def __init__(self, env_overrides: Dict[str, str] = None):
        self.env_overrides = env_overrides or {}
        # Previous values; None marks a variable that did not exist.
        self._original_env: Dict[str, Optional[str]] = {}

    async def setup(self):
        """Remember current values, then apply the overrides."""
        for name, value in self.env_overrides.items():
            self._original_env[name] = os.environ.get(name)
            os.environ[name] = value

    async def teardown(self):
        """Restore every variable to its pre-test value (or remove it)."""
        for name, previous in self._original_env.items():
            if previous is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = previous

    async def get_context(self) -> Dict[str, Any]:
        return {"env_isolated": True, "overrides": list(self.env_overrides.keys())}
80
81
class FileSystemIsolation(IsolationStrategy):
    """Runs the test inside a throwaway temporary working directory."""

    def __init__(self):
        self._temp_dir: Optional[str] = None
        self._original_cwd: Optional[str] = None

    async def setup(self):
        """Create a temp directory and chdir into it."""
        self._temp_dir = tempfile.mkdtemp(prefix="agent_test_")
        self._original_cwd = os.getcwd()
        os.chdir(self._temp_dir)

    async def teardown(self):
        """Return to the original directory and delete the temp one."""
        if self._original_cwd:
            os.chdir(self._original_cwd)

        if self._temp_dir:
            import shutil
            # Best-effort removal: never fail teardown over leftovers.
            shutil.rmtree(self._temp_dir, ignore_errors=True)

    async def get_context(self) -> Dict[str, Any]:
        return {"fs_isolated": True, "temp_dir": self._temp_dir}
106
107
class NetworkIsolation(IsolationStrategy):
    """Placeholder for mocking outbound network calls during a test."""

    def __init__(self, mock_responses: Dict[str, Any] = None):
        self.mock_responses = mock_responses or {}
        self._original_fetch = None

    async def setup(self):
        """Install network mocks (stub).

        In production, patch the HTTP layer with a library such as
        ``responses`` or ``aioresponses``.
        """
        pass

    async def teardown(self):
        """Remove the network mocks (stub)."""
        pass

    async def get_context(self) -> Dict[str, Any]:
        return {"network_isolated": True, "mocked_urls": list(self.mock_responses.keys())}
126
127
class CompositeIsolation(IsolationStrategy):
    """Applies several isolation strategies as one unit.

    Setup runs in list order; teardown runs in reverse order so the
    most recently applied isolation is undone first.
    """

    def __init__(self, strategies: List[IsolationStrategy]):
        self.strategies = strategies

    async def setup(self):
        for strategy in self.strategies:
            await strategy.setup()

    async def teardown(self):
        for strategy in reversed(self.strategies):
            await strategy.teardown()

    async def get_context(self) -> Dict[str, Any]:
        # Later strategies overwrite duplicate keys from earlier ones.
        merged: Dict[str, Any] = {}
        for strategy in self.strategies:
            merged.update(await strategy.get_context())
        return merged
149
150
@asynccontextmanager
async def isolated_test(
    agent: AgentAdapter,
    isolation_strategies: List[IsolationStrategy] = None
):
    """Async context manager wrapping a test in isolation.

    Defaults to state isolation only. Yields the combined isolation
    context dict; teardown always runs, even if the body raises.
    """
    if isolation_strategies is None:
        isolation_strategies = [StateIsolation(agent)]

    composite = CompositeIsolation(isolation_strategies)

    try:
        await composite.setup()
        yield await composite.get_context()
    finally:
        await composite.teardown()
169
170
171class TestSandbox:
172 """Sandboxed environment for test execution."""
173
174 def __init__(
175 self,
176 agent: AgentAdapter,
177 isolations: List[IsolationStrategy] = None
178 ):
179 self.agent = agent
180 self.isolations = isolations or []
181 self._active = False
182
183 async def __aenter__(self):
184 """Enter the sandbox."""
185 for isolation in self.isolations:
186 await isolation.setup()
187 self._active = True
188 return self
189
190 async def __aexit__(self, exc_type, exc_val, exc_tb):
191 """Exit the sandbox."""
192 for isolation in reversed(self.isolations):
193 try:
194 await isolation.teardown()
195 except Exception:
196 pass # Don't fail on cleanup errors
197 self._active = False
198
199 async def run_test(
200 self,
201 task: Dict[str, Any],
202 evaluator: Callable
203 ) -> TestResult:
204 """Run a test within the sandbox."""
205
206 if not self._active:
207 raise RuntimeError("Sandbox not active")
208
209 start_time = datetime.utcnow()
210 task_id = task.get("id", "unknown")
211
212 try:
213 await self.agent.reset()
214
215 output = await self.agent.execute(
216 task.get("input_data", {}),
217 task.get("timeout_seconds", 300)
218 )
219
220 score = evaluator(task.get("input_data", {}), output)
221
222 return TestResult(
223 test_id=task_id,
224 status=TestStatus.PASSED if score >= 0.7 else TestStatus.FAILED,
225 score=score,
226 start_time=start_time,
227 end_time=datetime.utcnow(),
228 output=output
229 )
230
231 except Exception as e:
232 return TestResult(
233 test_id=task_id,
234 status=TestStatus.ERROR,
235 score=0.0,
236 start_time=start_time,
237 end_time=datetime.utcnow(),
238 error=str(e)
239 )Result Collection
Comprehensive result collection enables deep analysis and debugging. Here's how to build a robust result collection system:
1"""
2Result collection and reporting.
3"""
4
5from dataclasses import dataclass, field
6from datetime import datetime
7from pathlib import Path
8from typing import Any, Dict, List, Optional
9import json
10import statistics
11
12
@dataclass
class TestSuiteResult:
    """Aggregated outcome of one full benchmark suite run.

    Holds the raw per-test results plus identifying metadata, and
    derives summary statistics (counts, pass rate, averages) on demand.
    """

    suite_name: str
    agent_id: str
    run_id: str
    start_time: datetime
    end_time: datetime
    results: List[TestResult]
    config: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def _count(self, status: "TestStatus") -> int:
        """Number of results in the given terminal status."""
        return sum(1 for r in self.results if r.status == status)

    @property
    def duration_seconds(self) -> float:
        """Wall-clock duration of the whole run."""
        return (self.end_time - self.start_time).total_seconds()

    @property
    def pass_count(self) -> int:
        return self._count(TestStatus.PASSED)

    @property
    def fail_count(self) -> int:
        return self._count(TestStatus.FAILED)

    @property
    def error_count(self) -> int:
        return self._count(TestStatus.ERROR)

    @property
    def timeout_count(self) -> int:
        return self._count(TestStatus.TIMEOUT)

    @property
    def pass_rate(self) -> float:
        """Fraction of tests that passed (0.0 for an empty run)."""
        return self.pass_count / len(self.results) if self.results else 0.0

    @property
    def average_score(self) -> float:
        """Mean score across all results (0.0 for an empty run)."""
        if not self.results:
            return 0.0
        return statistics.mean(r.score for r in self.results)

    @property
    def average_duration(self) -> float:
        """Mean per-test duration in seconds (0.0 for an empty run)."""
        if not self.results:
            return 0.0
        return statistics.mean(r.duration_seconds for r in self.results)

    def get_summary(self) -> Dict[str, Any]:
        """Summary statistics as a flat, JSON-friendly dict."""
        return {
            "suite_name": self.suite_name,
            "agent_id": self.agent_id,
            "run_id": self.run_id,
            "total_tests": len(self.results),
            "passed": self.pass_count,
            "failed": self.fail_count,
            "errors": self.error_count,
            "timeouts": self.timeout_count,
            "pass_rate": round(self.pass_rate, 4),
            "average_score": round(self.average_score, 4),
            "average_duration": round(self.average_duration, 2),
            "total_duration": round(self.duration_seconds, 2)
        }

    def get_failed_tests(self) -> List[TestResult]:
        """Every non-passing result (failed, errored, or timed out)."""
        unsuccessful = (TestStatus.FAILED, TestStatus.ERROR, TestStatus.TIMEOUT)
        return [r for r in self.results if r.status in unsuccessful]

    def to_dict(self) -> Dict[str, Any]:
        """Full serializable form: summary, per-test detail, config, metadata."""
        return {
            "summary": self.get_summary(),
            "results": [r.to_dict() for r in self.results],
            "config": self.config,
            "metadata": self.metadata
        }
91
92
class ResultCollector:
    """Collects test results incrementally and persists them to disk.

    Results are appended under an asyncio lock so multiple concurrent
    test coroutines can record safely; each result is also written to
    disk immediately so a crashed run still leaves partial data.
    """

    def __init__(self, output_dir: Path = None):
        # BUGFIX: this listing never imported asyncio, which made
        # asyncio.Lock() a NameError — import it locally here.
        import asyncio
        self.output_dir = output_dir or Path("test_results")
        self.results: List[TestResult] = []
        self.start_time: Optional[datetime] = None
        self._lock = asyncio.Lock()

    async def start_run(self, suite_name: str, agent_id: str) -> str:
        """Begin a new run: reset state and create the run directory.

        Returns a run id of the form ``<suite>_<agent>_<timestamp>``.
        """
        self.start_time = datetime.utcnow()
        self.results = []

        run_id = f"{suite_name}_{agent_id}_{self.start_time.strftime('%Y%m%d_%H%M%S')}"

        run_dir = self.output_dir / run_id
        run_dir.mkdir(parents=True, exist_ok=True)

        return run_id

    async def record_result(self, result: TestResult):
        """Record one result and persist it immediately.

        Incremental copies land under ``<output_dir>/current/`` so an
        interrupted run still leaves per-test files behind.
        """
        async with self._lock:
            self.results.append(result)

            if self.output_dir:
                result_file = self.output_dir / "current" / f"{result.test_id}.json"
                result_file.parent.mkdir(parents=True, exist_ok=True)
                with open(result_file, "w") as f:
                    json.dump(result.to_dict(), f, indent=2)

    async def finish_run(
        self,
        run_id: str,
        suite_name: str,
        agent_id: str,
        config: Dict[str, Any] = None,
        metadata: Dict[str, Any] = None
    ) -> TestSuiteResult:
        """Finalize the run and write the final reports.

        Writes ``results.json`` (full detail) and ``summary.json``
        (stats only) into the run directory, then returns the
        assembled :class:`TestSuiteResult`.
        """
        suite_result = TestSuiteResult(
            suite_name=suite_name,
            agent_id=agent_id,
            run_id=run_id,
            start_time=self.start_time or datetime.utcnow(),
            end_time=datetime.utcnow(),
            results=self.results.copy(),
            config=config or {},
            metadata=metadata or {}
        )

        if self.output_dir:
            run_dir = self.output_dir / run_id
            run_dir.mkdir(parents=True, exist_ok=True)

            with open(run_dir / "results.json", "w") as f:
                json.dump(suite_result.to_dict(), f, indent=2)

            with open(run_dir / "summary.json", "w") as f:
                json.dump(suite_result.get_summary(), f, indent=2)

        return suite_result
161
162
163class ResultReporter:
164 """Generates reports from test results."""
165
166 def __init__(self, suite_result: TestSuiteResult):
167 self.result = suite_result
168
169 def generate_markdown(self) -> str:
170 """Generate markdown report."""
171
172 summary = self.result.get_summary()
173
174 md = f"""# Test Results: {self.result.suite_name}
175
176## Summary
177
178| Metric | Value |
179|--------|-------|
180| Agent | {summary['agent_id']} |
181| Total Tests | {summary['total_tests']} |
182| Passed | {summary['passed']} |
183| Failed | {summary['failed']} |
184| Errors | {summary['errors']} |
185| Pass Rate | {summary['pass_rate']*100:.1f}% |
186| Average Score | {summary['average_score']:.3f} |
187| Total Duration | {summary['total_duration']:.1f}s |
188
189## Failed Tests
190
191"""
192 failed = self.result.get_failed_tests()
193
194 if failed:
195 for test in failed:
196 md += f"### {test.test_id}\n"
197 md += f"- Status: {test.status.value}\n"
198 md += f"- Score: {test.score:.3f}\n"
199 if test.error:
200 md += f"- Error: {test.error[:200]}\n"
201 md += "\n"
202 else:
203 md += "No failed tests!\n"
204
205 return md
206
207 def generate_junit_xml(self) -> str:
208 """Generate JUnit XML for CI integration."""
209
210 xml_parts = [
211 '<?xml version="1.0" encoding="UTF-8"?>',
212 f'<testsuite name="{self.result.suite_name}" '
213 f'tests="{len(self.result.results)}" '
214 f'failures="{self.result.fail_count}" '
215 f'errors="{self.result.error_count}" '
216 f'time="{self.result.duration_seconds:.3f}">'
217 ]
218
219 for test in self.result.results:
220 xml_parts.append(
221 f' <testcase name="{test.test_id}" '
222 f'time="{test.duration_seconds:.3f}">'
223 )
224
225 if test.status == TestStatus.FAILED:
226 xml_parts.append(
227 f' <failure message="Score: {test.score:.3f}"/>'
228 )
229 elif test.status == TestStatus.ERROR:
230 xml_parts.append(
231 f' <error message="{test.error or "Unknown error"}"/>'
232 )
233 elif test.status == TestStatus.TIMEOUT:
234 xml_parts.append(
235 ' <error message="Test timed out"/>'
236 )
237
238 xml_parts.append(' </testcase>')
239
240 xml_parts.append('</testsuite>')
241
242 return '\n'.join(xml_parts)Testing Patterns
Common testing patterns help structure your benchmarks effectively:
1"""
2Common testing patterns for agent benchmarks.
3"""
4
5from typing import Any, Callable, Dict, List
6
7
# Pattern 1: Parameterized Testing
class ParameterizedTest:
    """Expands one base task into a variant per parameter set."""

    def __init__(
        self,
        base_task: Dict[str, Any],
        parameter_sets: List[Dict[str, Any]]
    ):
        self.base_task = base_task
        self.parameter_sets = parameter_sets

    def generate_tasks(self) -> List[Dict[str, Any]]:
        """Return one task per parameter set.

        Each variant copies the base task, gets a ``_<index>`` id
        suffix, and merges its parameters over the base input data
        (parameters win on key collisions).
        """
        variants = []
        base_id = self.base_task.get("id", "test")
        base_input = self.base_task.get("input_data", {})

        for index, params in enumerate(self.parameter_sets):
            variant = dict(self.base_task)
            variant["id"] = f"{base_id}_{index}"
            variant["input_data"] = {**base_input, **params}
            variants.append(variant)

        return variants
34
35
# Pattern 2: Golden Test
class GoldenTest:
    """Scores agent output against a known-good reference output."""

    def __init__(
        self,
        task: Dict[str, Any],
        golden_output: Dict[str, Any],
        comparison_fn: Callable = None
    ):
        self.task = task
        self.golden_output = golden_output
        self.comparison_fn = comparison_fn or self._default_compare

    def evaluate(self, output: Dict[str, Any]) -> float:
        """Return the comparison score of ``output`` vs the golden copy."""
        return self.comparison_fn(output, self.golden_output)

    def _default_compare(self, output: Dict, golden: Dict) -> float:
        """Fraction of golden keys whose values match exactly.

        Extra keys in ``output`` are ignored; an empty golden dict
        scores 0.0.
        """
        if not golden:
            return 0.0

        hits = sum(
            1 for key in golden
            if key in output and output[key] == golden[key]
        )
        return hits / len(golden)
65
66
# Pattern 3: Regression Test
class RegressionTest:
    """Turns previously-fixed issues into permanent benchmark tasks."""

    def __init__(self, known_issues: List[Dict[str, Any]]):
        # Each issue needs: id, title, input, expected_output
        # (fixed_date is optional).
        self.known_issues = known_issues

    def generate_tasks(self) -> List[Dict[str, Any]]:
        """Build one medium-difficulty regression task per known issue."""
        tasks = []
        for issue in self.known_issues:
            tasks.append({
                "id": f"regression_{issue['id']}",
                "name": f"Regression: {issue['title']}",
                "category": "regression",
                "difficulty": "medium",
                "input_data": issue["input"],
                "expected_output": issue["expected_output"],
                "metadata": {
                    "original_issue": issue["id"],
                    "fixed_date": issue.get("fixed_date")
                }
            })
        return tasks
91
92
# Pattern 4: Fuzzing Test
class FuzzingTest:
    """Generates hard edge-case tasks from randomly generated inputs."""

    def __init__(
        self,
        input_generator: Callable[[], Dict[str, Any]],
        validator: Callable[[Dict[str, Any]], bool],
        count: int = 100
    ):
        self.input_generator = input_generator
        self.validator = validator  # consumed downstream by the fuzz evaluator
        self.count = count

    def generate_tasks(self, seed: int = 42) -> List[Dict[str, Any]]:
        """Build ``count`` fuzz tasks.

        Seeds the global ``random`` module so a generator drawing from
        it produces a reproducible task set.
        """
        import random
        random.seed(seed)

        tasks = []
        for index in range(self.count):
            tasks.append({
                "id": f"fuzz_{index}",
                "name": f"Fuzz test {index}",
                "category": "edge_case",
                "difficulty": "hard",
                "input_data": self.input_generator(),
                "evaluation_criteria": {"validator": "fuzz_validator"}
            })
        return tasks
123
124
# Pattern 5: Property-Based Test
class PropertyTest:
    """Scores outputs by how many declared properties they satisfy."""

    def __init__(
        self,
        properties: List[Callable[[Dict[str, Any], Dict[str, Any]], bool]],
        property_names: List[str] = None
    ):
        self.properties = properties
        # Auto-generate names when none (or an empty list) are given.
        self.property_names = property_names or [
            f"property_{i}" for i in range(len(properties))
        ]

    def evaluate(
        self,
        input_data: Dict[str, Any],
        output: Dict[str, Any]
    ) -> Dict[str, bool]:
        """Map each property name to whether it holds for this output."""
        verdicts: Dict[str, bool] = {}
        for name, check in zip(self.property_names, self.properties):
            verdicts[name] = check(input_data, output)
        return verdicts

    def get_score(
        self,
        input_data: Dict[str, Any],
        output: Dict[str, Any]
    ) -> float:
        """Fraction of properties satisfied."""
        verdicts = self.evaluate(input_data, output)
        return sum(verdicts.values()) / len(verdicts)
158
159
# Pattern 6: Snapshot Test
class SnapshotTest:
    """Compares outputs against snapshots stored as JSON files.

    BUGFIX: this listing never imported ``Path``, ``json``, ``Tuple``
    or ``Optional``, so the eagerly-evaluated annotations and the
    ``json`` calls failed at runtime. Annotations are now string
    (forward) references and ``json`` is imported locally.
    """

    def __init__(self, snapshot_dir: "Path"):
        self.snapshot_dir = snapshot_dir

    def update_snapshot(self, test_id: str, output: Dict[str, Any]):
        """Write (or overwrite) the stored snapshot for ``test_id``."""
        import json
        snapshot_file = self.snapshot_dir / f"{test_id}.json"
        snapshot_file.parent.mkdir(parents=True, exist_ok=True)

        # sort_keys keeps snapshots stable across runs for clean diffs.
        with open(snapshot_file, "w") as f:
            json.dump(output, f, indent=2, sort_keys=True)

    def compare_snapshot(
        self,
        test_id: str,
        output: Dict[str, Any]
    ) -> "Tuple[bool, Optional[Dict[str, Any]]]":
        """Compare ``output`` to the stored snapshot.

        Returns:
            ``(True, None)`` on an exact match; ``(False, diff)`` on a
            mismatch or when no snapshot exists yet.
        """
        import json
        snapshot_file = self.snapshot_dir / f"{test_id}.json"

        if not snapshot_file.exists():
            return False, {"error": "No snapshot exists"}

        with open(snapshot_file) as f:
            snapshot = json.load(f)

        if output == snapshot:
            return True, None

        return False, self._diff(snapshot, output)

    def _diff(
        self,
        expected: Dict[str, Any],
        actual: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Key-level diff: keys added, removed, or changed vs expected."""
        diff: Dict[str, Any] = {
            "added": {},
            "removed": {},
            "changed": {}
        }

        for key in set(expected) | set(actual):
            if key not in expected:
                diff["added"][key] = actual[key]
            elif key not in actual:
                diff["removed"][key] = expected[key]
            elif expected[key] != actual[key]:
                diff["changed"][key] = {
                    "expected": expected[key],
                    "actual": actual[key]
                }

        return diff

Summary
This section covered the essential components of a testing framework for AI agents:
| Component | Purpose | Key Features |
|---|---|---|
| Test Runner | Execute benchmarks | Timeout handling, retries, filtering |
| Parallel Execution | Scale testing | Worker pools, load balancing |
| Test Isolation | Ensure independence | State, environment, filesystem isolation |
| Result Collection | Gather data | Incremental recording, reports |
| Testing Patterns | Structure tests | Parameterized, golden, fuzzing |
Key Takeaways: A robust testing framework handles parallel execution, ensures test isolation, collects comprehensive results, and supports common testing patterns. Invest in your testing infrastructure early—it pays dividends as your benchmark suite grows.
In the next section, we'll explore A/B testing and experimentation for comparing agent versions and configurations.