Chapter 20

Continuous Evaluation

Evaluation and Benchmarking

Introduction

Continuous evaluation monitors agent performance in production, detecting issues before they impact users. Unlike periodic benchmarks, continuous evaluation provides real-time insights into agent behavior and automatically alerts on regressions.

The Monitoring Imperative: Agents can degrade silently due to model updates, data drift, or upstream changes. Continuous evaluation catches these issues early, often before users notice.

This section covers how to implement production monitoring, set up automated triggers, detect drift, configure alerts, and schedule regular evaluations.


Production Monitoring

Production monitoring captures real-time metrics from live agent interactions. Here's a comprehensive monitoring system:

```python
"""
Production monitoring for AI agents.
"""

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional
from collections import defaultdict


@dataclass
class MetricWindow:
    """Time-windowed metric aggregation."""
    name: str
    window_size_seconds: int
    values: List[float] = field(default_factory=list)
    timestamps: List[datetime] = field(default_factory=list)

    def add(self, value: float, timestamp: Optional[datetime] = None):
        """Add a value to the window."""
        ts = timestamp or datetime.utcnow()
        self.values.append(value)
        self.timestamps.append(ts)
        self._cleanup()

    def _cleanup(self):
        """Remove values outside the window."""
        cutoff = datetime.utcnow() - timedelta(seconds=self.window_size_seconds)

        # Find first index within the window; if every sample has aged out,
        # drop them all rather than keeping stale data.
        start_idx = len(self.timestamps)
        for i, ts in enumerate(self.timestamps):
            if ts >= cutoff:
                start_idx = i
                break

        if start_idx > 0:
            self.values = self.values[start_idx:]
            self.timestamps = self.timestamps[start_idx:]

    @property
    def count(self) -> int:
        return len(self.values)

    @property
    def mean(self) -> float:
        return sum(self.values) / len(self.values) if self.values else 0.0

    @property
    def min(self) -> float:
        return min(self.values) if self.values else 0.0

    @property
    def max(self) -> float:
        return max(self.values) if self.values else 0.0

    @property
    def sum(self) -> float:
        return sum(self.values)

    def percentile(self, p: float) -> float:
        """Calculate the p-th percentile (p in 0-100)."""
        if not self.values:
            return 0.0
        sorted_values = sorted(self.values)
        idx = int(len(sorted_values) * p / 100)
        return sorted_values[min(idx, len(sorted_values) - 1)]


class ProductionMonitor:
    """Monitors agent performance in production."""

    def __init__(self, window_sizes: Optional[List[int]] = None):
        self.window_sizes = window_sizes or [60, 300, 3600]  # 1m, 5m, 1h
        self.metrics: Dict[str, Dict[int, MetricWindow]] = defaultdict(dict)
        self.counters: Dict[str, int] = defaultdict(int)
        self.listeners: List[Callable] = []

    def record(self, metric_name: str, value: float):
        """Record a metric value."""
        timestamp = datetime.utcnow()

        # Add to all window sizes
        for window_size in self.window_sizes:
            if window_size not in self.metrics[metric_name]:
                self.metrics[metric_name][window_size] = MetricWindow(
                    name=metric_name,
                    window_size_seconds=window_size
                )
            self.metrics[metric_name][window_size].add(value, timestamp)

        # Notify listeners
        for listener in self.listeners:
            listener(metric_name, value, timestamp)

    def increment(self, counter_name: str, amount: int = 1):
        """Increment a counter."""
        self.counters[counter_name] += amount

    def get_stats(
        self,
        metric_name: str,
        window_seconds: int = 300
    ) -> Dict[str, float]:
        """Get statistics for a metric."""
        window = self.metrics.get(metric_name, {}).get(window_seconds)

        if not window or window.count == 0:
            return {}

        return {
            "count": window.count,
            "mean": window.mean,
            "min": window.min,
            "max": window.max,
            "p50": window.percentile(50),
            "p90": window.percentile(90),
            "p99": window.percentile(99),
            "sum": window.sum
        }

    def get_all_stats(self, window_seconds: int = 300) -> Dict[str, Dict[str, float]]:
        """Get statistics for all metrics."""
        return {
            name: self.get_stats(name, window_seconds)
            for name in self.metrics
        }

    def add_listener(self, callback: Callable):
        """Add a metric listener."""
        self.listeners.append(callback)


class AgentMetricsCollector:
    """Collects specific agent metrics."""

    def __init__(self, monitor: ProductionMonitor):
        self.monitor = monitor

    def record_request(
        self,
        duration_ms: float,
        success: bool,
        token_count: int,
        cost: float
    ):
        """Record a completed agent request."""
        self.monitor.record("request.duration_ms", duration_ms)
        self.monitor.record("request.tokens", token_count)
        self.monitor.record("request.cost", cost)

        if success:
            self.monitor.increment("request.success")
            self.monitor.record("request.success_rate", 1.0)
        else:
            self.monitor.increment("request.failure")
            self.monitor.record("request.success_rate", 0.0)

        self.monitor.increment("request.total")

    def record_tool_use(
        self,
        tool_name: str,
        duration_ms: float,
        success: bool
    ):
        """Record a tool invocation."""
        self.monitor.record(f"tool.{tool_name}.duration_ms", duration_ms)
        self.monitor.record(f"tool.{tool_name}.success", 1.0 if success else 0.0)
        self.monitor.increment(f"tool.{tool_name}.count")

    def record_quality_score(self, score: float, evaluator: str):
        """Record a quality evaluation score."""
        self.monitor.record(f"quality.{evaluator}", score)

    def record_user_feedback(self, rating: float, feedback_type: str):
        """Record user feedback."""
        self.monitor.record(f"feedback.{feedback_type}", rating)


class HealthChecker:
    """Checks agent health based on metrics."""

    def __init__(
        self,
        monitor: ProductionMonitor,
        thresholds: Optional[Dict[str, Dict[str, float]]] = None
    ):
        self.monitor = monitor
        self.thresholds = thresholds or {
            "request.success_rate": {"min": 0.95},
            "request.duration_ms": {"max": 5000, "p99_max": 10000},
            "quality.accuracy": {"min": 0.8}
        }

    def check(self) -> Dict[str, Any]:
        """Perform health check."""
        results = {
            "status": "healthy",
            "checks": {},
            "timestamp": datetime.utcnow().isoformat()
        }

        issues = []

        for metric_name, thresholds in self.thresholds.items():
            stats = self.monitor.get_stats(metric_name)

            if not stats:
                continue

            check_result = {"status": "ok", "stats": stats, "issues": []}

            # Check min threshold
            if "min" in thresholds and stats.get("mean", 0) < thresholds["min"]:
                check_result["status"] = "failing"
                check_result["issues"].append(
                    f"mean {stats['mean']:.3f} < min {thresholds['min']}"
                )
                issues.append(metric_name)

            # Check max threshold
            if "max" in thresholds and stats.get("mean", 0) > thresholds["max"]:
                check_result["status"] = "failing"
                check_result["issues"].append(
                    f"mean {stats['mean']:.3f} > max {thresholds['max']}"
                )
                issues.append(metric_name)

            # Check p99 threshold
            if "p99_max" in thresholds and stats.get("p99", 0) > thresholds["p99_max"]:
                check_result["status"] = "warning"
                check_result["issues"].append(
                    f"p99 {stats['p99']:.3f} > max {thresholds['p99_max']}"
                )

            results["checks"][metric_name] = check_result

        # Set overall status
        if issues:
            results["status"] = "unhealthy"
            results["failing_metrics"] = issues

        return results
```

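To make the windowing concrete, here is the core `MetricWindow` idea as a standalone sketch (the helper name and sample values are illustrative, not part of the system above): keep (value, timestamp) pairs, drop anything older than the window, then take percentiles over what remains.

```python
from datetime import datetime, timedelta


def windowed_percentile(samples, window_seconds, p, now=None):
    """Percentile of recent values. samples: list of (value, datetime) pairs."""
    now = now or datetime.utcnow()
    cutoff = now - timedelta(seconds=window_seconds)
    # Keep only samples inside the window, then sort for percentile lookup
    values = sorted(v for v, ts in samples if ts >= cutoff)
    if not values:
        return 0.0
    idx = min(int(len(values) * p / 100), len(values) - 1)
    return values[idx]


now = datetime.utcnow()
samples = [
    (100.0, now - timedelta(seconds=400)),  # outside a 300s window
    (200.0, now - timedelta(seconds=100)),
    (300.0, now - timedelta(seconds=50)),
    (400.0, now - timedelta(seconds=10)),
]
print(windowed_percentile(samples, 300, 50, now=now))  # -> 300.0
```

The expired 100.0 sample never influences the result, which is what keeps windowed stats honest after traffic patterns change.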
Automated Evaluation Triggers

Automated triggers initiate evaluations based on events or conditions. This enables proactive quality assurance:

```python
"""
Automated evaluation triggers.
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
from enum import Enum
import asyncio
import statistics

# ProductionMonitor comes from the monitoring listing above; the module
# name here is illustrative -- adjust it to your project layout.
from production_monitoring import ProductionMonitor


class TriggerType(Enum):
    """Types of evaluation triggers."""
    DEPLOYMENT = "deployment"
    THRESHOLD = "threshold"
    SCHEDULE = "schedule"
    MANUAL = "manual"
    DRIFT = "drift"
    ANOMALY = "anomaly"


@dataclass
class TriggerEvent:
    """Event that triggered an evaluation."""
    trigger_type: TriggerType
    trigger_id: str
    timestamp: datetime
    metadata: Dict[str, Any]
    priority: int = 0


class EvaluationTrigger(ABC):
    """Abstract base for evaluation triggers."""

    def __init__(self, trigger_id: str, priority: int = 0):
        self.trigger_id = trigger_id
        self.priority = priority
        self.enabled = True
        self.last_triggered: Optional[datetime] = None
        self.cooldown_seconds: int = 300  # 5 minutes

    @abstractmethod
    async def check(self) -> Optional[TriggerEvent]:
        """Check if trigger condition is met."""
        pass

    def can_trigger(self) -> bool:
        """Check if trigger can fire (cooldown check)."""
        if not self.enabled:
            return False

        if self.last_triggered:
            elapsed = (datetime.utcnow() - self.last_triggered).total_seconds()
            if elapsed < self.cooldown_seconds:
                return False

        return True

    def mark_triggered(self):
        """Mark trigger as having fired."""
        self.last_triggered = datetime.utcnow()


class DeploymentTrigger(EvaluationTrigger):
    """Triggers evaluation on new deployments."""

    def __init__(
        self,
        trigger_id: str,
        deployment_detector: Callable[[], Optional[str]]
    ):
        super().__init__(trigger_id, priority=10)
        self.deployment_detector = deployment_detector
        self.last_deployment: Optional[str] = None

    async def check(self) -> Optional[TriggerEvent]:
        if not self.can_trigger():
            return None

        current_deployment = self.deployment_detector()

        if current_deployment and current_deployment != self.last_deployment:
            self.last_deployment = current_deployment
            self.mark_triggered()

            return TriggerEvent(
                trigger_type=TriggerType.DEPLOYMENT,
                trigger_id=self.trigger_id,
                timestamp=datetime.utcnow(),
                metadata={"deployment_id": current_deployment},
                priority=self.priority
            )

        return None


class ThresholdTrigger(EvaluationTrigger):
    """Triggers evaluation when metrics cross thresholds."""

    def __init__(
        self,
        trigger_id: str,
        monitor: ProductionMonitor,
        metric_name: str,
        threshold: float,
        comparison: str = "lt",  # "lt", "gt", "le", "ge"
        window_seconds: int = 300
    ):
        super().__init__(trigger_id, priority=5)
        self.monitor = monitor
        self.metric_name = metric_name
        self.threshold = threshold
        self.comparison = comparison
        self.window_seconds = window_seconds

    async def check(self) -> Optional[TriggerEvent]:
        if not self.can_trigger():
            return None

        stats = self.monitor.get_stats(self.metric_name, self.window_seconds)

        if not stats:
            return None

        value = stats.get("mean", 0)

        triggered = False
        if self.comparison == "lt" and value < self.threshold:
            triggered = True
        elif self.comparison == "gt" and value > self.threshold:
            triggered = True
        elif self.comparison == "le" and value <= self.threshold:
            triggered = True
        elif self.comparison == "ge" and value >= self.threshold:
            triggered = True

        if triggered:
            self.mark_triggered()
            return TriggerEvent(
                trigger_type=TriggerType.THRESHOLD,
                trigger_id=self.trigger_id,
                timestamp=datetime.utcnow(),
                metadata={
                    "metric": self.metric_name,
                    "value": value,
                    "threshold": self.threshold,
                    "comparison": self.comparison
                },
                priority=self.priority
            )

        return None


class AnomalyTrigger(EvaluationTrigger):
    """Triggers evaluation on metric anomalies."""

    def __init__(
        self,
        trigger_id: str,
        monitor: ProductionMonitor,
        metric_name: str,
        std_dev_threshold: float = 3.0,
        baseline_window_seconds: int = 3600
    ):
        super().__init__(trigger_id, priority=8)
        self.monitor = monitor
        self.metric_name = metric_name
        self.std_dev_threshold = std_dev_threshold
        self.baseline_window = baseline_window_seconds
        self.baseline_mean: Optional[float] = None
        self.baseline_std: Optional[float] = None

    def update_baseline(self):
        """Update baseline statistics."""
        window = self.monitor.metrics.get(
            self.metric_name, {}
        ).get(self.baseline_window)

        if window and window.count > 10:
            self.baseline_mean = window.mean
            if len(window.values) > 1:
                self.baseline_std = statistics.stdev(window.values)

    async def check(self) -> Optional[TriggerEvent]:
        if not self.can_trigger():
            return None

        # Get recent value
        stats = self.monitor.get_stats(self.metric_name, 60)  # Last minute

        if not stats:
            return None

        # Update baseline periodically
        self.update_baseline()

        if self.baseline_mean is None or self.baseline_std is None:
            return None

        if self.baseline_std == 0:
            return None

        current = stats.get("mean", 0)
        z_score = abs(current - self.baseline_mean) / self.baseline_std

        if z_score > self.std_dev_threshold:
            self.mark_triggered()
            return TriggerEvent(
                trigger_type=TriggerType.ANOMALY,
                trigger_id=self.trigger_id,
                timestamp=datetime.utcnow(),
                metadata={
                    "metric": self.metric_name,
                    "current_value": current,
                    "baseline_mean": self.baseline_mean,
                    "z_score": z_score
                },
                priority=self.priority
            )

        return None


class TriggerManager:
    """Manages all evaluation triggers."""

    def __init__(self):
        self.triggers: List[EvaluationTrigger] = []
        self.listeners: List[Callable[[TriggerEvent], None]] = []
        self._running = False

    def register(self, trigger: EvaluationTrigger):
        """Register a trigger."""
        self.triggers.append(trigger)

    def add_listener(self, callback: Callable[[TriggerEvent], None]):
        """Add event listener."""
        self.listeners.append(callback)

    async def start(self, check_interval: float = 10.0):
        """Start trigger monitoring loop."""
        self._running = True

        while self._running:
            events = await self._check_all()

            # Sort by priority and dispatch
            for event in sorted(events, key=lambda e: -e.priority):
                for listener in self.listeners:
                    try:
                        listener(event)
                    except Exception:
                        pass

            await asyncio.sleep(check_interval)

    def stop(self):
        """Stop trigger monitoring."""
        self._running = False

    async def _check_all(self) -> List[TriggerEvent]:
        """Check all triggers."""
        events = []

        for trigger in self.triggers:
            try:
                event = await trigger.check()
                if event:
                    events.append(event)
            except Exception:
                pass

        return events
```

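The z-score test at the heart of `AnomalyTrigger` can be sketched standalone (the function name and sample data are illustrative): compare the recent mean against a baseline mean and standard deviation, and flag anything more than `threshold` standard deviations away.

```python
import statistics


def is_anomalous(baseline_values, current_mean, threshold=3.0):
    """Return (flagged, z_score) for current_mean vs. the baseline samples."""
    baseline_mean = statistics.mean(baseline_values)
    baseline_std = statistics.stdev(baseline_values)
    if baseline_std == 0:
        # Constant baseline: no meaningful z-score, so never flag
        return False, 0.0
    z = abs(current_mean - baseline_mean) / baseline_std
    return z > threshold, z


baseline = [100, 102, 98, 101, 99, 100, 103, 97, 100, 101]
print(is_anomalous(baseline, 101))  # within normal variation
print(is_anomalous(baseline, 150))  # far outside the baseline
```

Note the guard for a zero standard deviation: a perfectly flat baseline would otherwise divide by zero, which is why `AnomalyTrigger.check` bails out in that case too.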
Drift Detection

Drift detection identifies when agent behavior changes over time, often indicating a problem that requires investigation:

```python
"""
Drift detection for agent evaluation.
"""

from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import statistics

# ProductionMonitor comes from the monitoring listing above; the module
# name here is illustrative -- adjust it to your project layout.
from production_monitoring import ProductionMonitor


@dataclass
class DriftResult:
    """Result of drift detection."""
    metric_name: str
    drift_detected: bool
    drift_score: float
    baseline_mean: float
    current_mean: float
    relative_change: float
    p_value: Optional[float] = None
    details: Dict[str, Any] = field(default_factory=dict)


class DriftDetector:
    """Detects drift in agent metrics."""

    def __init__(
        self,
        monitor: ProductionMonitor,
        baseline_window_hours: int = 24,
        detection_window_hours: int = 1,
        sensitivity: float = 0.1  # 10% change threshold
    ):
        self.monitor = monitor
        self.baseline_window = baseline_window_hours * 3600
        self.detection_window = detection_window_hours * 3600
        self.sensitivity = sensitivity
        self.baselines: Dict[str, Dict[str, float]] = {}

    def update_baselines(self):
        """Update baseline statistics for all metrics."""
        for metric_name in self.monitor.metrics:
            window = self.monitor.metrics[metric_name].get(self.baseline_window)

            if window and window.count > 100:
                self.baselines[metric_name] = {
                    "mean": window.mean,
                    "std": statistics.stdev(window.values) if len(window.values) > 1 else 0,
                    "min": window.min,
                    "max": window.max,
                    "count": window.count,
                    "updated_at": datetime.utcnow().isoformat()
                }

    def detect(self, metric_name: str) -> DriftResult:
        """Detect drift for a specific metric."""
        baseline = self.baselines.get(metric_name)
        current_stats = self.monitor.get_stats(metric_name, self.detection_window)

        if not baseline or not current_stats:
            return DriftResult(
                metric_name=metric_name,
                drift_detected=False,
                drift_score=0.0,
                baseline_mean=0,
                current_mean=0,
                relative_change=0,
                details={"error": "Insufficient data"}
            )

        baseline_mean = baseline["mean"]
        current_mean = current_stats["mean"]

        # Calculate relative change
        if baseline_mean != 0:
            relative_change = (current_mean - baseline_mean) / abs(baseline_mean)
        else:
            relative_change = current_mean

        # Calculate drift score (z-score like)
        baseline_std = baseline.get("std", 0)
        if baseline_std > 0:
            drift_score = abs(current_mean - baseline_mean) / baseline_std
        else:
            drift_score = abs(relative_change)

        # Detect drift
        drift_detected = (
            abs(relative_change) > self.sensitivity or
            drift_score > 3.0
        )

        return DriftResult(
            metric_name=metric_name,
            drift_detected=drift_detected,
            drift_score=drift_score,
            baseline_mean=baseline_mean,
            current_mean=current_mean,
            relative_change=relative_change,
            details={
                "baseline_std": baseline_std,
                "sensitivity": self.sensitivity,
                "baseline_count": baseline["count"]
            }
        )

    def detect_all(self) -> Dict[str, DriftResult]:
        """Detect drift for all monitored metrics."""
        self.update_baselines()

        return {
            metric_name: self.detect(metric_name)
            for metric_name in self.baselines
        }

    def get_drifting_metrics(self) -> List[DriftResult]:
        """Get all metrics showing drift."""
        all_results = self.detect_all()
        return [r for r in all_results.values() if r.drift_detected]


class DataDriftDetector:
    """Detects drift in input data distributions."""

    def __init__(self, reference_window: int = 1000):
        self.reference_window = reference_window
        self.reference_data: Dict[str, List[Any]] = {}
        self.current_data: Dict[str, List[Any]] = {}

    def add_reference(self, feature_name: str, value: Any):
        """Add to reference distribution."""
        if feature_name not in self.reference_data:
            self.reference_data[feature_name] = []

        self.reference_data[feature_name].append(value)

        # Keep window size
        if len(self.reference_data[feature_name]) > self.reference_window:
            self.reference_data[feature_name] = (
                self.reference_data[feature_name][-self.reference_window:]
            )

    def add_current(self, feature_name: str, value: Any):
        """Add to current distribution."""
        if feature_name not in self.current_data:
            self.current_data[feature_name] = []

        self.current_data[feature_name].append(value)

        # Keep smaller window for current
        window = self.reference_window // 10
        if len(self.current_data[feature_name]) > window:
            self.current_data[feature_name] = (
                self.current_data[feature_name][-window:]
            )

    def detect_drift(
        self,
        feature_name: str
    ) -> Tuple[bool, float]:
        """Detect drift for a feature."""
        ref = self.reference_data.get(feature_name, [])
        curr = self.current_data.get(feature_name, [])

        if len(ref) < 100 or len(curr) < 10:
            return False, 0.0

        # For numeric data, compare means in standard-deviation units
        if isinstance(ref[0], (int, float)):
            return self._numeric_drift(ref, curr)

        # For categorical data, compare frequency distributions
        return self._categorical_drift(ref, curr)

    def _numeric_drift(
        self,
        ref: List[float],
        curr: List[float]
    ) -> Tuple[bool, float]:
        """Detect drift in numeric feature."""
        ref_mean = statistics.mean(ref)
        curr_mean = statistics.mean(curr)
        ref_std = statistics.stdev(ref) if len(ref) > 1 else 1

        if ref_std == 0:
            ref_std = 1

        z_score = abs(curr_mean - ref_mean) / ref_std

        return z_score > 3.0, z_score

    def _categorical_drift(
        self,
        ref: List[Any],
        curr: List[Any]
    ) -> Tuple[bool, float]:
        """Detect drift in categorical feature."""
        ref_counts = Counter(ref)
        curr_counts = Counter(curr)

        # Normalize to frequencies
        ref_total = sum(ref_counts.values())
        curr_total = sum(curr_counts.values())

        all_categories = set(ref_counts.keys()) | set(curr_counts.keys())

        # L1 distance between the two frequency distributions; categories
        # present in only one distribution still contribute to the score.
        divergence = 0.0
        for cat in all_categories:
            ref_freq = ref_counts.get(cat, 0) / ref_total
            curr_freq = curr_counts.get(cat, 0) / curr_total
            divergence += abs(ref_freq - curr_freq)

        return divergence > 0.3, divergence
```

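`DriftDetector.detect` boils down to a two-part decision rule, shown here as a standalone sketch (function name and sample values are illustrative): flag drift when the relative change from baseline exceeds the sensitivity, or when the shift is more than three baseline standard deviations.

```python
def drift_decision(baseline_mean, baseline_std, current_mean, sensitivity=0.1):
    """Return (detected, relative_change, drift_score) for one metric."""
    # Relative change, guarding against a zero baseline
    if baseline_mean != 0:
        relative_change = (current_mean - baseline_mean) / abs(baseline_mean)
    else:
        relative_change = current_mean

    # Z-score-like drift score, falling back to relative change if std is 0
    if baseline_std > 0:
        drift_score = abs(current_mean - baseline_mean) / baseline_std
    else:
        drift_score = abs(relative_change)

    detected = abs(relative_change) > sensitivity or drift_score > 3.0
    return detected, relative_change, drift_score


# Success rate slid from 0.96 to 0.84: a 12.5% relative drop, and six
# baseline standard deviations away -- both halves of the rule fire.
print(drift_decision(0.96, 0.02, 0.84))
```

The two halves catch different failure modes: the relative-change test notices large shifts in noisy metrics, while the standard-deviation test notices small shifts in very stable ones.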
Regression Alerts

Regression alerts notify teams when agent performance degrades. Here's how to implement intelligent alerting:

🐍python
1"""
2Regression alerting system.
3"""
4
5from dataclasses import dataclass, field
6from datetime import datetime, timedelta
7from enum import Enum
8from typing import Any, Callable, Dict, List, Optional
9import asyncio
10
11
12class AlertSeverity(Enum):
13    """Alert severity levels."""
14    INFO = "info"
15    WARNING = "warning"
16    ERROR = "error"
17    CRITICAL = "critical"
18
19
20@dataclass
21class RegressionAlert:
22    """A regression alert."""
23    id: str
24    metric_name: str
25    severity: AlertSeverity
26    title: str
27    message: str
28    timestamp: datetime
29    current_value: float
30    baseline_value: float
31    threshold: float
32    metadata: Dict[str, Any] = field(default_factory=dict)
33    acknowledged: bool = False
34    resolved: bool = False
35
36
37class AlertRule:
38    """Defines when to trigger an alert."""
39
40    def __init__(
41        self,
42        name: str,
43        metric_name: str,
44        condition: str,  # "lt", "gt", "change_pct"
45        threshold: float,
46        severity: AlertSeverity = AlertSeverity.WARNING,
47        consecutive_violations: int = 1,
48        cooldown_minutes: int = 30
49    ):
50        self.name = name
51        self.metric_name = metric_name
52        self.condition = condition
53        self.threshold = threshold
54        self.severity = severity
55        self.consecutive_violations = consecutive_violations
56        self.cooldown_minutes = cooldown_minutes
57
58        self.violation_count = 0
59        self.last_alert: Optional[datetime] = None
60
61    def evaluate(
62        self,
63        current: float,
64        baseline: float
65    ) -> Optional[RegressionAlert]:
66        """Evaluate if alert should fire."""
67
68        violated = False
69
70        if self.condition == "lt" and current < self.threshold:
71            violated = True
72        elif self.condition == "gt" and current > self.threshold:
73            violated = True
74        elif self.condition == "change_pct":
75            if baseline != 0:
76                change = abs(current - baseline) / abs(baseline) * 100
77                violated = change > self.threshold
78
79        if violated:
80            self.violation_count += 1
81        else:
82            self.violation_count = 0
83
84        # Check consecutive violations
85        if self.violation_count < self.consecutive_violations:
86            return None
87
88        # Check cooldown
89        if self.last_alert:
90            elapsed = (datetime.utcnow() - self.last_alert).total_seconds()
91            if elapsed < self.cooldown_minutes * 60:
92                return None
93
94        # Fire alert
95        self.last_alert = datetime.utcnow()
96
97        return RegressionAlert(
98            id=f"{self.name}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
99            metric_name=self.metric_name,
100            severity=self.severity,
101            title=f"Regression: {self.name}",
102            message=self._format_message(current, baseline),
103            timestamp=datetime.utcnow(),
104            current_value=current,
105            baseline_value=baseline,
106            threshold=self.threshold
107        )
108
109    def _format_message(self, current: float, baseline: float) -> str:
110        """Format alert message."""
111        if self.condition == "lt":
112            return f"{self.metric_name} dropped to {current:.3f} (threshold: {self.threshold})"
113        elif self.condition == "gt":
114            return f"{self.metric_name} rose to {current:.3f} (threshold: {self.threshold})"
115        else:
116            change = (current - baseline) / baseline * 100 if baseline else 0
117            return f"{self.metric_name} changed by {change:.1f}% (threshold: {self.threshold}%)"
118
119
120class AlertManager:
121    """Manages regression alerts."""
122
123    def __init__(self, monitor: ProductionMonitor):
124        self.monitor = monitor
125        self.rules: List[AlertRule] = []
126        self.active_alerts: Dict[str, RegressionAlert] = {}
127        self.alert_history: List[RegressionAlert] = []
128        self.notifiers: List[Callable[[RegressionAlert], None]] = []
129
130    def add_rule(self, rule: AlertRule):
131        """Add an alert rule."""
132        self.rules.append(rule)
133
134    def add_notifier(self, notifier: Callable[[RegressionAlert], None]):
135        """Add alert notifier."""
136        self.notifiers.append(notifier)
137
138    async def check(self):
139        """Check all rules and fire alerts."""
140
141        for rule in self.rules:
142            stats = self.monitor.get_stats(rule.metric_name)
143
144            if not stats:
145                continue
146
147            current = stats.get("mean", 0)
148
149            # Get baseline from 1-hour window
150            baseline_stats = self.monitor.get_stats(rule.metric_name, 3600)
151            baseline = baseline_stats.get("mean", current) if baseline_stats else current
152
153            alert = rule.evaluate(current, baseline)
154
155            if alert:
156                self._fire_alert(alert)
157
158    def _fire_alert(self, alert: RegressionAlert):
159        """Fire an alert."""
160
161        # Check for duplicate
162        existing = self.active_alerts.get(alert.metric_name)
163        if existing and not existing.resolved:
164            return
165
166        self.active_alerts[alert.metric_name] = alert
167        self.alert_history.append(alert)
168
169        # Notify
170        for notifier in self.notifiers:
171            try:
172                notifier(alert)
173            except Exception:
174                pass  # a failing notifier must not block the others
175
176    def acknowledge(self, alert_id: str, user: str):
177        """Acknowledge an alert."""
178        for alert in self.active_alerts.values():
179            if alert.id == alert_id:
180                alert.acknowledged = True
181                alert.metadata["acknowledged_by"] = user
182                alert.metadata["acknowledged_at"] = datetime.utcnow().isoformat()
183                break
184
185    def resolve(self, alert_id: str, user: str, resolution: str):
186        """Resolve an alert."""
187        for metric_name, alert in list(self.active_alerts.items()):
188            if alert.id == alert_id:
189                alert.resolved = True
190                alert.metadata["resolved_by"] = user
191                alert.metadata["resolved_at"] = datetime.utcnow().isoformat()
192                alert.metadata["resolution"] = resolution
193                del self.active_alerts[metric_name]
194                break
195
196    def get_active_alerts(
197        self,
198        severity: Optional[AlertSeverity] = None
199    ) -> List[RegressionAlert]:
200        """Get active alerts."""
201        alerts = list(self.active_alerts.values())
202
203        if severity:
204            alerts = [a for a in alerts if a.severity == severity]
205
206        return sorted(alerts, key=lambda a: list(AlertSeverity).index(a.severity), reverse=True)
207
208
209# Common alert notifiers
210
211class SlackNotifier:
212    """Sends alerts to Slack."""
213
214    def __init__(self, webhook_url: str, channel: str):
215        self.webhook_url = webhook_url
216        self.channel = channel
217
218    def __call__(self, alert: RegressionAlert):
219        """Send Slack notification."""
220        import requests
221
222        color = {
223            AlertSeverity.INFO: "#36a64f",
224            AlertSeverity.WARNING: "#ff9800",
225            AlertSeverity.ERROR: "#f44336",
226            AlertSeverity.CRITICAL: "#9c27b0"
227        }.get(alert.severity, "#757575")
228
229        payload = {
230            "channel": self.channel,
231            "attachments": [{
232                "color": color,
233                "title": alert.title,
234                "text": alert.message,
235                "fields": [
236                    {"title": "Metric", "value": alert.metric_name, "short": True},
237                    {"title": "Severity", "value": alert.severity.value, "short": True},
238                    {"title": "Current", "value": f"{alert.current_value:.3f}", "short": True},
239                    {"title": "Baseline", "value": f"{alert.baseline_value:.3f}", "short": True}
240                ],
241                "ts": int(alert.timestamp.timestamp())
242            }]
243        }
244
245        requests.post(self.webhook_url, json=payload, timeout=10)
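For local development and tests, it helps to have a simpler notifier that follows the same callable protocol `AlertManager` expects. A minimal sketch — the `AlertSeverity` and `RegressionAlert` definitions here are stand-ins mirroring the classes used above, and `ConsoleNotifier` is a hypothetical helper:

```python
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum


class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"


@dataclass
class RegressionAlert:
    id: str
    metric_name: str
    severity: AlertSeverity
    title: str
    message: str
    current_value: float
    baseline_value: float
    timestamp: datetime = field(default_factory=datetime.utcnow)


class ConsoleNotifier:
    """Prints alerts to stdout; same callable protocol as SlackNotifier."""

    def __call__(self, alert: RegressionAlert) -> str:
        line = (
            f"[{alert.severity.value.upper()}] {alert.title}: "
            f"{alert.metric_name} now {alert.current_value:.3f} "
            f"(baseline {alert.baseline_value:.3f})"
        )
        print(line)
        return line


alert = RegressionAlert(
    id="a1",
    metric_name="latency_p95",
    severity=AlertSeverity.WARNING,
    title="Latency regression",
    message="p95 latency up 33%",
    current_value=2.4,
    baseline_value=1.8,
)
ConsoleNotifier()(alert)
# prints: [WARNING] Latency regression: latency_p95 now 2.400 (baseline 1.800)
```

Because `AlertManager` treats notifiers as plain callables, an instance like this can be registered with `add_notifier` alongside a `SlackNotifier`.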

Evaluation Scheduling

Scheduled evaluations run benchmarks regularly to catch regressions that might not trigger real-time alerts:

🐍python
1"""
2Evaluation scheduling system.
3"""
4
5from dataclasses import dataclass, field
6from datetime import datetime, timedelta
7from enum import Enum
8from typing import Any, Callable, Dict, List, Optional
9import asyncio
10
11
12class ScheduleFrequency(Enum):
13    """Evaluation schedule frequencies."""
14    HOURLY = "hourly"
15    DAILY = "daily"
16    WEEKLY = "weekly"
17    ON_DEMAND = "on_demand"
18
19
20@dataclass
21class ScheduledEvaluation:
22    """A scheduled evaluation job."""
23    id: str
24    name: str
25    benchmark_id: str
26    frequency: ScheduleFrequency
27    enabled: bool = True
28    last_run: Optional[datetime] = None
29    next_run: Optional[datetime] = None
30    config: Dict[str, Any] = field(default_factory=dict)
31
32    def calculate_next_run(self) -> Optional[datetime]:
33        """Calculate next run time."""
34        now = datetime.utcnow()
35
36        if self.frequency == ScheduleFrequency.HOURLY:
37            next_hour = now.replace(minute=0, second=0, microsecond=0)
38            if next_hour <= now:
39                next_hour += timedelta(hours=1)
40            return next_hour
41
42        elif self.frequency == ScheduleFrequency.DAILY:
43            next_day = now.replace(hour=2, minute=0, second=0, microsecond=0)
44            if next_day <= now:
45                next_day += timedelta(days=1)
46            return next_day
47
48        elif self.frequency == ScheduleFrequency.WEEKLY:
49            days_until_sunday = (6 - now.weekday()) % 7
50            if days_until_sunday == 0 and now.hour >= 2:
51                days_until_sunday = 7
52            next_week = now + timedelta(days=days_until_sunday)
53            return next_week.replace(hour=2, minute=0, second=0, microsecond=0)
54
55        return None  # ON_DEMAND: no automatic next run; trigger via run_now()
56
57
58@dataclass
59class EvaluationRun:
60    """Result of a scheduled evaluation run."""
61    schedule_id: str
62    run_id: str
63    start_time: datetime
64    end_time: Optional[datetime] = None
65    status: str = "running"
66    results: Dict[str, Any] = field(default_factory=dict)
67    error: Optional[str] = None
68
69
70class EvaluationScheduler:
71    """Manages scheduled evaluations."""
72
73    def __init__(self, runner_factory: Callable):
74        self.schedules: Dict[str, ScheduledEvaluation] = {}
75        self.runner_factory = runner_factory
76        self.runs: List[EvaluationRun] = []
77        self._running = False
78
79    def add_schedule(self, schedule: ScheduledEvaluation):
80        """Add a scheduled evaluation."""
81        schedule.next_run = schedule.calculate_next_run()
82        self.schedules[schedule.id] = schedule
83
84    def remove_schedule(self, schedule_id: str):
85        """Remove a scheduled evaluation."""
86        self.schedules.pop(schedule_id, None)
87
88    async def start(self):
89        """Start the scheduler."""
90        self._running = True
91
92        while self._running:
93            await self._check_schedules()
94            await asyncio.sleep(60)  # Check every minute
95
96    def stop(self):
97        """Stop the scheduler."""
98        self._running = False
99
100    async def _check_schedules(self):
101        """Check for due schedules."""
102        now = datetime.utcnow()
103
104        for schedule in self.schedules.values():
105            if not schedule.enabled:
106                continue
107
108            if schedule.next_run and schedule.next_run <= now:
109                await self._run_evaluation(schedule)
110                schedule.last_run = now
111                schedule.next_run = schedule.calculate_next_run()
112
113    async def _run_evaluation(self, schedule: ScheduledEvaluation):
114        """Run a scheduled evaluation."""
115
116        run_id = f"{schedule.id}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}"
117
118        run = EvaluationRun(
119            schedule_id=schedule.id,
120            run_id=run_id,
121            start_time=datetime.utcnow()
122        )
123        self.runs.append(run)
124
125        try:
126            runner = await self.runner_factory(schedule.benchmark_id)
127            results = await runner.run(schedule.config)
128
129            run.status = "completed"
130            run.results = results
131            run.end_time = datetime.utcnow()
132
133        except Exception as e:
134            run.status = "failed"
135            run.error = str(e)
136            run.end_time = datetime.utcnow()
137
138    async def run_now(self, schedule_id: str) -> Optional[EvaluationRun]:
139        """Trigger immediate evaluation."""
140        schedule = self.schedules.get(schedule_id)
141
142        if not schedule:
143            return None
144
145        await self._run_evaluation(schedule)
146        return self.runs[-1] if self.runs else None
147
148    def get_run_history(
149        self,
150        schedule_id: Optional[str] = None,
151        limit: int = 100
152    ) -> List[EvaluationRun]:
153        """Get evaluation run history."""
154        runs = self.runs
155
156        if schedule_id:
157            runs = [r for r in runs if r.schedule_id == schedule_id]
158
159        return sorted(runs, key=lambda r: r.start_time, reverse=True)[:limit]
160
161
162class ContinuousEvaluationSystem:
163    """Complete continuous evaluation system."""
164
165    def __init__(
166        self,
167        monitor: ProductionMonitor,
168        scheduler: EvaluationScheduler,
169        trigger_manager: TriggerManager,
170        alert_manager: AlertManager,
171        drift_detector: DriftDetector
172    ):
173        self.monitor = monitor
174        self.scheduler = scheduler
175        self.trigger_manager = trigger_manager
176        self.alert_manager = alert_manager
177        self.drift_detector = drift_detector
178
179        # Connect components
180        self.trigger_manager.add_listener(self._on_trigger)
181
182    def _on_trigger(self, event: TriggerEvent):
183        """Handle evaluation trigger."""
184        # Log trigger
185        print(f"Evaluation triggered: {event.trigger_type.value}")
186
187        # Could trigger immediate benchmark run
188        # asyncio.create_task(self._run_triggered_evaluation(event))
189
190    async def start(self):
191        """Start all continuous evaluation components."""
192        await asyncio.gather(
193            self.scheduler.start(),
194            self.trigger_manager.start(),
195            self._alert_check_loop(),
196            self._drift_check_loop()
197        )
198
199    async def _alert_check_loop(self):
200        """Periodic alert checking."""
201        while True:
202            await self.alert_manager.check()
203            await asyncio.sleep(30)
204
205    async def _drift_check_loop(self):
206        """Periodic drift detection."""
207        while True:
208            drifting = self.drift_detector.get_drifting_metrics()
209
210            for drift_result in drifting:
211                # Could create alert for drift
212                pass
213
214            await asyncio.sleep(300)  # Every 5 minutes
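The next-run arithmetic in `calculate_next_run` is easy to get wrong around boundaries, so it is worth checking against fixed timestamps. A small standalone sketch of the DAILY branch — `next_daily_run` is a hypothetical helper, parameterized on `now` so it can be tested deterministically:

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int = 2) -> datetime:
    """Mirror of the DAILY branch: next run at `hour`:00 UTC."""
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot already passed
    return candidate


# After 02:00, the next run rolls over to tomorrow...
print(next_daily_run(datetime(2024, 5, 1, 10, 30)))  # 2024-05-02 02:00:00
# ...but before 02:00, it stays on the same day.
print(next_daily_run(datetime(2024, 5, 1, 1, 0)))    # 2024-05-01 02:00:00
```

The same pattern (compute the candidate slot, roll forward if it has already passed) applies to the HOURLY and WEEKLY branches.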

Summary

This section covered the key components of continuous evaluation:

| Component | Purpose | Key Features |
| --- | --- | --- |
| Production Monitoring | Real-time metrics | Windows, aggregation, health checks |
| Triggers | Initiate evaluations | Deployment, threshold, anomaly detection |
| Drift Detection | Track changes | Metric drift, data drift, baselines |
| Alerts | Notify on issues | Rules, severity, notifiers |
| Scheduling | Regular benchmarks | Frequencies, history, on-demand runs |
Key Takeaways: Continuous evaluation catches problems before users notice them. Combine real-time monitoring with scheduled benchmarks, and use intelligent alerting to avoid alert fatigue.

In the next section, we'll build a complete evaluation pipeline that integrates all these components into a production-ready system.