Introduction
Continuous evaluation monitors agent performance in production, detecting issues before they impact users. Unlike periodic benchmarks, continuous evaluation provides real-time insights into agent behavior and automatically alerts on regressions.
The Monitoring Imperative: Agents can degrade silently due to model updates, data drift, or upstream changes. Continuous evaluation catches these issues early, often before users notice.
This section covers how to implement production monitoring, set up automated triggers, detect drift, configure alerts, and schedule regular evaluations.
Production Monitoring
Production monitoring captures real-time metrics from live agent interactions. The system below aggregates values over sliding time windows, collects agent-specific measurements, and exposes a threshold-based health check:
1"""
2Production monitoring for AI agents.
3"""
4
5from dataclasses import dataclass, field
6from datetime import datetime, timedelta
7from typing import Any, Callable, Dict, List, Optional
8from collections import defaultdict
9import asyncio
10import time
11
12
13@dataclass
14class MetricWindow:
15 """Time-windowed metric aggregation."""
16 name: str
17 window_size_seconds: int
18 values: List[float] = field(default_factory=list)
19 timestamps: List[datetime] = field(default_factory=list)
20
21 def add(self, value: float, timestamp: datetime = None):
22 """Add a value to the window."""
23 ts = timestamp or datetime.utcnow()
24 self.values.append(value)
25 self.timestamps.append(ts)
26 self._cleanup()
27
28 def _cleanup(self):
29 """Remove values outside the window."""
30 cutoff = datetime.utcnow() - timedelta(seconds=self.window_size_seconds)
31
32 # Find first index within window
33 start_idx = 0
34 for i, ts in enumerate(self.timestamps):
35 if ts >= cutoff:
36 start_idx = i
37 break
38
39 if start_idx > 0:
40 self.values = self.values[start_idx:]
41 self.timestamps = self.timestamps[start_idx:]
42
43 @property
44 def count(self) -> int:
45 return len(self.values)
46
47 @property
48 def mean(self) -> float:
49 return sum(self.values) / len(self.values) if self.values else 0
50
51 @property
52 def min(self) -> float:
53 return min(self.values) if self.values else 0
54
55 @property
56 def max(self) -> float:
57 return max(self.values) if self.values else 0
58
59 @property
60 def sum(self) -> float:
61 return sum(self.values)
62
63 def percentile(self, p: float) -> float:
64 """Calculate percentile."""
65 if not self.values:
66 return 0
67 sorted_values = sorted(self.values)
68 idx = int(len(sorted_values) * p / 100)
69 return sorted_values[min(idx, len(sorted_values) - 1)]
70
71
72class ProductionMonitor:
73 """Monitors agent performance in production."""
74
75 def __init__(self, window_sizes: List[int] = None):
76 self.window_sizes = window_sizes or [60, 300, 3600] # 1m, 5m, 1h
77 self.metrics: Dict[str, Dict[int, MetricWindow]] = defaultdict(dict)
78 self.counters: Dict[str, int] = defaultdict(int)
79 self.listeners: List[Callable] = []
80
81 def record(self, metric_name: str, value: float):
82 """Record a metric value."""
83
84 timestamp = datetime.utcnow()
85
86 # Add to all window sizes
87 for window_size in self.window_sizes:
88 if window_size not in self.metrics[metric_name]:
89 self.metrics[metric_name][window_size] = MetricWindow(
90 name=metric_name,
91 window_size_seconds=window_size
92 )
93 self.metrics[metric_name][window_size].add(value, timestamp)
94
95 # Notify listeners
96 for listener in self.listeners:
97 listener(metric_name, value, timestamp)
98
99 def increment(self, counter_name: str, amount: int = 1):
100 """Increment a counter."""
101 self.counters[counter_name] += amount
102
103 def get_stats(
104 self,
105 metric_name: str,
106 window_seconds: int = 300
107 ) -> Dict[str, float]:
108 """Get statistics for a metric."""
109
110 window = self.metrics.get(metric_name, {}).get(window_seconds)
111
112 if not window or window.count == 0:
113 return {}
114
115 return {
116 "count": window.count,
117 "mean": window.mean,
118 "min": window.min,
119 "max": window.max,
120 "p50": window.percentile(50),
121 "p90": window.percentile(90),
122 "p99": window.percentile(99),
123 "sum": window.sum
124 }
125
126 def get_all_stats(self, window_seconds: int = 300) -> Dict[str, Dict[str, float]]:
127 """Get statistics for all metrics."""
128 return {
129 name: self.get_stats(name, window_seconds)
130 for name in self.metrics
131 }
132
133 def add_listener(self, callback: Callable):
134 """Add a metric listener."""
135 self.listeners.append(callback)
136
137
138class AgentMetricsCollector:
139 """Collects specific agent metrics."""
140
141 def __init__(self, monitor: ProductionMonitor):
142 self.monitor = monitor
143
144 def record_request(
145 self,
146 duration_ms: float,
147 success: bool,
148 token_count: int,
149 cost: float
150 ):
151 """Record a completed agent request."""
152
153 self.monitor.record("request.duration_ms", duration_ms)
154 self.monitor.record("request.tokens", token_count)
155 self.monitor.record("request.cost", cost)
156
157 if success:
158 self.monitor.increment("request.success")
159 self.monitor.record("request.success_rate", 1.0)
160 else:
161 self.monitor.increment("request.failure")
162 self.monitor.record("request.success_rate", 0.0)
163
164 self.monitor.increment("request.total")
165
166 def record_tool_use(
167 self,
168 tool_name: str,
169 duration_ms: float,
170 success: bool
171 ):
172 """Record a tool invocation."""
173
174 self.monitor.record(f"tool.{tool_name}.duration_ms", duration_ms)
175 self.monitor.record(f"tool.{tool_name}.success", 1.0 if success else 0.0)
176 self.monitor.increment(f"tool.{tool_name}.count")
177
178 def record_quality_score(self, score: float, evaluator: str):
179 """Record a quality evaluation score."""
180 self.monitor.record(f"quality.{evaluator}", score)
181
182 def record_user_feedback(self, rating: float, feedback_type: str):
183 """Record user feedback."""
184 self.monitor.record(f"feedback.{feedback_type}", rating)
185
186
187class HealthChecker:
188 """Checks agent health based on metrics."""
189
190 def __init__(
191 self,
192 monitor: ProductionMonitor,
193 thresholds: Dict[str, Dict[str, float]] = None
194 ):
195 self.monitor = monitor
196 self.thresholds = thresholds or {
197 "request.success_rate": {"min": 0.95},
198 "request.duration_ms": {"max": 5000, "p99_max": 10000},
199 "quality.accuracy": {"min": 0.8}
200 }
201
202 def check(self) -> Dict[str, Any]:
203 """Perform health check."""
204
205 results = {
206 "status": "healthy",
207 "checks": {},
208 "timestamp": datetime.utcnow().isoformat()
209 }
210
211 issues = []
212
213 for metric_name, thresholds in self.thresholds.items():
214 stats = self.monitor.get_stats(metric_name)
215
216 if not stats:
217 continue
218
219 check_result = {"status": "ok", "stats": stats, "issues": []}
220
221 # Check min threshold
222 if "min" in thresholds and stats.get("mean", 0) < thresholds["min"]:
223 check_result["status"] = "failing"
224 check_result["issues"].append(
225 f"mean {stats['mean']:.3f} < min {thresholds['min']}"
226 )
227 issues.append(metric_name)
228
229 # Check max threshold
230 if "max" in thresholds and stats.get("mean", 0) > thresholds["max"]:
231 check_result["status"] = "failing"
232 check_result["issues"].append(
233 f"mean {stats['mean']:.3f} > max {thresholds['max']}"
234 )
235 issues.append(metric_name)
236
237 # Check p99 threshold
238 if "p99_max" in thresholds and stats.get("p99", 0) > thresholds["p99_max"]:
239 check_result["status"] = "warning"
240 check_result["issues"].append(
241 f"p99 {stats['p99']:.3f} > max {thresholds['p99_max']}"
242 )
243
244 results["checks"][metric_name] = check_result
245
246 # Set overall status
247 if issues:
248 results["status"] = "unhealthy"
249 results["failing_metrics"] = issues
250
251 return resultsAutomated Evaluation Triggers
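As a minimal usage sketch (the request values below are illustrative only), the monitor, collector, and health checker can be wired together like this:

```python
# Wire up monitoring for a single agent service.
monitor = ProductionMonitor()
collector = AgentMetricsCollector(monitor)
health = HealthChecker(monitor)

# Record a few illustrative requests and a tool call.
collector.record_request(duration_ms=820.0, success=True, token_count=512, cost=0.004)
collector.record_request(duration_ms=12500.0, success=False, token_count=2048, cost=0.016)
collector.record_tool_use("web_search", duration_ms=340.0, success=True)

print(monitor.get_stats("request.duration_ms"))  # count, mean, p50/p90/p99, ...
print(health.check()["status"])                  # "healthy" or "unhealthy"
```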
Automated Evaluation Triggers
Automated triggers initiate evaluations based on events or conditions. This enables proactive quality assurance:
1"""
2Automated evaluation triggers.
3"""
4
5from abc import ABC, abstractmethod
6from dataclasses import dataclass
7from datetime import datetime, timedelta
8from typing import Any, Callable, Dict, List, Optional
9from enum import Enum
10import asyncio
11
12
13class TriggerType(Enum):
14 """Types of evaluation triggers."""
15 DEPLOYMENT = "deployment"
16 THRESHOLD = "threshold"
17 SCHEDULE = "schedule"
18 MANUAL = "manual"
19 DRIFT = "drift"
20 ANOMALY = "anomaly"
21
22
23@dataclass
24class TriggerEvent:
25 """Event that triggered an evaluation."""
26 trigger_type: TriggerType
27 trigger_id: str
28 timestamp: datetime
29 metadata: Dict[str, Any]
30 priority: int = 0
31
32
33class EvaluationTrigger(ABC):
34 """Abstract base for evaluation triggers."""
35
36 def __init__(self, trigger_id: str, priority: int = 0):
37 self.trigger_id = trigger_id
38 self.priority = priority
39 self.enabled = True
40 self.last_triggered: Optional[datetime] = None
41 self.cooldown_seconds: int = 300 # 5 minutes
42
43 @abstractmethod
44 async def check(self) -> Optional[TriggerEvent]:
45 """Check if trigger condition is met."""
46 pass
47
48 def can_trigger(self) -> bool:
49 """Check if trigger can fire (cooldown check)."""
50 if not self.enabled:
51 return False
52
53 if self.last_triggered:
54 elapsed = (datetime.utcnow() - self.last_triggered).total_seconds()
55 if elapsed < self.cooldown_seconds:
56 return False
57
58 return True
59
60 def mark_triggered(self):
61 """Mark trigger as having fired."""
62 self.last_triggered = datetime.utcnow()
63
64
65class DeploymentTrigger(EvaluationTrigger):
66 """Triggers evaluation on new deployments."""
67
68 def __init__(
69 self,
70 trigger_id: str,
71 deployment_detector: Callable[[], Optional[str]]
72 ):
73 super().__init__(trigger_id, priority=10)
74 self.deployment_detector = deployment_detector
75 self.last_deployment: Optional[str] = None
76
77 async def check(self) -> Optional[TriggerEvent]:
78 if not self.can_trigger():
79 return None
80
81 current_deployment = self.deployment_detector()
82
83 if current_deployment and current_deployment != self.last_deployment:
84 self.last_deployment = current_deployment
85 self.mark_triggered()
86
87 return TriggerEvent(
88 trigger_type=TriggerType.DEPLOYMENT,
89 trigger_id=self.trigger_id,
90 timestamp=datetime.utcnow(),
91 metadata={"deployment_id": current_deployment},
92 priority=self.priority
93 )
94
95 return None
96
97
98class ThresholdTrigger(EvaluationTrigger):
99 """Triggers evaluation when metrics cross thresholds."""
100
101 def __init__(
102 self,
103 trigger_id: str,
104 monitor: ProductionMonitor,
105 metric_name: str,
106 threshold: float,
107 comparison: str = "lt", # "lt", "gt", "le", "ge"
108 window_seconds: int = 300
109 ):
110 super().__init__(trigger_id, priority=5)
111 self.monitor = monitor
112 self.metric_name = metric_name
113 self.threshold = threshold
114 self.comparison = comparison
115 self.window_seconds = window_seconds
116
117 async def check(self) -> Optional[TriggerEvent]:
118 if not self.can_trigger():
119 return None
120
121 stats = self.monitor.get_stats(self.metric_name, self.window_seconds)
122
123 if not stats:
124 return None
125
126 value = stats.get("mean", 0)
127
128 triggered = False
129 if self.comparison == "lt" and value < self.threshold:
130 triggered = True
131 elif self.comparison == "gt" and value > self.threshold:
132 triggered = True
133 elif self.comparison == "le" and value <= self.threshold:
134 triggered = True
135 elif self.comparison == "ge" and value >= self.threshold:
136 triggered = True
137
138 if triggered:
139 self.mark_triggered()
140 return TriggerEvent(
141 trigger_type=TriggerType.THRESHOLD,
142 trigger_id=self.trigger_id,
143 timestamp=datetime.utcnow(),
144 metadata={
145 "metric": self.metric_name,
146 "value": value,
147 "threshold": self.threshold,
148 "comparison": self.comparison
149 },
150 priority=self.priority
151 )
152
153 return None
154
155
156class AnomalyTrigger(EvaluationTrigger):
157 """Triggers evaluation on metric anomalies."""
158
159 def __init__(
160 self,
161 trigger_id: str,
162 monitor: ProductionMonitor,
163 metric_name: str,
164 std_dev_threshold: float = 3.0,
165 baseline_window_seconds: int = 3600
166 ):
167 super().__init__(trigger_id, priority=8)
168 self.monitor = monitor
169 self.metric_name = metric_name
170 self.std_dev_threshold = std_dev_threshold
171 self.baseline_window = baseline_window_seconds
172 self.baseline_mean: Optional[float] = None
173 self.baseline_std: Optional[float] = None
174
175 def update_baseline(self):
176 """Update baseline statistics."""
177 window = self.monitor.metrics.get(
178 self.metric_name, {}
179 ).get(self.baseline_window)
180
181 if window and window.count > 10:
182 import statistics
183 self.baseline_mean = window.mean
184 if len(window.values) > 1:
185 self.baseline_std = statistics.stdev(window.values)
186
187 async def check(self) -> Optional[TriggerEvent]:
188 if not self.can_trigger():
189 return None
190
191 # Get recent value
192 stats = self.monitor.get_stats(self.metric_name, 60) # Last minute
193
194 if not stats:
195 return None
196
197 # Update baseline periodically
198 self.update_baseline()
199
200 if self.baseline_mean is None or self.baseline_std is None:
201 return None
202
203 if self.baseline_std == 0:
204 return None
205
206 current = stats.get("mean", 0)
207 z_score = abs(current - self.baseline_mean) / self.baseline_std
208
209 if z_score > self.std_dev_threshold:
210 self.mark_triggered()
211 return TriggerEvent(
212 trigger_type=TriggerType.ANOMALY,
213 trigger_id=self.trigger_id,
214 timestamp=datetime.utcnow(),
215 metadata={
216 "metric": self.metric_name,
217 "current_value": current,
218 "baseline_mean": self.baseline_mean,
219 "z_score": z_score
220 },
221 priority=self.priority
222 )
223
224 return None
225
226
227class TriggerManager:
228 """Manages all evaluation triggers."""
229
230 def __init__(self):
231 self.triggers: List[EvaluationTrigger] = []
232 self.listeners: List[Callable[[TriggerEvent], None]] = []
233 self._running = False
234
235 def register(self, trigger: EvaluationTrigger):
236 """Register a trigger."""
237 self.triggers.append(trigger)
238
239 def add_listener(self, callback: Callable[[TriggerEvent], None]):
240 """Add event listener."""
241 self.listeners.append(callback)
242
243 async def start(self, check_interval: float = 10.0):
244 """Start trigger monitoring loop."""
245 self._running = True
246
247 while self._running:
248 events = await self._check_all()
249
250 # Sort by priority and dispatch
251 for event in sorted(events, key=lambda e: -e.priority):
252 for listener in self.listeners:
253 try:
254 listener(event)
255 except Exception:
256 pass
257
258 await asyncio.sleep(check_interval)
259
260 def stop(self):
261 """Stop trigger monitoring."""
262 self._running = False
263
264 async def _check_all(self) -> List[TriggerEvent]:
265 """Check all triggers."""
266 events = []
267
268 for trigger in self.triggers:
269 try:
270 event = await trigger.check()
271 if event:
272 events.append(event)
273 except Exception:
274 pass
275
276 return eventsDrift Detection
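A sketch of how these triggers might be registered with the manager, assuming the `ProductionMonitor` from the previous listing is in scope; `get_current_deployment_id` is a hypothetical stand-in for however you detect deployments:

```python
import asyncio


def get_current_deployment_id():
    # Hypothetical stand-in: in practice, read the deployed agent/model version
    # from your deployment system, a config service, or an environment variable.
    return "agent-v1.2.3"


async def main():
    monitor = ProductionMonitor()

    manager = TriggerManager()
    manager.register(DeploymentTrigger("deploy", get_current_deployment_id))
    manager.register(ThresholdTrigger(
        "low-success", monitor, "request.success_rate",
        threshold=0.9, comparison="lt"
    ))
    manager.register(AnomalyTrigger("latency-anomaly", monitor, "request.duration_ms"))

    # Kick off an evaluation whenever any trigger fires.
    manager.add_listener(
        lambda event: print(f"run evaluation: {event.trigger_id}", event.metadata)
    )

    await manager.start(check_interval=10.0)  # runs until cancelled


asyncio.run(main())
```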
Drift Detection
Drift detection identifies when agent behavior changes over time, often indicating a problem that requires investigation:
1"""
2Drift detection for agent evaluation.
3"""
4
5from dataclasses import dataclass, field
6from datetime import datetime, timedelta
7from typing import Any, Dict, List, Optional, Tuple
8import statistics
9
10
11@dataclass
12class DriftResult:
13 """Result of drift detection."""
14 metric_name: str
15 drift_detected: bool
16 drift_score: float
17 baseline_mean: float
18 current_mean: float
19 relative_change: float
20 p_value: Optional[float] = None
21 details: Dict[str, Any] = field(default_factory=dict)
22
23
24class DriftDetector:
25 """Detects drift in agent metrics."""
26
27 def __init__(
28 self,
29 monitor: ProductionMonitor,
30 baseline_window_hours: int = 24,
31 detection_window_hours: int = 1,
32 sensitivity: float = 0.1 # 10% change threshold
33 ):
34 self.monitor = monitor
35 self.baseline_window = baseline_window_hours * 3600
36 self.detection_window = detection_window_hours * 3600
37 self.sensitivity = sensitivity
38 self.baselines: Dict[str, Dict[str, float]] = {}
39
40 def update_baselines(self):
41 """Update baseline statistics for all metrics."""
42
43 for metric_name in self.monitor.metrics:
44 window = self.monitor.metrics[metric_name].get(self.baseline_window)
45
46 if window and window.count > 100:
47 self.baselines[metric_name] = {
48 "mean": window.mean,
49 "std": statistics.stdev(window.values) if len(window.values) > 1 else 0,
50 "min": window.min,
51 "max": window.max,
52 "count": window.count,
53 "updated_at": datetime.utcnow().isoformat()
54 }
55
56 def detect(self, metric_name: str) -> DriftResult:
57 """Detect drift for a specific metric."""
58
59 baseline = self.baselines.get(metric_name)
60 current_stats = self.monitor.get_stats(metric_name, self.detection_window)
61
62 if not baseline or not current_stats:
63 return DriftResult(
64 metric_name=metric_name,
65 drift_detected=False,
66 drift_score=0.0,
67 baseline_mean=0,
68 current_mean=0,
69 relative_change=0,
70 details={"error": "Insufficient data"}
71 )
72
73 baseline_mean = baseline["mean"]
74 current_mean = current_stats["mean"]
75
76 # Calculate relative change
77 if baseline_mean != 0:
78 relative_change = (current_mean - baseline_mean) / abs(baseline_mean)
79 else:
80 relative_change = current_mean
81
82 # Calculate drift score (z-score like)
83 baseline_std = baseline.get("std", 0)
84 if baseline_std > 0:
85 drift_score = abs(current_mean - baseline_mean) / baseline_std
86 else:
87 drift_score = abs(relative_change)
88
89 # Detect drift
90 drift_detected = (
91 abs(relative_change) > self.sensitivity or
92 drift_score > 3.0
93 )
94
95 return DriftResult(
96 metric_name=metric_name,
97 drift_detected=drift_detected,
98 drift_score=drift_score,
99 baseline_mean=baseline_mean,
100 current_mean=current_mean,
101 relative_change=relative_change,
102 details={
103 "baseline_std": baseline_std,
104 "sensitivity": self.sensitivity,
105 "baseline_count": baseline["count"]
106 }
107 )
108
109 def detect_all(self) -> Dict[str, DriftResult]:
110 """Detect drift for all monitored metrics."""
111 self.update_baselines()
112
113 return {
114 metric_name: self.detect(metric_name)
115 for metric_name in self.baselines
116 }
117
118 def get_drifting_metrics(self) -> List[DriftResult]:
119 """Get all metrics showing drift."""
120 all_results = self.detect_all()
121 return [r for r in all_results.values() if r.drift_detected]
122
123
124class DataDriftDetector:
125 """Detects drift in input data distributions."""
126
127 def __init__(self, reference_window: int = 1000):
128 self.reference_window = reference_window
129 self.reference_data: Dict[str, List[Any]] = {}
130 self.current_data: Dict[str, List[Any]] = {}
131
132 def add_reference(self, feature_name: str, value: Any):
133 """Add to reference distribution."""
134 if feature_name not in self.reference_data:
135 self.reference_data[feature_name] = []
136
137 self.reference_data[feature_name].append(value)
138
139 # Keep window size
140 if len(self.reference_data[feature_name]) > self.reference_window:
141 self.reference_data[feature_name] = (
142 self.reference_data[feature_name][-self.reference_window:]
143 )
144
145 def add_current(self, feature_name: str, value: Any):
146 """Add to current distribution."""
147 if feature_name not in self.current_data:
148 self.current_data[feature_name] = []
149
150 self.current_data[feature_name].append(value)
151
152 # Keep smaller window for current
153 window = self.reference_window // 10
154 if len(self.current_data[feature_name]) > window:
155 self.current_data[feature_name] = (
156 self.current_data[feature_name][-window:]
157 )
158
159 def detect_drift(
160 self,
161 feature_name: str
162 ) -> Tuple[bool, float]:
163 """Detect drift for a feature."""
164
165 ref = self.reference_data.get(feature_name, [])
166 curr = self.current_data.get(feature_name, [])
167
168 if len(ref) < 100 or len(curr) < 10:
169 return False, 0.0
170
171 # For numeric data, use KS-test approximation
172 if isinstance(ref[0], (int, float)):
173 return self._numeric_drift(ref, curr)
174
175 # For categorical, use chi-square approximation
176 return self._categorical_drift(ref, curr)
177
178 def _numeric_drift(
179 self,
180 ref: List[float],
181 curr: List[float]
182 ) -> Tuple[bool, float]:
183 """Detect drift in numeric feature."""
184
185 ref_mean = statistics.mean(ref)
186 curr_mean = statistics.mean(curr)
187 ref_std = statistics.stdev(ref) if len(ref) > 1 else 1
188
189 if ref_std == 0:
190 ref_std = 1
191
192 z_score = abs(curr_mean - ref_mean) / ref_std
193
194 return z_score > 3.0, z_score
195
196 def _categorical_drift(
197 self,
198 ref: List[Any],
199 curr: List[Any]
200 ) -> Tuple[bool, float]:
201 """Detect drift in categorical feature."""
202
203 from collections import Counter
204
205 ref_counts = Counter(ref)
206 curr_counts = Counter(curr)
207
208 # Normalize to frequencies
209 ref_total = sum(ref_counts.values())
210 curr_total = sum(curr_counts.values())
211
212 all_categories = set(ref_counts.keys()) | set(curr_counts.keys())
213
214 # Calculate JS divergence approximation
215 divergence = 0
216 for cat in all_categories:
217 ref_freq = ref_counts.get(cat, 0) / ref_total
218 curr_freq = curr_counts.get(cat, 0) / curr_total
219
220 if ref_freq > 0 and curr_freq > 0:
221 divergence += abs(ref_freq - curr_freq)
222
223 return divergence > 0.3, divergenceRegression Alerts
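A usage sketch for both detectors, assuming the classes above are in scope. `DriftDetector` needs the monitor to keep a window as long as its baseline, so the 24-hour window is added explicitly; the feature values are simulated for illustration:

```python
import random

# Metric drift: compare the last hour against a 24-hour baseline.
# The monitor must keep an 86,400-second window for that baseline to exist.
monitor = ProductionMonitor(window_sizes=[60, 300, 3600, 86400])
drift_detector = DriftDetector(monitor, baseline_window_hours=24, detection_window_hours=1)

for result in drift_detector.get_drifting_metrics():
    print(f"{result.metric_name}: {result.relative_change:+.1%} vs baseline")

# Data drift: compare recent inputs against a reference sample
# (simulated prompt lengths here; use real feature values in practice).
data_drift = DataDriftDetector(reference_window=1000)
for length in (random.gauss(200, 40) for _ in range(1000)):
    data_drift.add_reference("prompt_length", length)
for length in (random.gauss(260, 40) for _ in range(100)):
    data_drift.add_current("prompt_length", length)

drifted, score = data_drift.detect_drift("prompt_length")
print(f"prompt_length drift={drifted} score={score:.2f}")
```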
Regression Alerts
Regression alerts notify teams when agent performance degrades. Here's how to implement intelligent alerting:
1"""
2Regression alerting system.
3"""
4
5from dataclasses import dataclass, field
6from datetime import datetime, timedelta
7from enum import Enum
8from typing import Any, Callable, Dict, List, Optional
9import asyncio
10
11
12class AlertSeverity(Enum):
13 """Alert severity levels."""
14 INFO = "info"
15 WARNING = "warning"
16 ERROR = "error"
17 CRITICAL = "critical"
18
19
20@dataclass
21class RegressionAlert:
22 """A regression alert."""
23 id: str
24 metric_name: str
25 severity: AlertSeverity
26 title: str
27 message: str
28 timestamp: datetime
29 current_value: float
30 baseline_value: float
31 threshold: float
32 metadata: Dict[str, Any] = field(default_factory=dict)
33 acknowledged: bool = False
34 resolved: bool = False
35
36
37class AlertRule:
38 """Defines when to trigger an alert."""
39
40 def __init__(
41 self,
42 name: str,
43 metric_name: str,
44 condition: str, # "lt", "gt", "change_pct"
45 threshold: float,
46 severity: AlertSeverity = AlertSeverity.WARNING,
47 consecutive_violations: int = 1,
48 cooldown_minutes: int = 30
49 ):
50 self.name = name
51 self.metric_name = metric_name
52 self.condition = condition
53 self.threshold = threshold
54 self.severity = severity
55 self.consecutive_violations = consecutive_violations
56 self.cooldown_minutes = cooldown_minutes
57
58 self.violation_count = 0
59 self.last_alert: Optional[datetime] = None
60
61 def evaluate(
62 self,
63 current: float,
64 baseline: float
65 ) -> Optional[RegressionAlert]:
66 """Evaluate if alert should fire."""
67
68 violated = False
69
70 if self.condition == "lt" and current < self.threshold:
71 violated = True
72 elif self.condition == "gt" and current > self.threshold:
73 violated = True
74 elif self.condition == "change_pct":
75 if baseline != 0:
76 change = abs(current - baseline) / abs(baseline) * 100
77 violated = change > self.threshold
78
79 if violated:
80 self.violation_count += 1
81 else:
82 self.violation_count = 0
83
84 # Check consecutive violations
85 if self.violation_count < self.consecutive_violations:
86 return None
87
88 # Check cooldown
89 if self.last_alert:
90 elapsed = (datetime.utcnow() - self.last_alert).total_seconds()
91 if elapsed < self.cooldown_minutes * 60:
92 return None
93
94 # Fire alert
95 self.last_alert = datetime.utcnow()
96
97 return RegressionAlert(
98 id=f"{self.name}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
99 metric_name=self.metric_name,
100 severity=self.severity,
101 title=f"Regression: {self.name}",
102 message=self._format_message(current, baseline),
103 timestamp=datetime.utcnow(),
104 current_value=current,
105 baseline_value=baseline,
106 threshold=self.threshold
107 )
108
109 def _format_message(self, current: float, baseline: float) -> str:
110 """Format alert message."""
111 if self.condition == "lt":
112 return f"{self.metric_name} dropped to {current:.3f} (threshold: {self.threshold})"
113 elif self.condition == "gt":
114 return f"{self.metric_name} rose to {current:.3f} (threshold: {self.threshold})"
115 else:
116 change = (current - baseline) / baseline * 100 if baseline else 0
117 return f"{self.metric_name} changed by {change:.1f}% (threshold: {self.threshold}%)"
118
119
120class AlertManager:
121 """Manages regression alerts."""
122
123 def __init__(self, monitor: ProductionMonitor):
124 self.monitor = monitor
125 self.rules: List[AlertRule] = []
126 self.active_alerts: Dict[str, RegressionAlert] = {}
127 self.alert_history: List[RegressionAlert] = []
128 self.notifiers: List[Callable[[RegressionAlert], None]] = []
129
130 def add_rule(self, rule: AlertRule):
131 """Add an alert rule."""
132 self.rules.append(rule)
133
134 def add_notifier(self, notifier: Callable[[RegressionAlert], None]):
135 """Add alert notifier."""
136 self.notifiers.append(notifier)
137
138 async def check(self):
139 """Check all rules and fire alerts."""
140
141 for rule in self.rules:
142 stats = self.monitor.get_stats(rule.metric_name)
143
144 if not stats:
145 continue
146
147 current = stats.get("mean", 0)
148
149 # Get baseline from 1-hour window
150 baseline_stats = self.monitor.get_stats(rule.metric_name, 3600)
151 baseline = baseline_stats.get("mean", current) if baseline_stats else current
152
153 alert = rule.evaluate(current, baseline)
154
155 if alert:
156 self._fire_alert(alert)
157
158 def _fire_alert(self, alert: RegressionAlert):
159 """Fire an alert."""
160
161 # Check for duplicate
162 existing = self.active_alerts.get(alert.metric_name)
163 if existing and not existing.resolved:
164 return
165
166 self.active_alerts[alert.metric_name] = alert
167 self.alert_history.append(alert)
168
169 # Notify
170 for notifier in self.notifiers:
171 try:
172 notifier(alert)
173 except Exception:
174 pass
175
176 def acknowledge(self, alert_id: str, user: str):
177 """Acknowledge an alert."""
178 for alert in self.active_alerts.values():
179 if alert.id == alert_id:
180 alert.acknowledged = True
181 alert.metadata["acknowledged_by"] = user
182 alert.metadata["acknowledged_at"] = datetime.utcnow().isoformat()
183 break
184
185 def resolve(self, alert_id: str, user: str, resolution: str):
186 """Resolve an alert."""
187 for metric_name, alert in list(self.active_alerts.items()):
188 if alert.id == alert_id:
189 alert.resolved = True
190 alert.metadata["resolved_by"] = user
191 alert.metadata["resolved_at"] = datetime.utcnow().isoformat()
192 alert.metadata["resolution"] = resolution
193 del self.active_alerts[metric_name]
194 break
195
196 def get_active_alerts(
197 self,
198 severity: Optional[AlertSeverity] = None
199 ) -> List[RegressionAlert]:
200 """Get active alerts."""
201 alerts = list(self.active_alerts.values())
202
203 if severity:
204 alerts = [a for a in alerts if a.severity == severity]
205
206 return sorted(alerts, key=lambda a: a.severity.value, reverse=True)
207
208
209# Common alert notifiers
210
211class SlackNotifier:
212 """Sends alerts to Slack."""
213
214 def __init__(self, webhook_url: str, channel: str):
215 self.webhook_url = webhook_url
216 self.channel = channel
217
218 def __call__(self, alert: RegressionAlert):
219 """Send Slack notification."""
220 import requests
221
222 color = {
223 AlertSeverity.INFO: "#36a64f",
224 AlertSeverity.WARNING: "#ff9800",
225 AlertSeverity.ERROR: "#f44336",
226 AlertSeverity.CRITICAL: "#9c27b0"
227 }.get(alert.severity, "#757575")
228
229 payload = {
230 "channel": self.channel,
231 "attachments": [{
232 "color": color,
233 "title": alert.title,
234 "text": alert.message,
235 "fields": [
236 {"title": "Metric", "value": alert.metric_name, "short": True},
237 {"title": "Severity", "value": alert.severity.value, "short": True},
238 {"title": "Current", "value": f"{alert.current_value:.3f}", "short": True},
239 {"title": "Baseline", "value": f"{alert.baseline_value:.3f}", "short": True}
240 ],
241 "ts": int(alert.timestamp.timestamp())
242 }]
243 }
244
245 requests.post(self.webhook_url, json=payload)Evaluation Scheduling
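A sketch of wiring rules and a notifier into the `AlertManager`, assuming the classes above are in scope; the webhook URL and channel are placeholders:

```python
import asyncio

monitor = ProductionMonitor()
alert_manager = AlertManager(monitor)

alert_manager.add_rule(AlertRule(
    name="success-rate-drop",
    metric_name="request.success_rate",
    condition="lt",
    threshold=0.95,
    severity=AlertSeverity.ERROR,
    consecutive_violations=3,   # require three consecutive bad checks
    cooldown_minutes=30,
))
alert_manager.add_rule(AlertRule(
    name="latency-jump",
    metric_name="request.duration_ms",
    condition="change_pct",
    threshold=50,               # alert on a >50% change vs the baseline window
    severity=AlertSeverity.WARNING,
))

alert_manager.add_notifier(SlackNotifier(
    webhook_url="https://hooks.slack.com/services/...",  # placeholder
    channel="#agent-alerts",
))

# Normally invoked from a periodic loop (see ContinuousEvaluationSystem below).
asyncio.run(alert_manager.check())
```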
Evaluation Scheduling
Scheduled evaluations run benchmarks regularly to catch regressions that might not trigger real-time alerts:
1"""
2Evaluation scheduling system.
3"""
4
5from dataclasses import dataclass, field
6from datetime import datetime, timedelta
7from enum import Enum
8from typing import Any, Callable, Dict, List, Optional
9import asyncio
10
11
12class ScheduleFrequency(Enum):
13 """Evaluation schedule frequencies."""
14 HOURLY = "hourly"
15 DAILY = "daily"
16 WEEKLY = "weekly"
17 ON_DEMAND = "on_demand"
18
19
20@dataclass
21class ScheduledEvaluation:
22 """A scheduled evaluation job."""
23 id: str
24 name: str
25 benchmark_id: str
26 frequency: ScheduleFrequency
27 enabled: bool = True
28 last_run: Optional[datetime] = None
29 next_run: Optional[datetime] = None
30 config: Dict[str, Any] = field(default_factory=dict)
31
32 def calculate_next_run(self) -> datetime:
33 """Calculate next run time."""
34 now = datetime.utcnow()
35
36 if self.frequency == ScheduleFrequency.HOURLY:
37 next_hour = now.replace(minute=0, second=0, microsecond=0)
38 if next_hour <= now:
39 next_hour += timedelta(hours=1)
40 return next_hour
41
42 elif self.frequency == ScheduleFrequency.DAILY:
43 next_day = now.replace(hour=2, minute=0, second=0, microsecond=0)
44 if next_day <= now:
45 next_day += timedelta(days=1)
46 return next_day
47
48 elif self.frequency == ScheduleFrequency.WEEKLY:
49 days_until_sunday = (6 - now.weekday()) % 7
50 if days_until_sunday == 0 and now.hour >= 2:
51 days_until_sunday = 7
52 next_week = now + timedelta(days=days_until_sunday)
53 return next_week.replace(hour=2, minute=0, second=0, microsecond=0)
54
55 return now
56
57
58@dataclass
59class EvaluationRun:
60 """Result of a scheduled evaluation run."""
61 schedule_id: str
62 run_id: str
63 start_time: datetime
64 end_time: Optional[datetime] = None
65 status: str = "running"
66 results: Dict[str, Any] = field(default_factory=dict)
67 error: Optional[str] = None
68
69
70class EvaluationScheduler:
71 """Manages scheduled evaluations."""
72
73 def __init__(self, runner_factory: Callable):
74 self.schedules: Dict[str, ScheduledEvaluation] = {}
75 self.runner_factory = runner_factory
76 self.runs: List[EvaluationRun] = []
77 self._running = False
78
79 def add_schedule(self, schedule: ScheduledEvaluation):
80 """Add a scheduled evaluation."""
81 schedule.next_run = schedule.calculate_next_run()
82 self.schedules[schedule.id] = schedule
83
84 def remove_schedule(self, schedule_id: str):
85 """Remove a scheduled evaluation."""
86 self.schedules.pop(schedule_id, None)
87
88 async def start(self):
89 """Start the scheduler."""
90 self._running = True
91
92 while self._running:
93 await self._check_schedules()
94 await asyncio.sleep(60) # Check every minute
95
96 def stop(self):
97 """Stop the scheduler."""
98 self._running = False
99
100 async def _check_schedules(self):
101 """Check for due schedules."""
102 now = datetime.utcnow()
103
104 for schedule in self.schedules.values():
105 if not schedule.enabled:
106 continue
107
108 if schedule.next_run and schedule.next_run <= now:
109 await self._run_evaluation(schedule)
110 schedule.last_run = now
111 schedule.next_run = schedule.calculate_next_run()
112
113 async def _run_evaluation(self, schedule: ScheduledEvaluation):
114 """Run a scheduled evaluation."""
115
116 run_id = f"{schedule.id}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}"
117
118 run = EvaluationRun(
119 schedule_id=schedule.id,
120 run_id=run_id,
121 start_time=datetime.utcnow()
122 )
123 self.runs.append(run)
124
125 try:
126 runner = await self.runner_factory(schedule.benchmark_id)
127 results = await runner.run(schedule.config)
128
129 run.status = "completed"
130 run.results = results
131 run.end_time = datetime.utcnow()
132
133 except Exception as e:
134 run.status = "failed"
135 run.error = str(e)
136 run.end_time = datetime.utcnow()
137
138 async def run_now(self, schedule_id: str) -> Optional[EvaluationRun]:
139 """Trigger immediate evaluation."""
140 schedule = self.schedules.get(schedule_id)
141
142 if not schedule:
143 return None
144
145 await self._run_evaluation(schedule)
146 return self.runs[-1] if self.runs else None
147
148 def get_run_history(
149 self,
150 schedule_id: Optional[str] = None,
151 limit: int = 100
152 ) -> List[EvaluationRun]:
153 """Get evaluation run history."""
154 runs = self.runs
155
156 if schedule_id:
157 runs = [r for r in runs if r.schedule_id == schedule_id]
158
159 return sorted(runs, key=lambda r: r.start_time, reverse=True)[:limit]
160
161
162class ContinuousEvaluationSystem:
163 """Complete continuous evaluation system."""
164
165 def __init__(
166 self,
167 monitor: ProductionMonitor,
168 scheduler: EvaluationScheduler,
169 trigger_manager: TriggerManager,
170 alert_manager: AlertManager,
171 drift_detector: DriftDetector
172 ):
173 self.monitor = monitor
174 self.scheduler = scheduler
175 self.trigger_manager = trigger_manager
176 self.alert_manager = alert_manager
177 self.drift_detector = drift_detector
178
179 # Connect components
180 self.trigger_manager.add_listener(self._on_trigger)
181
182 def _on_trigger(self, event: TriggerEvent):
183 """Handle evaluation trigger."""
184 # Log trigger
185 print(f"Evaluation triggered: {event.trigger_type.value}")
186
187 # Could trigger immediate benchmark run
188 # asyncio.create_task(self._run_triggered_evaluation(event))
189
190 async def start(self):
191 """Start all continuous evaluation components."""
192 await asyncio.gather(
193 self.scheduler.start(),
194 self.trigger_manager.start(),
195 self._alert_check_loop(),
196 self._drift_check_loop()
197 )
198
199 async def _alert_check_loop(self):
200 """Periodic alert checking."""
201 while True:
202 await self.alert_manager.check()
203 await asyncio.sleep(30)
204
205 async def _drift_check_loop(self):
206 """Periodic drift detection."""
207 while True:
208 drifting = self.drift_detector.get_drifting_metrics()
209
210 for drift_result in drifting:
211 # Could create alert for drift
212 pass
213
214 await asyncio.sleep(300) # Every 5 minutesSummary
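Pulling the pieces together, a hypothetical entry point might assemble and start the whole system. `make_benchmark_runner` and `_StubRunner` are placeholders for whatever factory and runner execute your benchmark suite; everything else comes from the listings above:

```python
import asyncio


class _StubRunner:
    async def run(self, config):
        # Placeholder: execute your benchmark suite and return a results dict.
        return {"passed": True}


async def make_benchmark_runner(benchmark_id: str):
    # Placeholder factory; swap in your real benchmark runner here.
    return _StubRunner()


async def main():
    monitor = ProductionMonitor(window_sizes=[60, 300, 3600, 86400])

    scheduler = EvaluationScheduler(runner_factory=make_benchmark_runner)
    scheduler.add_schedule(ScheduledEvaluation(
        id="nightly",
        name="Nightly regression benchmark",
        benchmark_id="core-benchmark-v1",
        frequency=ScheduleFrequency.DAILY,
    ))

    trigger_manager = TriggerManager()
    trigger_manager.register(
        ThresholdTrigger("low-success", monitor, "request.success_rate", 0.9, "lt")
    )

    system = ContinuousEvaluationSystem(
        monitor=monitor,
        scheduler=scheduler,
        trigger_manager=trigger_manager,
        alert_manager=AlertManager(monitor),
        drift_detector=DriftDetector(monitor),
    )
    await system.start()  # runs until cancelled


asyncio.run(main())
```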
Summary
This section covered the key components of continuous evaluation:
| Component | Purpose | Key Features |
|---|---|---|
| Production Monitoring | Real-time metrics | Windows, aggregation, health checks |
| Triggers | Initiate evaluations | Deployment, threshold, anomaly detection |
| Drift Detection | Track changes | Metric drift, data drift, baselines |
| Alerts | Notify on issues | Rules, severity, notifiers |
| Scheduling | Regular benchmarks | Frequencies, history, on-demand runs |
Key Takeaways: Continuous evaluation catches problems before users notice them. Combine real-time monitoring with scheduled benchmarks, and use intelligent alerting to avoid alert fatigue.
In the next section, we'll build a complete evaluation pipeline that integrates all these components into a production-ready system.