Learning Objectives
By the end of this section, you will:
- Choose between retraining strategies (scheduled, triggered, continuous)
- Build an automated retraining pipeline with validation gates
- Deploy model updates safely using canary and blue-green strategies
- Implement model versioning with rollback capabilities
- Handle A/B testing for model improvements
Core Insight: Model maintenance is as important as initial development. A production ML system must continuously adapt to changing data distributions, new equipment, and evolving operating conditions. Safe deployment practices—canary releases, shadow mode, and automated rollback—protect against degraded model performance.
Model Update Strategies
Different update strategies balance freshness, cost, and risk differently.
Strategy Comparison
| Strategy | Trigger | Frequency | Best For |
|---|---|---|---|
| Scheduled | Fixed calendar | Weekly/Monthly | Stable environments |
| Triggered | Drift detection | On-demand | Dynamic environments |
| Continuous | New data arrival | Real-time | High-value predictions |
| Manual | Business decision | Ad-hoc | Major model changes |
When to Retrain
Retrain when any of the following holds:
- Drift detection flags a significant shift in input or prediction distributions
- Monitored performance falls below the validation thresholds (e.g., RMSE above the gate)
- A scheduled refresh interval has elapsed
- New equipment or operating conditions enter the fleet
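These conditions can be combined into a single trigger check. A minimal sketch (the signal names and thresholds here are illustrative, not part of the pipeline below):

```python
from dataclasses import dataclass

@dataclass
class RetrainSignals:
    """Inputs to the retraining decision (illustrative)."""
    drift_score: float          # e.g., PSI or KS statistic from monitoring
    days_since_last_train: int  # staleness of the current model
    current_rmse: float         # live RMSE estimate against ground truth

def should_retrain(
    s: RetrainSignals,
    drift_threshold: float = 0.2,
    max_age_days: int = 90,
    rmse_gate: float = 12.0,
) -> bool:
    """Retrain if drift, staleness, or degraded accuracy is detected."""
    return (
        s.drift_score > drift_threshold
        or s.days_since_last_train > max_age_days
        or s.current_rmse > rmse_gate
    )
```

Any one condition is sufficient to trigger a run; the validation gates downstream decide whether the retrained model actually ships.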
Automated Retraining Pipeline
An automated pipeline ensures consistent, reproducible model updates with proper validation.
Pipeline Architecture
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict

import mlflow

@dataclass
class RetrainingConfig:
    """Configuration for automated retraining."""
    data_path: str
    model_name: str
    min_samples: int = 10000
    validation_threshold_rmse: float = 12.0
    validation_threshold_health_acc: float = 0.90
    max_training_time_hours: float = 4.0

class AutomatedRetrainingPipeline:
    """Automated model retraining with validation gates."""

    def __init__(self, config: RetrainingConfig):
        self.config = config
        self.mlflow_client = mlflow.tracking.MlflowClient()

    def run_pipeline(self) -> Dict:
        """
        Execute the full retraining pipeline.

        Returns:
            Dict with pipeline status and model info
        """
        result = {
            'started_at': datetime.now().isoformat(),
            'status': 'started',
            'stages': {}
        }

        try:
            # Stage 1: Data preparation
            result['stages']['data_prep'] = self._prepare_data()

            # Stage 2: Training
            result['stages']['training'] = self._train_model()

            # Stage 3: Validation
            result['stages']['validation'] = self._validate_model()

            # Stage 4: Gate check
            if not self._passes_gates(result['stages']['validation']):
                result['status'] = 'rejected'
                result['reason'] = 'Failed validation gates'
                return result

            # Stage 5: Model registration
            result['stages']['registration'] = self._register_model()

            # Stage 6: Export
            result['stages']['export'] = self._export_model()

            result['status'] = 'completed'
            result['completed_at'] = datetime.now().isoformat()

        except Exception as e:
            result['status'] = 'failed'
            result['error'] = str(e)

        return result

    def _prepare_data(self) -> Dict:
        """Prepare training data from recent sensor readings."""
        # Fetch recent data, split into train/val/test, apply preprocessing
        # (illustrative values shown here)
        return {
            'train_samples': 45000,
            'val_samples': 5000,
            'test_samples': 5000,
            'date_range': '2024-10-01 to 2024-12-31'
        }

    def _train_model(self) -> Dict:
        """Train AMNL model with MLflow tracking."""
        with mlflow.start_run(run_name=f"amnl_retrain_{datetime.now().strftime('%Y%m%d')}"):
            # Log parameters
            mlflow.log_params({
                'learning_rate': 1e-3,
                'batch_size': 256,
                'epochs': 100,
                'loss_weight_rul': 0.5,
                'loss_weight_health': 0.5
            })

            # Training logic (simplified)
            # model = train_amnl(...)

            # Log metrics
            mlflow.log_metrics({
                'train_rmse': 8.5,
                'val_rmse': 9.2,
                'health_accuracy': 0.94
            })

            # Log model
            # mlflow.pytorch.log_model(model, "model")

            return {
                'run_id': mlflow.active_run().info.run_id,
                'train_rmse': 8.5,
                'val_rmse': 9.2,
                'training_time_hours': 1.5
            }

    def _validate_model(self) -> Dict:
        """Validate model on held-out test set."""
        # Run evaluation on test set (illustrative values)
        return {
            'test_rmse': 9.4,
            'test_health_accuracy': 0.93,
            'per_dataset_rmse': {
                'FD001': 10.2,
                'FD002': 6.8,
                'FD003': 9.1,
                'FD004': 8.3
            }
        }

    def _passes_gates(self, validation: Dict) -> bool:
        """Check if model passes quality gates."""
        rmse_ok = validation['test_rmse'] < self.config.validation_threshold_rmse
        acc_ok = validation['test_health_accuracy'] > self.config.validation_threshold_health_acc

        # Check against current production model
        prod_rmse = self._get_production_rmse()
        improvement_ok = validation['test_rmse'] <= prod_rmse * 1.05  # Allow 5% slack

        return rmse_ok and acc_ok and improvement_ok

    def _get_production_rmse(self) -> float:
        """Get current production model's RMSE."""
        # Fetch from model registry or monitoring
        return 9.5

    def _register_model(self) -> Dict:
        """Register validated model in MLflow registry."""
        # Register as new version ("run_id" is a placeholder for the
        # actual run ID returned by _train_model)
        model_version = self.mlflow_client.create_model_version(
            name=self.config.model_name,
            source="runs:/run_id/model",
            run_id="run_id"
        )

        return {
            'version': model_version.version,
            'stage': 'Staging'
        }

    def _export_model(self) -> Dict:
        """Export model to deployment formats."""
        # Export to ONNX and TensorRT
        return {
            'onnx_path': '/models/amnl_v2.1.onnx',
            'tensorrt_path': '/models/amnl_v2.1.engine'
        }
```
Pipeline Triggers
```python
from datetime import datetime
from typing import Dict

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

class RetrainingScheduler:
    """Schedule and trigger model retraining."""

    def __init__(self, pipeline: AutomatedRetrainingPipeline):
        self.pipeline = pipeline
        self.scheduler = BackgroundScheduler()

        # Scheduled retraining: every Sunday at 2 AM
        self.scheduler.add_job(
            self._scheduled_retrain,
            CronTrigger(day_of_week='sun', hour=2),
            id='scheduled_retrain'
        )

    def trigger_on_drift(self, drift_score: float):
        """Trigger retraining when drift is detected."""
        if drift_score > 0.2:
            print(f"Drift detected ({drift_score:.3f}), triggering retraining")
            self.scheduler.add_job(
                self.pipeline.run_pipeline,
                'date',  # Run once immediately
                id=f'drift_retrain_{datetime.now().timestamp()}'
            )

    def _scheduled_retrain(self):
        """Run scheduled retraining."""
        print("Running scheduled weekly retraining")
        result = self.pipeline.run_pipeline()
        self._notify_result(result)

    def _notify_result(self, result: Dict):
        """Send notification about retraining result."""
        # Send to Slack, email, etc.
        pass

    def start(self):
        """Start the scheduler."""
        self.scheduler.start()
```
Safe Model Deployment
Deploying a new model version requires careful strategies to minimize risk.
Deployment Strategies
| Strategy | Description | Risk Level | Rollback Speed |
|---|---|---|---|
| Shadow Mode | New model runs in parallel, no impact | Zero | Instant |
| Canary | Route 5-10% of traffic to new model | Low | Seconds |
| Blue-Green | Switch all traffic atomically | Medium | Seconds |
| Rolling | Gradually replace instances | Medium | Minutes |
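Shadow mode, the zero-risk option in the table, can be sketched as a wrapper that always serves the production prediction while running the challenger in parallel and logging discrepancies for offline review. This is a minimal sketch; the class name is illustrative, and `predict` is assumed to return an `(rul, health)` pair as in the canary implementation that follows:

```python
from typing import List, Tuple

class ShadowDeployment:
    """Serve production predictions; run the shadow model in parallel
    for comparison only (illustrative sketch)."""

    def __init__(self, production_model, shadow_model):
        self.production_model = production_model
        self.shadow_model = shadow_model
        self.discrepancies: List[dict] = []

    def predict(self, window) -> Tuple[float, int]:
        """Always return the production result; log the shadow output."""
        prod_rul, prod_health = self.production_model.predict(window)
        try:
            # A shadow failure must never affect the served prediction
            shadow_rul, shadow_health = self.shadow_model.predict(window)
            self.discrepancies.append({
                'rul_diff': shadow_rul - prod_rul,
                'health_match': shadow_health == prod_health,
            })
        except Exception:
            pass
        return prod_rul, prod_health
```

Because callers only ever see the production output, shadow mode is the natural first phase before any traffic is routed to the new model.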
Canary Deployment Implementation
```python
import hashlib
from typing import Dict, Tuple

import numpy as np

class CanaryDeployment:
    """Canary deployment for safe model updates."""

    def __init__(
        self,
        production_model,
        canary_model,
        canary_percentage: float = 0.05
    ):
        self.production_model = production_model
        self.canary_model = canary_model
        self.canary_percentage = canary_percentage

        # Metrics tracking
        self.production_predictions = []
        self.canary_predictions = []

    def _bucket(self, engine_id: str) -> int:
        """Stable 0-99 bucket for an engine ID. A cryptographic hash is
        used because Python's built-in hash() is salted per process and
        would route the same engine differently across restarts."""
        digest = hashlib.sha256(engine_id.encode()).hexdigest()
        return int(digest, 16) % 100

    def predict(self, engine_id: str, window) -> Tuple[float, int, str]:
        """
        Route prediction to production or canary model.

        Returns:
            Tuple of (rul, health, model_version)
        """
        # Deterministic routing based on engine_id for consistency
        use_canary = self._bucket(engine_id) < (self.canary_percentage * 100)

        if use_canary:
            rul, health = self.canary_model.predict(window)
            self.canary_predictions.append({'rul': rul, 'health': health})
            return rul, health, 'canary'
        else:
            rul, health = self.production_model.predict(window)
            self.production_predictions.append({'rul': rul, 'health': health})
            return rul, health, 'production'

    def get_comparison_metrics(self) -> Dict:
        """Compare production vs canary performance."""
        if len(self.canary_predictions) < 100:
            return {'status': 'insufficient_data'}

        prod_ruls = [p['rul'] for p in self.production_predictions]
        canary_ruls = [p['rul'] for p in self.canary_predictions]

        return {
            'production': {
                'mean_rul': np.mean(prod_ruls),
                'std_rul': np.std(prod_ruls),
                'count': len(prod_ruls)
            },
            'canary': {
                'mean_rul': np.mean(canary_ruls),
                'std_rul': np.std(canary_ruls),
                'count': len(canary_ruls)
            },
            'difference': {
                'mean_diff': np.mean(canary_ruls) - np.mean(prod_ruls),
                'std_diff': np.std(canary_ruls) - np.std(prod_ruls)
            }
        }

    def promote_canary(self):
        """Promote canary to production."""
        self.production_model = self.canary_model
        self.canary_model = None
        self.canary_percentage = 0.0
        print("Canary promoted to production")

    def rollback(self):
        """Rollback canary, keep production."""
        self.canary_model = None
        self.canary_percentage = 0.0
        print("Canary rolled back")
```
Automated Promotion/Rollback
```python
from typing import Dict, List, Optional

class CanaryController:
    """Automated canary promotion and rollback based on metrics."""

    def __init__(
        self,
        deployment: CanaryDeployment,
        min_samples: int = 1000,
        max_regression_pct: float = 5.0,
        promotion_stages: Optional[List[float]] = None
    ):
        self.deployment = deployment
        self.min_samples = min_samples
        self.max_regression_pct = max_regression_pct
        # Default staged rollout; avoids a mutable default argument
        self.promotion_stages = promotion_stages or [0.05, 0.10, 0.25, 0.50, 1.0]
        self.current_stage = 0

    def evaluate_and_act(self):
        """Evaluate canary and decide next action."""
        metrics = self.deployment.get_comparison_metrics()

        if metrics.get('status') == 'insufficient_data':
            print("Waiting for more canary data...")
            return

        # Check for regression: a large shift in mean predicted RUL in
        # either direction is treated as suspect
        mean_diff_pct = abs(
            metrics['difference']['mean_diff'] /
            metrics['production']['mean_rul'] * 100
        )

        if mean_diff_pct > self.max_regression_pct:
            # Canary deviates significantly from production
            print(f"Canary regression detected: {mean_diff_pct:.1f}%")
            self.deployment.rollback()
            self._alert_rollback(metrics)
            return

        # Canary is acceptable, consider promotion
        if self.current_stage < len(self.promotion_stages) - 1:
            self.current_stage += 1
            new_pct = self.promotion_stages[self.current_stage]
            self.deployment.canary_percentage = new_pct
            print(f"Promoting canary to {new_pct*100:.0f}% traffic")
        else:
            # Full promotion
            self.deployment.promote_canary()
            self._alert_promotion(metrics)

    def _alert_rollback(self, metrics: Dict):
        """Send rollback alert."""
        # Notify team of rollback
        pass

    def _alert_promotion(self, metrics: Dict):
        """Send promotion alert."""
        # Notify team of successful promotion
        pass
```
Never Skip Canary
Even for "minor" model updates, always use canary deployment. Subtle changes in training data or hyperparameters can cause unexpected behavior in production. The cost of a few hours of canary testing is negligible compared to the risk of degraded predictions.
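Blue-green switching, the other low-risk strategy in the table above, can be sketched as an atomic pointer swap between two model slots, with the previous slot kept warm for instant rollback. This is a sketch only; the class and method names are illustrative:

```python
import threading

class BlueGreenDeployment:
    """Atomic traffic switch between two model slots, keeping the
    previous slot warm for instant rollback (illustrative sketch)."""

    def __init__(self, blue_model):
        self._lock = threading.Lock()
        self._active = blue_model    # slot currently serving traffic
        self._standby = None         # previous model, held for rollback

    def predict(self, window):
        """Serve from whichever slot is currently active."""
        return self._active.predict(window)

    def deploy(self, green_model):
        """Switch all traffic to the new model in one atomic step."""
        with self._lock:
            self._standby, self._active = self._active, green_model

    def rollback(self):
        """Swap back to the previous model in one step."""
        with self._lock:
            if self._standby is None:
                raise RuntimeError("No standby model to roll back to")
            self._active, self._standby = self._standby, self._active
```

The swap is all-or-nothing, which is why the table rates blue-green as medium risk: every request moves to the new model at once, so it is best preceded by shadow or canary testing.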
Model Versioning
Proper versioning enables reproducibility, auditing, and quick rollbacks.
Versioning Schema
| Component | Format | Example |
|---|---|---|
| Model Version | MAJOR.MINOR.PATCH | 2.1.3 |
| Training Run ID | UUID | abc123-def456 |
| Data Version | Date-based | 2024-12-15 |
| Config Hash | SHA256 (first 8) | a1b2c3d4 |
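The config hash in the schema can be computed by canonicalizing the training config and truncating a SHA-256 digest. A minimal sketch (the function name is illustrative):

```python
import hashlib
import json

def config_hash(config: dict, length: int = 8) -> str:
    """Hash a training config deterministically. Keys are sorted so
    logically identical configs always produce the same hash."""
    canonical = json.dumps(config, sort_keys=True, separators=(',', ':'))
    return hashlib.sha256(canonical.encode('utf-8')).hexdigest()[:length]
```

Because the serialization is canonical, the hash changes if and only if a config value changes, which makes it a reliable fingerprint for the version metadata below.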
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional

@dataclass
class ModelVersion:
    """Complete model version metadata."""
    version: str           # e.g., "2.1.3"
    training_run_id: str   # MLflow run ID
    training_date: datetime
    data_version: str      # Dataset version used
    config_hash: str       # Hash of training config

    # Performance metrics at training time
    train_rmse: float
    val_rmse: float
    test_rmse: float
    health_accuracy: float

    # Deployment info
    deployed_at: Optional[datetime] = None
    deployment_status: str = "staging"  # staging, canary, production, archived

    # Lineage
    parent_version: Optional[str] = None
    changelog: str = ""

    def to_dict(self) -> Dict:
        return {
            'version': self.version,
            'training_run_id': self.training_run_id,
            'training_date': self.training_date.isoformat(),
            'metrics': {
                'train_rmse': self.train_rmse,
                'val_rmse': self.val_rmse,
                'test_rmse': self.test_rmse,
                'health_accuracy': self.health_accuracy
            },
            'deployment': {
                'status': self.deployment_status,
                'deployed_at': self.deployed_at.isoformat() if self.deployed_at else None
            }
        }

class ModelRegistry:
    """Model version registry with rollback support."""

    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        self.versions: Dict[str, ModelVersion] = {}
        self.production_version: Optional[str] = None

    def register(self, version: ModelVersion):
        """Register a new model version."""
        self.versions[version.version] = version
        print(f"Registered model version {version.version}")

    def promote_to_production(self, version_id: str):
        """Promote a version to production."""
        if version_id not in self.versions:
            raise ValueError(f"Version {version_id} not found")

        # Archive current production
        if self.production_version:
            self.versions[self.production_version].deployment_status = "archived"

        # Promote new version
        self.versions[version_id].deployment_status = "production"
        self.versions[version_id].deployed_at = datetime.now()
        self.production_version = version_id

        print(f"Promoted {version_id} to production")

    def rollback(self, target_version: Optional[str] = None):
        """
        Rollback to a previous version.

        Args:
            target_version: Specific version to rollback to.
                If None, rollback to previous production.
        """
        if target_version is None:
            # Find previous production version
            target_version = self._find_previous_production()

        if target_version is None:
            raise ValueError("No previous version available for rollback")

        self.promote_to_production(target_version)
        print(f"Rolled back to version {target_version}")

    def _find_previous_production(self) -> Optional[str]:
        """Find the most recently deployed archived version."""
        archived = [
            v for v in self.versions.values()
            if v.deployment_status == "archived" and v.deployed_at
        ]
        if not archived:
            return None
        return max(archived, key=lambda v: v.deployed_at).version

    def get_production_version(self) -> Optional[ModelVersion]:
        """Get current production model version."""
        if self.production_version:
            return self.versions[self.production_version]
        return None
```
Summary
Model Updates and Retraining - Summary:
- Combine strategies: Use triggered retraining for drift + scheduled quarterly refresh
- Automated pipeline: Data prep → Training → Validation → Gates → Deployment
- Always canary: Never deploy directly to production, always test with traffic subset
- Version everything: Model, data, config, and metrics for full reproducibility
- Instant rollback: Maintain ability to revert to previous version in seconds
| Phase | Duration | Validation |
|---|---|---|
| Retraining | 1-2 hours | Training loss converged |
| Offline validation | 30 min | RMSE < threshold, Acc > 90% |
| Shadow mode | 24 hours | No anomalies in predictions |
| Canary (5%) | 24-48 hours | No regression vs production |
| Canary (25%) | 24 hours | Metrics stable |
| Full promotion | Instant | Automatic |
Key Insight: A production ML system is never "done"—it requires continuous maintenance as data distributions evolve and equipment changes. The key to sustainable operations is automation: automated drift detection triggers automated retraining, which flows through automated validation and deployment. The goal is a self-maintaining system that improves over time with minimal human intervention, while always retaining the ability to roll back quickly if something goes wrong.
This concludes our guide to production deployment. With proper export, inference pipelines, monitoring, and update strategies, AMNL can provide reliable, accurate RUL predictions at scale—enabling proactive maintenance that prevents failures and saves costs.