Chapter 20

Model Updates and Retraining

Production Deployment

Learning Objectives

By the end of this section, you will:

  1. Choose between retraining strategies (scheduled, triggered, continuous)
  2. Build an automated retraining pipeline with validation gates
  3. Deploy model updates safely using canary and blue-green strategies
  4. Implement model versioning with rollback capabilities
  5. Handle A/B testing for model improvements
Core Insight: Model maintenance is as important as initial development. A production ML system must continuously adapt to changing data distributions, new equipment, and evolving operating conditions. Safe deployment practices—canary releases, shadow mode, and automated rollback—protect against degraded model performance.

Model Update Strategies

Update strategies differ in how they trade off freshness, cost, and risk.

Strategy Comparison

| Strategy   | Trigger           | Frequency      | Best For               |
|------------|-------------------|----------------|------------------------|
| Scheduled  | Fixed calendar    | Weekly/Monthly | Stable environments    |
| Triggered  | Drift detection   | On-demand      | Dynamic environments   |
| Continuous | New data arrival  | Real-time      | High-value predictions |
| Manual     | Business decision | Ad-hoc         | Major model changes    |

When to Retrain

Retrain when drift detection fires, and keep a scheduled refresh as a safety net: in practice the triggered and scheduled strategies are combined rather than used alone.

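The strategy table earlier can be collapsed into a single retrain decision. A minimal sketch — the `should_retrain` function, its thresholds, and its argument names are illustrative, not part of any library:

```python
from datetime import datetime, timedelta

def should_retrain(drift_score: float,
                   new_samples: int,
                   last_trained: datetime,
                   drift_threshold: float = 0.2,
                   min_samples: int = 10000,
                   max_age: timedelta = timedelta(days=90)) -> str:
    """Return the reason to retrain, or '' if no retraining is needed."""
    if drift_score > drift_threshold:
        return "drift"        # triggered strategy: distribution has shifted
    if new_samples >= min_samples and datetime.now() - last_trained > timedelta(days=7):
        return "scheduled"    # weekly refresh, once enough fresh data exists
    if datetime.now() - last_trained > max_age:
        return "stale"        # safety-net refresh regardless of data volume
    return ""
```

Drift takes priority; the scheduled branch fires only once enough fresh samples have accumulated, and the age check is the last-resort safety net.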
Automated Retraining Pipeline

An automated pipeline ensures consistent, reproducible model updates with proper validation.

Pipeline Architecture

🐍python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict
import mlflow

@dataclass
class RetrainingConfig:
    """Configuration for automated retraining."""
    data_path: str
    model_name: str
    min_samples: int = 10000
    validation_threshold_rmse: float = 12.0
    validation_threshold_health_acc: float = 0.90
    max_training_time_hours: float = 4.0

class AutomatedRetrainingPipeline:
    """
    Automated model retraining with validation gates.
    """

    def __init__(self, config: RetrainingConfig):
        self.config = config
        self.mlflow_client = mlflow.tracking.MlflowClient()

    def run_pipeline(self) -> Dict:
        """
        Execute the full retraining pipeline.

        Returns:
            Dict with pipeline status and model info
        """
        result = {
            'started_at': datetime.now().isoformat(),
            'status': 'started',
            'stages': {}
        }

        try:
            # Stage 1: Data preparation
            result['stages']['data_prep'] = self._prepare_data()

            # Stage 2: Training
            result['stages']['training'] = self._train_model()

            # Stage 3: Validation
            result['stages']['validation'] = self._validate_model()

            # Stage 4: Gate check
            if not self._passes_gates(result['stages']['validation']):
                result['status'] = 'rejected'
                result['reason'] = 'Failed validation gates'
                return result

            # Stage 5: Model registration
            result['stages']['registration'] = self._register_model()

            # Stage 6: Export
            result['stages']['export'] = self._export_model()

            result['status'] = 'completed'
            result['completed_at'] = datetime.now().isoformat()

        except Exception as e:
            result['status'] = 'failed'
            result['error'] = str(e)

        return result

    def _prepare_data(self) -> Dict:
        """Prepare training data from recent sensor readings."""
        # Fetch recent data, split into train/val/test, apply preprocessing.
        # Illustrative output:
        return {
            'train_samples': 45000,
            'val_samples': 5000,
            'test_samples': 5000,
            'date_range': '2024-10-01 to 2024-12-31'
        }

    def _train_model(self) -> Dict:
        """Train AMNL model with MLflow tracking."""
        with mlflow.start_run(run_name=f"amnl_retrain_{datetime.now().strftime('%Y%m%d')}"):
            # Log parameters
            mlflow.log_params({
                'learning_rate': 1e-3,
                'batch_size': 256,
                'epochs': 100,
                'loss_weight_rul': 0.5,
                'loss_weight_health': 0.5
            })

            # Training logic (simplified)
            # model = train_amnl(...)

            # Log metrics
            mlflow.log_metrics({
                'train_rmse': 8.5,
                'val_rmse': 9.2,
                'health_accuracy': 0.94
            })

            # Log model
            # mlflow.pytorch.log_model(model, "model")

            return {
                'run_id': mlflow.active_run().info.run_id,
                'train_rmse': 8.5,
                'val_rmse': 9.2,
                'training_time_hours': 1.5
            }

    def _validate_model(self) -> Dict:
        """Validate model on held-out test set."""
        # Run evaluation on test set (illustrative numbers)
        return {
            'test_rmse': 9.4,
            'test_health_accuracy': 0.93,
            'per_dataset_rmse': {
                'FD001': 10.2,
                'FD002': 6.8,
                'FD003': 9.1,
                'FD004': 8.3
            }
        }

    def _passes_gates(self, validation: Dict) -> bool:
        """Check if model passes quality gates."""
        rmse_ok = validation['test_rmse'] < self.config.validation_threshold_rmse
        acc_ok = validation['test_health_accuracy'] > self.config.validation_threshold_health_acc

        # Check against current production model
        prod_rmse = self._get_production_rmse()
        improvement_ok = validation['test_rmse'] <= prod_rmse * 1.05  # Allow 5% slack

        return rmse_ok and acc_ok and improvement_ok

    def _get_production_rmse(self) -> float:
        """Get current production model's RMSE."""
        # Fetch from model registry or monitoring
        return 9.5

    def _register_model(self) -> Dict:
        """Register validated model in MLflow registry."""
        # Register as a new version. In practice, pass the run ID returned
        # by _train_model instead of this placeholder.
        model_version = self.mlflow_client.create_model_version(
            name=self.config.model_name,
            source="runs:/run_id/model",
            run_id="run_id"
        )

        return {
            'version': model_version.version,
            'stage': 'Staging'
        }

    def _export_model(self) -> Dict:
        """Export model to deployment formats."""
        # Export to ONNX and TensorRT
        return {
            'onnx_path': '/models/amnl_v2.1.onnx',
            'tensorrt_path': '/models/amnl_v2.1.engine'
        }

Pipeline Triggers

🐍python
from datetime import datetime
from typing import Dict

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

class RetrainingScheduler:
    """
    Schedule and trigger model retraining.
    """

    def __init__(self, pipeline: AutomatedRetrainingPipeline):
        self.pipeline = pipeline
        self.scheduler = BackgroundScheduler()

        # Scheduled retraining: every Sunday at 2 AM
        self.scheduler.add_job(
            self._scheduled_retrain,
            CronTrigger(day_of_week='sun', hour=2),
            id='scheduled_retrain'
        )

    def trigger_on_drift(self, drift_score: float):
        """Trigger retraining when drift is detected."""
        if drift_score > 0.2:
            print(f"Drift detected ({drift_score:.3f}), triggering retraining")
            self.scheduler.add_job(
                self.pipeline.run_pipeline,
                'date',  # Run once immediately
                id=f'drift_retrain_{datetime.now().timestamp()}'
            )

    def _scheduled_retrain(self):
        """Run scheduled retraining."""
        print("Running scheduled weekly retraining")
        result = self.pipeline.run_pipeline()
        self._notify_result(result)

    def _notify_result(self, result: Dict):
        """Send notification about retraining result."""
        # Send to Slack, email, etc.
        pass

    def start(self):
        """Start the scheduler."""
        self.scheduler.start()

Safe Model Deployment

Deploying a new model version requires careful strategies to minimize risk.

Deployment Strategies

| Strategy    | Description                           | Risk Level | Rollback Speed |
|-------------|---------------------------------------|------------|----------------|
| Shadow Mode | New model runs in parallel, no impact | Zero       | Instant        |
| Canary      | Route 5-10% of traffic to new model   | Low        | Seconds        |
| Blue-Green  | Switch all traffic atomically         | Medium     | Seconds        |
| Rolling     | Gradually replace instances           | Medium     | Minutes        |
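Shadow mode, the first row of the table, is worth spelling out since the code below only implements canary routing. A minimal wrapper sketch — the class name and the `predict` signature are assumptions chosen to mirror the canary code:

```python
class ShadowDeployment:
    """Run a shadow model on every request without affecting responses."""

    def __init__(self, production_model, shadow_model):
        self.production_model = production_model
        self.shadow_model = shadow_model
        self.disagreements = []  # (shadow - production) deltas, for offline analysis

    def predict(self, window):
        prod_rul = self.production_model.predict(window)
        try:
            # A shadow-model failure must never affect the caller.
            shadow_rul = self.shadow_model.predict(window)
            self.disagreements.append(shadow_rul - prod_rul)
        except Exception:
            pass
        return prod_rul  # only the production result is ever served
```

Because the shadow result is never returned, this stage carries zero serving risk; its value is the disagreement log, which can be reviewed before any traffic is routed.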

Canary Deployment Implementation

🐍python
import zlib
from typing import Dict, Tuple

import numpy as np

class CanaryDeployment:
    """
    Canary deployment for safe model updates.
    """

    def __init__(
        self,
        production_model,
        canary_model,
        canary_percentage: float = 0.05
    ):
        self.production_model = production_model
        self.canary_model = canary_model
        self.canary_percentage = canary_percentage

        # Metrics tracking
        self.production_predictions = []
        self.canary_predictions = []

    def predict(self, engine_id: str, window) -> Tuple[float, int, str]:
        """
        Route prediction to production or canary model.

        Returns:
            Tuple of (rul, health, model_version)
        """
        # Deterministic routing based on engine_id so each engine always
        # sees the same model. crc32 is stable across processes, unlike
        # the built-in hash(), which is salted per interpreter run.
        bucket = zlib.crc32(engine_id.encode('utf-8')) % 100
        use_canary = bucket < self.canary_percentage * 100

        if use_canary:
            rul, health = self.canary_model.predict(window)
            self.canary_predictions.append({'rul': rul, 'health': health})
            return rul, health, 'canary'

        rul, health = self.production_model.predict(window)
        self.production_predictions.append({'rul': rul, 'health': health})
        return rul, health, 'production'

    def get_comparison_metrics(self) -> Dict:
        """Compare production vs canary performance."""
        if len(self.canary_predictions) < 100:
            return {'status': 'insufficient_data'}

        prod_ruls = [p['rul'] for p in self.production_predictions]
        canary_ruls = [p['rul'] for p in self.canary_predictions]

        return {
            'production': {
                'mean_rul': np.mean(prod_ruls),
                'std_rul': np.std(prod_ruls),
                'count': len(prod_ruls)
            },
            'canary': {
                'mean_rul': np.mean(canary_ruls),
                'std_rul': np.std(canary_ruls),
                'count': len(canary_ruls)
            },
            'difference': {
                'mean_diff': np.mean(canary_ruls) - np.mean(prod_ruls),
                'std_diff': np.std(canary_ruls) - np.std(prod_ruls)
            }
        }

    def promote_canary(self):
        """Promote canary to production."""
        self.production_model = self.canary_model
        self.canary_model = None
        self.canary_percentage = 0.0
        print("Canary promoted to production")

    def rollback(self):
        """Rollback canary, keep production."""
        self.canary_model = None
        self.canary_percentage = 0.0
        print("Canary rolled back")
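Routing must be sticky: each engine should see the same model version for the whole canary window, in every process. Hash-based bucketing achieves this only if the hash is stable — Python's built-in `hash()` for strings is salted per interpreter run, so `zlib.crc32` is a safer choice. A stand-alone sketch of the routing decision (the function name is illustrative):

```python
import zlib

def route_to_canary(engine_id: str, canary_percentage: float) -> bool:
    """Deterministically route a fixed slice of engines to the canary.

    crc32 is stable across processes and machines, so the same engine
    always lands in the same bucket.
    """
    bucket = zlib.crc32(engine_id.encode("utf-8")) % 100
    return bucket < canary_percentage * 100
```

A side effect worth noting: because routing is keyed on engine ID rather than per-request randomness, the canary sees a fixed subset of engines, which makes per-engine comparison over time meaningful.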

Automated Promotion/Rollback

🐍python
from typing import Dict, List, Optional

class CanaryController:
    """
    Automated canary promotion and rollback based on metrics.
    """

    def __init__(
        self,
        deployment: CanaryDeployment,
        min_samples: int = 1000,
        max_regression_pct: float = 5.0,
        promotion_stages: Optional[List[float]] = None
    ):
        self.deployment = deployment
        self.min_samples = min_samples
        self.max_regression_pct = max_regression_pct
        # Avoid a mutable default argument; fall back to the standard ramp.
        self.promotion_stages = promotion_stages or [0.05, 0.10, 0.25, 0.50, 1.0]
        self.current_stage = 0

    def evaluate_and_act(self):
        """Evaluate canary and decide next action."""
        metrics = self.deployment.get_comparison_metrics()

        if metrics.get('status') == 'insufficient_data':
            print("Waiting for more canary data...")
            return

        # Check for regression. Without ground-truth RUL in production, a
        # large shift in mean prediction (in either direction) is suspicious.
        mean_diff_pct = abs(
            metrics['difference']['mean_diff'] /
            metrics['production']['mean_rul'] * 100
        )

        if mean_diff_pct > self.max_regression_pct:
            # Canary deviates significantly from production
            print(f"Canary regression detected: {mean_diff_pct:.1f}%")
            self.deployment.rollback()
            self._alert_rollback(metrics)
            return

        # Canary is acceptable, consider promotion
        if self.current_stage < len(self.promotion_stages) - 1:
            self.current_stage += 1
            new_pct = self.promotion_stages[self.current_stage]
            self.deployment.canary_percentage = new_pct
            print(f"Promoting canary to {new_pct*100:.0f}% traffic")
        else:
            # Full promotion
            self.deployment.promote_canary()
            self._alert_promotion(metrics)

    def _alert_rollback(self, metrics: Dict):
        """Send rollback alert."""
        # Notify team of rollback (Slack, email, etc.)
        pass

    def _alert_promotion(self, metrics: Dict):
        """Send promotion alert."""
        # Notify team of successful promotion
        pass

Never Skip Canary

Even for "minor" model updates, always use canary deployment. Subtle changes in training data or hyperparameters can cause unexpected behavior in production. The cost of a few hours of canary testing is negligible compared to the risk of degraded predictions.


Model Versioning

Proper versioning enables reproducibility, auditing, and quick rollbacks.

Versioning Schema

| Component       | Format            | Example       |
|-----------------|-------------------|---------------|
| Model Version   | MAJOR.MINOR.PATCH | 2.1.3         |
| Training Run ID | UUID              | abc123-def456 |
| Data Version    | Date-based        | 2024-12-15    |
| Config Hash     | SHA-256 (first 8) | a1b2c3d4      |
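The config hash in the table can be computed by canonicalizing the training config before hashing, so that dictionary key order cannot change the hash. A minimal sketch (the `config_hash` helper is illustrative):

```python
import hashlib
import json

def config_hash(config: dict) -> str:
    """First 8 hex chars of the SHA-256 of a canonical JSON dump."""
    # sort_keys makes the dump independent of insertion order;
    # compact separators make it independent of formatting.
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:8]
```

Any change to a hyperparameter produces a new hash, while reordering keys does not — exactly the property needed for "same config, same hash" auditing.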
🐍python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional

@dataclass
class ModelVersion:
    """
    Complete model version metadata.
    """
    version: str                    # e.g., "2.1.3"
    training_run_id: str            # MLflow run ID
    training_date: datetime
    data_version: str               # Dataset version used
    config_hash: str                # Hash of training config

    # Performance metrics at training time
    train_rmse: float
    val_rmse: float
    test_rmse: float
    health_accuracy: float

    # Deployment info
    deployed_at: Optional[datetime] = None
    deployment_status: str = "staging"  # staging, canary, production, archived

    # Lineage
    parent_version: Optional[str] = None
    changelog: str = ""

    def to_dict(self) -> Dict:
        return {
            'version': self.version,
            'training_run_id': self.training_run_id,
            'training_date': self.training_date.isoformat(),
            'metrics': {
                'train_rmse': self.train_rmse,
                'val_rmse': self.val_rmse,
                'test_rmse': self.test_rmse,
                'health_accuracy': self.health_accuracy
            },
            'deployment': {
                'status': self.deployment_status,
                'deployed_at': self.deployed_at.isoformat() if self.deployed_at else None
            }
        }

class ModelRegistry:
    """
    Model version registry with rollback support.
    """

    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        self.versions: Dict[str, ModelVersion] = {}
        self.production_version: Optional[str] = None

    def register(self, version: ModelVersion):
        """Register a new model version."""
        self.versions[version.version] = version
        print(f"Registered model version {version.version}")

    def promote_to_production(self, version_id: str):
        """Promote a version to production."""
        if version_id not in self.versions:
            raise ValueError(f"Version {version_id} not found")

        # Archive current production
        if self.production_version:
            self.versions[self.production_version].deployment_status = "archived"

        # Promote new version
        self.versions[version_id].deployment_status = "production"
        self.versions[version_id].deployed_at = datetime.now()
        self.production_version = version_id

        print(f"Promoted {version_id} to production")

    def rollback(self, target_version: Optional[str] = None):
        """
        Rollback to a previous version.

        Args:
            target_version: Specific version to rollback to.
                            If None, rollback to previous production.
        """
        if target_version is None:
            # Find previous production version
            target_version = self._find_previous_production()

        if target_version is None:
            raise ValueError("No previous version available for rollback")

        self.promote_to_production(target_version)
        print(f"Rolled back to version {target_version}")

    def _find_previous_production(self) -> Optional[str]:
        """Find the most recently deployed archived version."""
        archived = [
            v for v in self.versions.values()
            if v.deployment_status == "archived" and v.deployed_at
        ]
        if not archived:
            return None
        return max(archived, key=lambda v: v.deployed_at).version

    def get_production_version(self) -> Optional[ModelVersion]:
        """Get current production model version."""
        if self.production_version:
            return self.versions[self.production_version]
        return None
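As a side note on the MAJOR.MINOR.PATCH scheme in the table above: one common convention — an assumption here, not a standard — is that retraining on new data bumps MINOR, an architecture change bumps MAJOR, and a config tweak or hotfix bumps PATCH. A small helper sketch:

```python
def bump_version(version: str, change: str) -> str:
    """Bump a 'MAJOR.MINOR.PATCH' string according to the kind of change."""
    major, minor, patch = (int(p) for p in version.split("."))
    if change == "architecture":      # new model family, inputs, or outputs
        return f"{major + 1}.0.0"
    if change == "retrain":           # same architecture, fresh data
        return f"{major}.{minor + 1}.0"
    return f"{major}.{minor}.{patch + 1}"  # config tweak or hotfix
```

Pairing this with the registry above keeps version strings meaningful: a reader can tell at a glance whether 2.2.0 is a routine retrain of 2.1.x or a new architecture.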

Summary

Model Updates and Retraining - Summary:

  1. Combine strategies: Use triggered retraining for drift + scheduled quarterly refresh
  2. Automated pipeline: Data prep → Training → Validation → Gates → Deployment
  3. Always canary: Never deploy directly to production, always test with traffic subset
  4. Version everything: Model, data, config, and metrics for full reproducibility
  5. Instant rollback: Maintain ability to revert to previous version in seconds
A typical rollout timeline:

| Phase              | Duration    | Validation                   |
|--------------------|-------------|------------------------------|
| Retraining         | 1-2 hours   | Training loss converged      |
| Offline validation | 30 min      | RMSE < threshold, Acc > 90%  |
| Shadow mode        | 24 hours    | No anomalies in predictions  |
| Canary (5%)        | 24-48 hours | No regression vs production  |
| Canary (25%)       | 24 hours    | Metrics stable               |
| Full promotion     | Instant     | Automatic                    |
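The phased rollout above is effectively a small state machine: each phase advances only when its validation passes, and any failure drops straight back to the previous production model. A sketch (phase names mirror the table; durations are omitted, and `advance` is an illustrative helper, not library code):

```python
PHASES = ["retraining", "offline_validation", "shadow",
          "canary_5", "canary_25", "production"]

def advance(phase: str, validation_passed: bool) -> str:
    """Move to the next phase on success; abort to rollback on failure."""
    if not validation_passed:
        return "rolled_back"  # previous production model keeps serving
    i = PHASES.index(phase)
    return PHASES[min(i + 1, len(PHASES) - 1)]
```

The important property is that failure at any stage returns the same terminal state: there is exactly one escape hatch, and it always lands on the known-good model.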
Key Insight: A production ML system is never "done"—it requires continuous maintenance as data distributions evolve and equipment changes. The key to sustainable operations is automation: automated drift detection triggers automated retraining, which flows through automated validation and deployment. The goal is a self-maintaining system that improves over time with minimal human intervention, while always retaining the ability to roll back quickly if something goes wrong.

This concludes our guide to production deployment. With proper export, inference pipelines, monitoring, and update strategies, AMNL can provide reliable, accurate RUL predictions at scale—enabling proactive maintenance that prevents failures and saves costs.