Chapter 20: Production Deployment

Monitoring and Alerting

Learning Objectives

By the end of this section, you will:

  1. Design a comprehensive monitoring strategy for production ML systems
  2. Track model performance metrics to detect degradation
  3. Implement alerting rules for critical RUL thresholds
  4. Build operational dashboards for fleet-wide visibility
  5. Detect data drift and model drift proactively

Core Insight: Production ML systems require monitoring at three levels: (1) infrastructure health (latency, throughput), (2) model quality (prediction accuracy, drift), and (3) business impact (maintenance savings, failure prevention). A robust monitoring strategy catches problems before they impact operations.

Monitoring Strategy

Effective monitoring spans multiple layers, from infrastructure to business outcomes.

Three Layers of Monitoring

| Layer | What to Monitor | Tools |
|---|---|---|
| Infrastructure | Latency, throughput, GPU utilization, errors | Prometheus, Grafana |
| Model Quality | Prediction distribution, feature drift, accuracy | Evidently, Alibi Detect |
| Business Impact | Prevented failures, maintenance cost savings | Custom dashboards |

Key Metrics Overview

```python
from dataclasses import dataclass
from typing import Dict

@dataclass
class MonitoringMetrics:
    """Core metrics to track for AMNL production deployment."""

    # Infrastructure metrics
    inference_latency_ms: float          # Time per prediction
    throughput_per_second: float         # Predictions per second
    gpu_utilization_percent: float       # GPU usage
    memory_usage_mb: float               # GPU memory
    error_rate: float                    # Failed predictions

    # Model quality metrics
    rul_prediction_mean: float           # Average predicted RUL
    rul_prediction_std: float            # Prediction variance
    health_class_distribution: Dict[int, float]  # Class probabilities
    feature_drift_score: float           # Input feature drift

    # Business metrics
    engines_in_critical: int             # Engines with RUL < 15
    engines_in_degrading: int            # Engines with 15 < RUL <= 50
    alerts_generated: int                # Number of alerts
    prevented_failures: int              # Estimated prevented failures
```

Model Performance Metrics

Monitoring model quality in production requires tracking prediction distributions and detecting drift.

Prediction Distribution Monitoring

```python
import time
from collections import deque
from typing import Dict, Optional

import numpy as np
from scipy import stats

class PredictionMonitor:
    """
    Monitor AMNL prediction distributions for anomalies and drift.
    """

    def __init__(self, window_size=1000, baseline_window=10000):
        self.window_size = window_size
        self.baseline_window = baseline_window

        # Rolling windows for recent predictions
        self.rul_predictions = deque(maxlen=window_size)
        self.health_predictions = deque(maxlen=window_size)

        # Baseline distributions (from initial deployment)
        self.baseline_rul = None
        self.baseline_health = None

        # Alerts
        self.alert_callbacks = []

    def record_prediction(self, rul: float, health_probs: np.ndarray):
        """Record a new prediction."""
        self.rul_predictions.append(rul)
        self.health_predictions.append(health_probs)

    def compute_metrics(self) -> Optional[Dict]:
        """Compute current monitoring metrics (None until enough samples)."""
        if len(self.rul_predictions) < 100:
            return None

        rul_array = np.array(self.rul_predictions)
        health_array = np.array(self.health_predictions)

        metrics = {
            # RUL statistics
            'rul_mean': float(np.mean(rul_array)),
            'rul_std': float(np.std(rul_array)),
            'rul_median': float(np.median(rul_array)),
            'rul_p10': float(np.percentile(rul_array, 10)),
            'rul_p90': float(np.percentile(rul_array, 90)),

            # Health class distribution
            'health_healthy_pct': float(np.mean(health_array[:, 0])),
            'health_degrading_pct': float(np.mean(health_array[:, 1])),
            'health_critical_pct': float(np.mean(health_array[:, 2])),

            # Drift scores (if baseline exists)
            'rul_drift_score': self._compute_rul_drift(rul_array),
            'health_drift_score': self._compute_health_drift(health_array),

            'timestamp': time.time()
        }

        # Check for anomalies
        self._check_anomalies(metrics)

        return metrics

    def _compute_rul_drift(self, current: np.ndarray) -> float:
        """Compute KS statistic for RUL drift detection."""
        if self.baseline_rul is None:
            return 0.0

        statistic, p_value = stats.ks_2samp(current, self.baseline_rul)
        return float(statistic)

    def _compute_health_drift(self, current: np.ndarray) -> float:
        """Compute drift in health class distribution."""
        if self.baseline_health is None:
            return 0.0

        current_dist = np.mean(current, axis=0)
        baseline_dist = np.mean(self.baseline_health, axis=0)

        # Jensen-Shannon divergence
        m = 0.5 * (current_dist + baseline_dist)
        js_div = 0.5 * (stats.entropy(current_dist, m) + stats.entropy(baseline_dist, m))

        return float(js_div)

    def _check_anomalies(self, metrics: Dict):
        """Check for anomalous conditions and trigger alerts."""
        alerts = []

        # Check for RUL drift
        if metrics['rul_drift_score'] > 0.15:
            alerts.append({
                'type': 'MODEL_DRIFT',
                'severity': 'WARNING',
                'message': f"RUL distribution drift detected: KS={metrics['rul_drift_score']:.3f}"
            })

        # Check for unusual health distribution
        if metrics['health_critical_pct'] > 0.3:
            alerts.append({
                'type': 'HIGH_CRITICAL_RATE',
                'severity': 'CRITICAL',
                'message': f"{metrics['health_critical_pct']*100:.1f}% of fleet in critical state"
            })

        # Check for prediction collapse (all predictions similar)
        if metrics['rul_std'] < 1.0:
            alerts.append({
                'type': 'PREDICTION_COLLAPSE',
                'severity': 'CRITICAL',
                'message': f"RUL prediction variance collapsed: std={metrics['rul_std']:.2f}"
            })

        for alert in alerts:
            for callback in self.alert_callbacks:
                callback(alert)

    def set_baseline(self, rul_samples: np.ndarray, health_samples: np.ndarray):
        """Set baseline distributions for drift detection."""
        self.baseline_rul = rul_samples
        self.baseline_health = health_samples
```
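
The KS check in `_compute_rul_drift` can be seen end-to-end on synthetic data. A standalone sketch (the distributions and the 15-cycle mean shift are made up for illustration; the 0.15 cutoff mirrors the monitor's warning threshold):

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(42)

# Baseline RUL predictions vs. a drifted batch whose mean dropped by 15 cycles
baseline = rng.normal(loc=80, scale=20, size=5000)
current = rng.normal(loc=65, scale=20, size=1000)

statistic, p_value = stats.ks_2samp(current, baseline)
print(f"KS statistic: {statistic:.3f}, p-value: {p_value:.2e}")

# A statistic above 0.15 would trigger the MODEL_DRIFT warning
drift_detected = statistic > 0.15
```

The KS statistic is the maximum distance between the two empirical CDFs, so it is sensitive to shifts in location as well as shape.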

Feature Drift Detection

```python
from typing import Dict

import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

class FeatureDriftDetector:
    """
    Detect drift in input features using statistical tests.
    """

    def __init__(self, reference_data, feature_names):
        self.reference_data = reference_data
        self.feature_names = feature_names

    def check_drift(self, current_data, threshold=0.05) -> Dict:
        """
        Check for feature drift against reference data.

        Args:
            current_data: Recent input data batch
            threshold: P-value threshold for drift detection

        Returns:
            Dict with drift detection results
        """
        ref_df = pd.DataFrame(self.reference_data, columns=self.feature_names)
        cur_df = pd.DataFrame(current_data, columns=self.feature_names)

        # Use Evidently for drift detection
        report = Report(metrics=[DataDriftPreset()])
        report.run(reference_data=ref_df, current_data=cur_df)

        result = report.as_dict()

        # DataDriftPreset yields a dataset-level drift metric followed by a
        # per-column drift table
        drift_summary = {
            'overall_drift_detected': result['metrics'][0]['result']['dataset_drift'],
            'drifted_features': [],
            'drift_scores': {}
        }

        drift_by_columns = result['metrics'][1]['result']['drift_by_columns']
        for feature_name, feature_result in drift_by_columns.items():
            drift_summary['drift_scores'][feature_name] = feature_result['drift_score']
            if feature_result['drift_detected']:
                drift_summary['drifted_features'].append(feature_name)

        return drift_summary
```

When to Retrain

Trigger model retraining when: (1) RUL drift score exceeds 0.2 for 24+ hours, (2) more than 3 features show significant drift, or (3) health classification accuracy drops below 90% on holdout validation.
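
These three triggers can be encoded as a small policy object. The sketch below is illustrative — the class name, parameter names, and state handling are assumptions, not part of AMNL:

```python
import time

class RetrainPolicy:
    """Decide when to trigger retraining from monitoring signals."""

    def __init__(self, drift_threshold=0.2, drift_hours=24,
                 max_drifted_features=3, min_accuracy=0.90):
        self.drift_threshold = drift_threshold
        self.drift_seconds = drift_hours * 3600
        self.max_drifted_features = max_drifted_features
        self.min_accuracy = min_accuracy
        self.drift_exceeded_since = None  # When drift first crossed threshold

    def should_retrain(self, rul_drift_score: float, n_drifted_features: int,
                       holdout_accuracy: float, now=None) -> bool:
        now = time.time() if now is None else now

        # (1) Sustained RUL drift: score above threshold for 24+ hours
        if rul_drift_score > self.drift_threshold:
            if self.drift_exceeded_since is None:
                self.drift_exceeded_since = now
            if now - self.drift_exceeded_since >= self.drift_seconds:
                return True
        else:
            self.drift_exceeded_since = None  # Reset once drift recovers

        # (2) More than 3 features show significant drift
        if n_drifted_features > self.max_drifted_features:
            return True

        # (3) Holdout classification accuracy below the 90% floor
        return holdout_accuracy < self.min_accuracy
```

Keeping the sustained-drift clock in the policy (rather than alerting on a single drifted batch) avoids retraining on transient noise.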


Alerting System

Alerts should be actionable, prioritized, and integrated with existing operations systems.

Alert Categories

| Category | Condition | Severity | Action |
|---|---|---|---|
| Critical RUL | RUL < 15 cycles | CRITICAL | Immediate maintenance |
| Degrading | 15 < RUL ≤ 50 cycles | WARNING | Schedule maintenance |
| Model Drift | KS statistic > 0.15 | WARNING | Investigate & retrain |
| System Error | Error rate > 1% | CRITICAL | Ops intervention |
| Latency Spike | P99 > 100ms | WARNING | Scale resources |

Alert Implementation

```python
import os
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, List

import requests

class AlertSeverity(Enum):
    INFO = "INFO"
    WARNING = "WARNING"
    CRITICAL = "CRITICAL"

@dataclass
class Alert:
    alert_id: str
    engine_id: str
    alert_type: str
    severity: AlertSeverity
    message: str
    rul_prediction: float
    health_state: int
    timestamp: float
    metadata: Dict

class AlertManager:
    """
    Manage alerts for AMNL predictions.
    """

    def __init__(self):
        self.handlers: Dict[AlertSeverity, List[Callable]] = {
            AlertSeverity.INFO: [],
            AlertSeverity.WARNING: [],
            AlertSeverity.CRITICAL: []
        }
        self.alert_history = []

    def register_handler(self, severity: AlertSeverity, handler: Callable):
        """Register an alert handler for a severity level."""
        self.handlers[severity].append(handler)

    def process_prediction(self, engine_id: str, rul: float, health: int):
        """
        Process a prediction and generate alerts if needed.
        """
        alerts = []

        # Critical RUL alert
        if rul <= 15:
            alerts.append(Alert(
                alert_id=f"{engine_id}-{int(time.time())}-critical",
                engine_id=engine_id,
                alert_type="CRITICAL_RUL",
                severity=AlertSeverity.CRITICAL,
                message=f"Engine {engine_id} has critical RUL: {rul:.1f} cycles remaining",
                rul_prediction=rul,
                health_state=health,
                timestamp=time.time(),
                metadata={"action_required": "immediate_maintenance"}
            ))

        # Degrading alert
        elif rul <= 50:
            alerts.append(Alert(
                alert_id=f"{engine_id}-{int(time.time())}-warning",
                engine_id=engine_id,
                alert_type="DEGRADING_RUL",
                severity=AlertSeverity.WARNING,
                message=f"Engine {engine_id} is degrading: {rul:.1f} cycles remaining",
                rul_prediction=rul,
                health_state=health,
                timestamp=time.time(),
                metadata={"action_required": "schedule_maintenance"}
            ))

        # Dispatch alerts
        for alert in alerts:
            self._dispatch_alert(alert)

    def _dispatch_alert(self, alert: Alert):
        """Dispatch alert to registered handlers."""
        self.alert_history.append(alert)

        for handler in self.handlers[alert.severity]:
            try:
                handler(alert)
            except Exception as e:
                print(f"Alert handler failed: {e}")

# Alert handlers
def slack_handler(alert: Alert):
    """Send alert to Slack."""
    webhook_url = os.environ.get("SLACK_WEBHOOK_URL")

    color = {"CRITICAL": "danger", "WARNING": "warning", "INFO": "good"}

    payload = {
        "attachments": [{
            "color": color[alert.severity.value],
            "title": f"🔧 {alert.alert_type}: {alert.engine_id}",
            "text": alert.message,
            "fields": [
                {"title": "RUL", "value": f"{alert.rul_prediction:.1f} cycles", "short": True},
                {"title": "Health State", "value": ["Healthy", "Degrading", "Critical"][alert.health_state], "short": True}
            ],
            "ts": alert.timestamp
        }]
    }

    requests.post(webhook_url, json=payload)

def pagerduty_handler(alert: Alert):
    """Send critical alerts to PagerDuty."""
    if alert.severity != AlertSeverity.CRITICAL:
        return

    # PagerDuty Events API v2
    payload = {
        "routing_key": os.environ.get("PAGERDUTY_ROUTING_KEY"),
        "event_action": "trigger",
        "payload": {
            "summary": alert.message,
            "source": "amnl-inference",
            "severity": "critical",
            "custom_details": {
                "engine_id": alert.engine_id,
                "rul_prediction": alert.rul_prediction,
                "health_state": alert.health_state
            }
        }
    }

    requests.post("https://events.pagerduty.com/v2/enqueue", json=payload)

# Setup
alert_manager = AlertManager()
alert_manager.register_handler(AlertSeverity.WARNING, slack_handler)
alert_manager.register_handler(AlertSeverity.CRITICAL, slack_handler)
alert_manager.register_handler(AlertSeverity.CRITICAL, pagerduty_handler)
```

Alert Fatigue Prevention

Implement alert deduplication and rate limiting. An engine that remains in critical state shouldn't generate new alerts every prediction cycle. Use a cooldown period (e.g., 1 hour) between repeated alerts for the same engine.
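
One lightweight way to implement the cooldown is keyed suppression. This sketch is illustrative (keying on engine and alert type, and the one-hour default, are assumptions); the check could be consulted inside `AlertManager._dispatch_alert` before invoking handlers:

```python
import time

class AlertDeduplicator:
    """Suppress repeated alerts for the same engine within a cooldown window."""

    def __init__(self, cooldown_seconds=3600):
        self.cooldown_seconds = cooldown_seconds
        self.last_sent = {}  # (engine_id, alert_type) -> last send timestamp

    def should_send(self, engine_id: str, alert_type: str, now=None) -> bool:
        now = time.time() if now is None else now
        key = (engine_id, alert_type)
        last = self.last_sent.get(key)
        if last is not None and now - last < self.cooldown_seconds:
            return False  # Still inside cooldown: suppress duplicate
        self.last_sent[key] = now
        return True
```

Keying on alert type as well as engine ID means an engine's transition from DEGRADING_RUL to CRITICAL_RUL still alerts immediately, even mid-cooldown.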


Operational Dashboards

Dashboards provide at-a-glance visibility into fleet health and system performance.

Key Dashboard Panels

| Panel | Content | Refresh Rate |
|---|---|---|
| Fleet Overview | Health distribution pie chart, engine count by state | 1 minute |
| Critical Engines | List of engines with RUL < 15 | Real-time |
| RUL Trend | Time series of average fleet RUL | 5 minutes |
| System Health | Latency, throughput, error rate gauges | 10 seconds |
| Drift Monitor | Feature drift scores, model drift indicator | 1 hour |

Prometheus Metrics Export

```python
from typing import Dict

from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Define Prometheus metrics
PREDICTIONS_TOTAL = Counter(
    'amnl_predictions_total',
    'Total number of predictions',
    ['health_state']
)

PREDICTION_LATENCY = Histogram(
    'amnl_prediction_latency_seconds',
    'Prediction latency in seconds',
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)

RUL_PREDICTION = Gauge(
    'amnl_rul_prediction',
    'Current RUL prediction',
    ['engine_id']
)

FLEET_HEALTH = Gauge(
    'amnl_fleet_health_engines',
    'Number of engines by health state',
    ['health_state']
)

MODEL_DRIFT = Gauge(
    'amnl_model_drift_score',
    'Model drift score (KS statistic)'
)

def record_prediction_metrics(engine_id: str, rul: float, health: int, latency: float):
    """Record metrics for a prediction."""
    health_labels = ['healthy', 'degrading', 'critical']

    PREDICTIONS_TOTAL.labels(health_state=health_labels[health]).inc()
    PREDICTION_LATENCY.observe(latency)
    RUL_PREDICTION.labels(engine_id=engine_id).set(rul)

def update_fleet_metrics(health_counts: Dict[int, int]):
    """Update fleet-wide metrics."""
    health_labels = ['healthy', 'degrading', 'critical']
    for state, count in health_counts.items():
        FLEET_HEALTH.labels(health_state=health_labels[state]).set(count)

# Start Prometheus metrics server
start_http_server(9090)
```
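
For Prometheus to collect these metrics, a scrape job must point at the exporter port above. A minimal sketch of the scrape configuration (the job name is an assumption; adjust the target host to your deployment):

```yaml
scrape_configs:
  - job_name: 'amnl-inference'
    scrape_interval: 10s        # Matches the 10-second System Health refresh
    static_configs:
      - targets: ['localhost:9090']
```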

Grafana Dashboard Configuration

```json
{
  "dashboard": {
    "title": "AMNL Predictive Maintenance",
    "panels": [
      {
        "title": "Fleet Health Distribution",
        "type": "piechart",
        "targets": [
          {
            "expr": "amnl_fleet_health_engines",
            "legendFormat": "{{health_state}}"
          }
        ]
      },
      {
        "title": "Prediction Latency (p99)",
        "type": "gauge",
        "targets": [
          {
            "expr": "histogram_quantile(0.99, rate(amnl_prediction_latency_seconds_bucket[5m]))"
          }
        ],
        "thresholds": {
          "mode": "absolute",
          "steps": [
            {"color": "green", "value": null},
            {"color": "yellow", "value": 0.05},
            {"color": "red", "value": 0.1}
          ]
        }
      },
      {
        "title": "Critical Engines",
        "type": "table",
        "targets": [
          {
            "expr": "amnl_rul_prediction < 15",
            "format": "table"
          }
        ]
      },
      {
        "title": "Model Drift Score",
        "type": "timeseries",
        "targets": [
          {
            "expr": "amnl_model_drift_score"
          }
        ],
        "thresholds": [
          {"value": 0.15, "color": "yellow"},
          {"value": 0.25, "color": "red"}
        ]
      }
    ]
  }
}
```

Summary

Monitoring and Alerting - Summary:

  1. Three-layer monitoring: Infrastructure, model quality, and business impact
  2. Drift detection: Track RUL distribution and feature drift continuously
  3. Tiered alerting: Critical (RUL < 15), Warning (RUL < 50), Info
  4. Integration: Connect to Slack, PagerDuty, email for notifications
  5. Dashboards: Prometheus + Grafana for operational visibility

| Metric | Warning Threshold | Critical Threshold |
|---|---|---|
| RUL prediction | ≤ 50 cycles | ≤ 15 cycles |
| Prediction latency (p99) | > 50ms | > 100ms |
| Error rate | > 0.5% | > 1% |
| Model drift (KS) | > 0.15 | > 0.25 |
| Feature drift (count) | > 2 features | > 5 features |

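These thresholds can live in a single table in code so that alert rules and dashboard colors stay consistent. A sketch (the dictionary structure and metric keys are illustrative, not part of AMNL):

```python
# Warning/critical thresholds from the summary table. Comparison direction
# differs per metric: RUL alarms on low values, the rest alarm on high values.
THRESHOLDS = {
    'rul_cycles':       {'warning': 50,    'critical': 15,   'alarm_low': True},
    'latency_p99_ms':   {'warning': 50,    'critical': 100,  'alarm_low': False},
    'error_rate':       {'warning': 0.005, 'critical': 0.01, 'alarm_low': False},
    'model_drift_ks':   {'warning': 0.15,  'critical': 0.25, 'alarm_low': False},
    'drifted_features': {'warning': 2,     'critical': 5,    'alarm_low': False},
}

def severity(metric: str, value: float) -> str:
    """Map a metric value to INFO / WARNING / CRITICAL per the table above."""
    t = THRESHOLDS[metric]
    if t['alarm_low']:
        # Low values are bad (RUL): check the critical floor first
        if value <= t['critical']:
            return 'CRITICAL'
        if value <= t['warning']:
            return 'WARNING'
    else:
        # High values are bad: check the critical ceiling first
        if value > t['critical']:
            return 'CRITICAL'
        if value > t['warning']:
            return 'WARNING'
    return 'INFO'
```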
Key Insight: Production ML monitoring is fundamentally different from traditional software monitoring. Beyond system health, we must track prediction quality, detect distribution drift, and measure business impact. The goal is to catch problems before they manifest as unplanned equipment failures—turning the monitoring system itself into a form of predictive maintenance for your ML pipeline.