Learning Objectives
By the end of this section, you will:
- Design a comprehensive monitoring strategy for production ML systems
- Track model performance metrics to detect degradation
- Implement alerting rules for critical RUL thresholds
- Build operational dashboards for fleet-wide visibility
- Detect data drift and model drift proactively
Core Insight: Production ML systems require monitoring at three levels: (1) infrastructure health (latency, throughput), (2) model quality (prediction accuracy, drift), and (3) business impact (maintenance savings, failure prevention). A robust monitoring strategy catches problems before they impact operations.
Monitoring Strategy
Effective monitoring spans multiple layers, from infrastructure to business outcomes.
Three Layers of Monitoring
| Layer | What to Monitor | Tools |
|---|---|---|
| Infrastructure | Latency, throughput, GPU utilization, errors | Prometheus, Grafana |
| Model Quality | Prediction distribution, feature drift, accuracy | Evidently, Alibi Detect |
| Business Impact | Prevented failures, maintenance cost savings | Custom dashboards |
Key Metrics Overview
```python
from dataclasses import dataclass
from typing import Dict

@dataclass
class MonitoringMetrics:
    """Core metrics to track for AMNL production deployment."""

    # Infrastructure metrics
    inference_latency_ms: float      # Time per prediction
    throughput_per_second: float     # Predictions per second
    gpu_utilization_percent: float   # GPU usage
    memory_usage_mb: float           # GPU memory
    error_rate: float                # Failed predictions

    # Model quality metrics
    rul_prediction_mean: float       # Average predicted RUL
    rul_prediction_std: float        # Prediction variance
    health_class_distribution: Dict[int, float]  # Class probabilities
    feature_drift_score: float       # Input feature drift

    # Business metrics
    engines_in_critical: int         # Engines with RUL < 15
    engines_in_degrading: int        # Engines with 15 < RUL <= 50
    alerts_generated: int            # Number of alerts
    prevented_failures: int          # Estimated prevented failures
```
Model Performance Metrics
Monitoring model quality in production requires tracking prediction distributions and detecting drift.
Prediction Distribution Monitoring
```python
import time
from collections import deque
from typing import Dict, Optional

import numpy as np
from scipy import stats

class PredictionMonitor:
    """
    Monitor AMNL prediction distributions for anomalies and drift.
    """

    def __init__(self, window_size=1000, baseline_window=10000):
        self.window_size = window_size
        self.baseline_window = baseline_window

        # Rolling windows for recent predictions
        self.rul_predictions = deque(maxlen=window_size)
        self.health_predictions = deque(maxlen=window_size)

        # Baseline distributions (from initial deployment)
        self.baseline_rul = None
        self.baseline_health = None

        # Alerts
        self.alert_callbacks = []

    def record_prediction(self, rul: float, health_probs: np.ndarray):
        """Record a new prediction."""
        self.rul_predictions.append(rul)
        self.health_predictions.append(health_probs)

    def compute_metrics(self) -> Optional[Dict]:
        """Compute current monitoring metrics, or None if too few samples."""
        if len(self.rul_predictions) < 100:
            return None

        rul_array = np.array(self.rul_predictions)
        health_array = np.array(self.health_predictions)

        metrics = {
            # RUL statistics
            'rul_mean': float(np.mean(rul_array)),
            'rul_std': float(np.std(rul_array)),
            'rul_median': float(np.median(rul_array)),
            'rul_p10': float(np.percentile(rul_array, 10)),
            'rul_p90': float(np.percentile(rul_array, 90)),

            # Health class distribution
            'health_healthy_pct': float(np.mean(health_array[:, 0])),
            'health_degrading_pct': float(np.mean(health_array[:, 1])),
            'health_critical_pct': float(np.mean(health_array[:, 2])),

            # Drift scores (if baseline exists)
            'rul_drift_score': self._compute_rul_drift(rul_array),
            'health_drift_score': self._compute_health_drift(health_array),

            'timestamp': time.time()
        }

        # Check for anomalies
        self._check_anomalies(metrics)

        return metrics

    def _compute_rul_drift(self, current: np.ndarray) -> float:
        """Compute KS statistic for RUL drift detection."""
        if self.baseline_rul is None:
            return 0.0

        statistic, p_value = stats.ks_2samp(current, self.baseline_rul)
        return float(statistic)

    def _compute_health_drift(self, current: np.ndarray) -> float:
        """Compute drift in health class distribution."""
        if self.baseline_health is None:
            return 0.0

        current_dist = np.mean(current, axis=0)
        baseline_dist = np.mean(self.baseline_health, axis=0)

        # Jensen-Shannon divergence between class distributions
        m = 0.5 * (current_dist + baseline_dist)
        js_div = 0.5 * (stats.entropy(current_dist, m) + stats.entropy(baseline_dist, m))

        return float(js_div)

    def _check_anomalies(self, metrics: Dict):
        """Check for anomalous conditions and trigger alerts."""
        alerts = []

        # Check for RUL drift
        if metrics['rul_drift_score'] > 0.15:
            alerts.append({
                'type': 'MODEL_DRIFT',
                'severity': 'WARNING',
                'message': f"RUL distribution drift detected: KS={metrics['rul_drift_score']:.3f}"
            })

        # Check for unusual health distribution
        if metrics['health_critical_pct'] > 0.3:
            alerts.append({
                'type': 'HIGH_CRITICAL_RATE',
                'severity': 'CRITICAL',
                'message': f"{metrics['health_critical_pct']*100:.1f}% of fleet in critical state"
            })

        # Check for prediction collapse (all predictions similar)
        if metrics['rul_std'] < 1.0:
            alerts.append({
                'type': 'PREDICTION_COLLAPSE',
                'severity': 'CRITICAL',
                'message': f"RUL prediction variance collapsed: std={metrics['rul_std']:.2f}"
            })

        for alert in alerts:
            for callback in self.alert_callbacks:
                callback(alert)

    def set_baseline(self, rul_samples: np.ndarray, health_samples: np.ndarray):
        """Set baseline distributions for drift detection."""
        self.baseline_rul = rul_samples
        self.baseline_health = health_samples
```
Feature Drift Detection
```python
from typing import Dict

import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

class FeatureDriftDetector:
    """
    Detect drift in input features using statistical tests.
    """

    def __init__(self, reference_data, feature_names):
        self.reference_data = reference_data
        self.feature_names = feature_names

    def check_drift(self, current_data, threshold=0.05) -> Dict:
        """
        Check for feature drift against reference data.

        Args:
            current_data: Recent input data batch
            threshold: P-value threshold for drift detection

        Returns:
            Dict with drift detection results
        """
        ref_df = pd.DataFrame(self.reference_data, columns=self.feature_names)
        cur_df = pd.DataFrame(current_data, columns=self.feature_names)

        # Use Evidently for drift detection
        report = Report(metrics=[DataDriftPreset()])
        report.run(reference_data=ref_df, current_data=cur_df)

        result = report.as_dict()

        # Extract drift metrics
        drift_summary = {
            'overall_drift_detected': result['metrics'][0]['result']['dataset_drift'],
            'drifted_features': [],
            'drift_scores': {}
        }

        # drift_by_columns maps column name -> per-column drift results
        drift_by_columns = result['metrics'][0]['result']['drift_by_columns']
        for feature_name, feature_result in drift_by_columns.items():
            drift_score = feature_result['drift_score']
            is_drifted = feature_result['drift_detected']

            drift_summary['drift_scores'][feature_name] = drift_score
            if is_drifted:
                drift_summary['drifted_features'].append(feature_name)

        return drift_summary
```
When to Retrain
Trigger model retraining when: (1) RUL drift score exceeds 0.2 for 24+ hours, (2) more than 3 features show significant drift, or (3) health classification accuracy drops below 90% on holdout validation.
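These three triggers can be combined into a single check. The sketch below uses illustrative names (`should_retrain` and the threshold constants are not part of the AMNL codebase); in practice the inputs would come from the prediction monitor, the feature drift detector, and a periodic holdout evaluation job.

```python
import time

# Thresholds mirror the retraining criteria in the text above
RUL_DRIFT_THRESHOLD = 0.2
DRIFT_DURATION_SECONDS = 24 * 3600
MAX_DRIFTED_FEATURES = 3
MIN_HOLDOUT_ACCURACY = 0.90

def should_retrain(drift_history, drifted_feature_count, holdout_accuracy, now=None):
    """Return True if any retraining trigger fires.

    drift_history: list of (timestamp, rul_drift_score) tuples, oldest first.
    """
    now = time.time() if now is None else now

    # Trigger 1: RUL drift above threshold for the entire past 24 hours,
    # and monitoring has actually been running at least that long.
    recent = [(t, s) for t, s in drift_history if now - t <= DRIFT_DURATION_SECONDS]
    history_spans_window = any(now - t >= DRIFT_DURATION_SECONDS for t, _ in drift_history)
    sustained_drift = (
        bool(recent)
        and history_spans_window
        and all(s > RUL_DRIFT_THRESHOLD for _, s in recent)
    )

    # Trigger 2: more than 3 features show significant drift
    feature_drift = drifted_feature_count > MAX_DRIFTED_FEATURES

    # Trigger 3: holdout classification accuracy drops below 90%
    accuracy_drop = holdout_accuracy < MIN_HOLDOUT_ACCURACY

    return sustained_drift or feature_drift or accuracy_drop
```

Requiring the drift to be *sustained* (rather than firing on a single spike) keeps a noisy drift score from triggering unnecessary retraining runs.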
Alerting System
Alerts should be actionable, prioritized, and integrated with existing operations systems.
Alert Categories
| Category | Condition | Severity | Action |
|---|---|---|---|
| Critical RUL | RUL < 15 cycles | CRITICAL | Immediate maintenance |
| Degrading | 15 < RUL ≤ 50 cycles | WARNING | Schedule maintenance |
| Model Drift | KS statistic > 0.15 | WARNING | Investigate & retrain |
| System Error | Error rate > 1% | CRITICAL | Ops intervention |
| Latency Spike | P99 > 100ms | WARNING | Scale resources |
Alert Implementation
```python
import os
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, List

import requests

class AlertSeverity(Enum):
    INFO = "INFO"
    WARNING = "WARNING"
    CRITICAL = "CRITICAL"

@dataclass
class Alert:
    alert_id: str
    engine_id: str
    alert_type: str
    severity: AlertSeverity
    message: str
    rul_prediction: float
    health_state: int
    timestamp: float
    metadata: Dict

class AlertManager:
    """
    Manage alerts for AMNL predictions.
    """

    def __init__(self):
        self.handlers: Dict[AlertSeverity, List[Callable]] = {
            AlertSeverity.INFO: [],
            AlertSeverity.WARNING: [],
            AlertSeverity.CRITICAL: []
        }
        self.alert_history = []

    def register_handler(self, severity: AlertSeverity, handler: Callable):
        """Register an alert handler for a severity level."""
        self.handlers[severity].append(handler)

    def process_prediction(self, engine_id: str, rul: float, health: int):
        """
        Process a prediction and generate alerts if needed.
        """
        alerts = []

        # Critical RUL alert
        if rul <= 15:
            alerts.append(Alert(
                alert_id=f"{engine_id}-{int(time.time())}-critical",
                engine_id=engine_id,
                alert_type="CRITICAL_RUL",
                severity=AlertSeverity.CRITICAL,
                message=f"Engine {engine_id} has critical RUL: {rul:.1f} cycles remaining",
                rul_prediction=rul,
                health_state=health,
                timestamp=time.time(),
                metadata={"action_required": "immediate_maintenance"}
            ))

        # Degrading alert
        elif rul <= 50:
            alerts.append(Alert(
                alert_id=f"{engine_id}-{int(time.time())}-warning",
                engine_id=engine_id,
                alert_type="DEGRADING_RUL",
                severity=AlertSeverity.WARNING,
                message=f"Engine {engine_id} is degrading: {rul:.1f} cycles remaining",
                rul_prediction=rul,
                health_state=health,
                timestamp=time.time(),
                metadata={"action_required": "schedule_maintenance"}
            ))

        # Dispatch alerts
        for alert in alerts:
            self._dispatch_alert(alert)

    def _dispatch_alert(self, alert: Alert):
        """Dispatch alert to registered handlers."""
        self.alert_history.append(alert)

        for handler in self.handlers[alert.severity]:
            try:
                handler(alert)
            except Exception as e:
                print(f"Alert handler failed: {e}")

# Alert handlers
def slack_handler(alert: Alert):
    """Send alert to Slack."""
    webhook_url = os.environ.get("SLACK_WEBHOOK_URL")

    color = {"CRITICAL": "danger", "WARNING": "warning", "INFO": "good"}

    payload = {
        "attachments": [{
            "color": color[alert.severity.value],
            "title": f"🔧 {alert.alert_type}: {alert.engine_id}",
            "text": alert.message,
            "fields": [
                {"title": "RUL", "value": f"{alert.rul_prediction:.1f} cycles", "short": True},
                {"title": "Health State", "value": ["Healthy", "Degrading", "Critical"][alert.health_state], "short": True}
            ],
            "ts": alert.timestamp
        }]
    }

    requests.post(webhook_url, json=payload)

def pagerduty_handler(alert: Alert):
    """Send critical alerts to PagerDuty."""
    if alert.severity != AlertSeverity.CRITICAL:
        return

    # PagerDuty Events API v2
    payload = {
        "routing_key": os.environ.get("PAGERDUTY_ROUTING_KEY"),
        "event_action": "trigger",
        "payload": {
            "summary": alert.message,
            "source": "amnl-inference",
            "severity": "critical",
            "custom_details": {
                "engine_id": alert.engine_id,
                "rul_prediction": alert.rul_prediction,
                "health_state": alert.health_state
            }
        }
    }

    requests.post("https://events.pagerduty.com/v2/enqueue", json=payload)

# Setup
alert_manager = AlertManager()
alert_manager.register_handler(AlertSeverity.WARNING, slack_handler)
alert_manager.register_handler(AlertSeverity.CRITICAL, slack_handler)
alert_manager.register_handler(AlertSeverity.CRITICAL, pagerduty_handler)
```
Alert Fatigue Prevention
Implement alert deduplication and rate limiting. An engine that remains in critical state shouldn't generate new alerts every prediction cycle. Use a cooldown period (e.g., 1 hour) between repeated alerts for the same engine.
Operational Dashboards
Dashboards provide at-a-glance visibility into fleet health and system performance.
Key Dashboard Panels
| Panel | Content | Refresh Rate |
|---|---|---|
| Fleet Overview | Health distribution pie chart, engine count by state | 1 minute |
| Critical Engines | List of engines with RUL < 15 | Real-time |
| RUL Trend | Time series of average fleet RUL | 5 minutes |
| System Health | Latency, throughput, error rate gauges | 10 seconds |
| Drift Monitor | Feature drift scores, model drift indicator | 1 hour |
Prometheus Metrics Export
```python
from typing import Dict

from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Define Prometheus metrics
PREDICTIONS_TOTAL = Counter(
    'amnl_predictions_total',
    'Total number of predictions',
    ['health_state']
)

PREDICTION_LATENCY = Histogram(
    'amnl_prediction_latency_seconds',
    'Prediction latency in seconds',
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)

RUL_PREDICTION = Gauge(
    'amnl_rul_prediction',
    'Current RUL prediction',
    ['engine_id']
)

FLEET_HEALTH = Gauge(
    'amnl_fleet_health_engines',
    'Number of engines by health state',
    ['health_state']
)

MODEL_DRIFT = Gauge(
    'amnl_model_drift_score',
    'Model drift score (KS statistic)'
)

def record_prediction_metrics(engine_id: str, rul: float, health: int, latency: float):
    """Record metrics for a prediction."""
    health_labels = ['healthy', 'degrading', 'critical']

    PREDICTIONS_TOTAL.labels(health_state=health_labels[health]).inc()
    PREDICTION_LATENCY.observe(latency)
    RUL_PREDICTION.labels(engine_id=engine_id).set(rul)

def update_fleet_metrics(health_counts: Dict[int, int]):
    """Update fleet-wide metrics."""
    health_labels = ['healthy', 'degrading', 'critical']
    for state, count in health_counts.items():
        FLEET_HEALTH.labels(health_state=health_labels[state]).set(count)

# Start Prometheus metrics server
start_http_server(9090)
```
Grafana Dashboard Configuration
```json
{
  "dashboard": {
    "title": "AMNL Predictive Maintenance",
    "panels": [
      {
        "title": "Fleet Health Distribution",
        "type": "piechart",
        "targets": [
          {
            "expr": "amnl_fleet_health_engines",
            "legendFormat": "{{health_state}}"
          }
        ]
      },
      {
        "title": "Prediction Latency (p99)",
        "type": "gauge",
        "targets": [
          {
            "expr": "histogram_quantile(0.99, rate(amnl_prediction_latency_seconds_bucket[5m]))"
          }
        ],
        "thresholds": {
          "mode": "absolute",
          "steps": [
            {"color": "green", "value": null},
            {"color": "yellow", "value": 0.05},
            {"color": "red", "value": 0.1}
          ]
        }
      },
      {
        "title": "Critical Engines",
        "type": "table",
        "targets": [
          {
            "expr": "amnl_rul_prediction < 15",
            "format": "table"
          }
        ]
      },
      {
        "title": "Model Drift Score",
        "type": "timeseries",
        "targets": [
          {
            "expr": "amnl_model_drift_score"
          }
        ],
        "thresholds": [
          {"value": 0.15, "color": "yellow"},
          {"value": 0.25, "color": "red"}
        ]
      }
    ]
  }
}
```
Summary
Monitoring and Alerting - Summary:
- Three-layer monitoring: Infrastructure, model quality, and business impact
- Drift detection: Track RUL distribution and feature drift continuously
- Tiered alerting: Critical (RUL ≤ 15), Warning (15 < RUL ≤ 50), Info
- Integration: Connect to Slack, PagerDuty, email for notifications
- Dashboards: Prometheus + Grafana for operational visibility
| Metric | Warning Threshold | Critical Threshold |
|---|---|---|
| RUL prediction | ≤ 50 cycles | ≤ 15 cycles |
| Prediction latency (p99) | > 50ms | > 100ms |
| Error rate | > 0.5% | > 1% |
| Model drift (KS) | > 0.15 | > 0.25 |
| Feature drift (count) | > 2 features | > 5 features |
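The threshold table can be encoded in a small helper. A sketch (`classify_metric` is an illustrative name); note that RUL is the one metric where *lower* values are worse:

```python
def classify_metric(value, warning, critical, higher_is_worse=True):
    """Map a metric value to a severity level using warning/critical thresholds."""
    if higher_is_worse:
        # e.g. latency, error rate, drift scores
        if value > critical:
            return "CRITICAL"
        if value > warning:
            return "WARNING"
    else:
        # e.g. RUL: lower values are worse
        if value <= critical:
            return "CRITICAL"
        if value <= warning:
            return "WARNING"
    return "OK"
```

For example, `classify_metric(12, 50, 15, higher_is_worse=False)` classifies an RUL of 12 cycles as critical, while `classify_metric(0.18, 0.15, 0.25)` classifies a KS drift score of 0.18 as a warning.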
Key Insight: Production ML monitoring is fundamentally different from traditional software monitoring. Beyond system health, we must track prediction quality, detect distribution drift, and measure business impact. The goal is to catch problems before they manifest as unplanned equipment failures—turning the monitoring system itself into a form of predictive maintenance for your ML pipeline.