Chapter 20

Real-Time Inference Pipeline

Production Deployment

Learning Objectives

By the end of this section, you will:

  1. Design a production inference pipeline for real-time RUL prediction
  2. Implement data ingestion from sensor streams
  3. Build a high-performance inference service with batching
  4. Handle edge cases like missing data and sensor failures
  5. Scale the pipeline for enterprise deployments

Core Insight: A production inference pipeline must handle continuous sensor streams, maintain low latency, and gracefully degrade under failure conditions. The architecture should separate data ingestion, preprocessing, inference, and result delivery into independent, scalable components.

Pipeline Architecture

A robust real-time inference pipeline consists of several interconnected components:

High-Level Architecture

πŸ“text
1β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
2β”‚                    AMNL Inference Pipeline                       β”‚
3β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
4β”‚                                                                  β”‚
5β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
6β”‚  β”‚ Sensors │───▢│ Ingestion│───▢│ Preproc │───▢│ Batcher β”‚      β”‚
7β”‚  β”‚ (MQTT)  β”‚    β”‚ Service  β”‚    β”‚ Service β”‚    β”‚         β”‚      β”‚
8β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜      β”‚
9β”‚                                                      β”‚           β”‚
10β”‚                                                      β–Ό           β”‚
11β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”‚
12β”‚  β”‚Dashboard│◀───│ Results │◀───│Inference│◀───│  Queue  β”‚      β”‚
13β”‚  β”‚  / API  β”‚    β”‚ Service β”‚    β”‚ Service β”‚    β”‚         β”‚      β”‚
14β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚
15β”‚                                                                  β”‚
16β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Component Responsibilities

| Component     | Responsibility                      | Technology             |
|---------------|-------------------------------------|------------------------|
| Ingestion     | Receive sensor data streams         | MQTT, Kafka, gRPC      |
| Preprocessing | Normalize, window, validate         | Python, NumPy          |
| Batcher       | Collect samples for batch inference | Redis, in-memory       |
| Inference     | Run AMNL model predictions          | ONNX Runtime, TensorRT |
| Results       | Store and distribute predictions    | PostgreSQL, Redis      |

Data Ingestion

The ingestion layer receives real-time sensor data and prepares it for the inference pipeline.

MQTT Sensor Ingestion

```python
import json
import time
from collections import defaultdict
from threading import Lock

import numpy as np
import paho.mqtt.client as mqtt


class SensorIngestionService:
    """
    Ingest sensor data from MQTT and maintain sliding windows.
    """

    def __init__(self, broker_host, broker_port, window_size=50):
        self.window_size = window_size
        self.sensor_buffers = defaultdict(list)
        self.buffer_lock = Lock()

        # MQTT client setup
        self.client = mqtt.Client()
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.connect(broker_host, broker_port, 60)

    def _on_connect(self, client, userdata, flags, rc):
        """Subscribe to sensor topics on connection."""
        client.subscribe("sensors/+/readings")
        print("Connected to MQTT broker, subscribed to sensor topics")

    def _on_message(self, client, userdata, msg):
        """Process incoming sensor message."""
        try:
            # Parse message: {"engine_id": "E001", "timestamp": ..., "sensors": [...]}
            data = json.loads(msg.payload.decode())
            engine_id = data['engine_id']
            sensor_readings = data['sensors']  # 17 sensor values

            with self.buffer_lock:
                # Append to sliding window buffer
                self.sensor_buffers[engine_id].append({
                    'timestamp': data['timestamp'],
                    'values': sensor_readings
                })

                # Maintain window size
                if len(self.sensor_buffers[engine_id]) > self.window_size:
                    self.sensor_buffers[engine_id].pop(0)

        except Exception as e:
            print(f"Error processing message: {e}")

    def get_inference_window(self, engine_id):
        """
        Get the current sliding window for an engine.

        Returns:
            numpy array of shape (window_size, 17) or None if insufficient data
        """
        with self.buffer_lock:
            buffer = self.sensor_buffers.get(engine_id, [])

            if len(buffer) < self.window_size:
                return None  # Not enough data yet

            # Extract sensor values as numpy array
            window = np.array([reading['values'] for reading in buffer])
            return window.astype(np.float32)

    def get_all_ready_engines(self):
        """Get all engines with complete windows."""
        ready = []
        with self.buffer_lock:
            for engine_id, buffer in self.sensor_buffers.items():
                if len(buffer) >= self.window_size:
                    ready.append(engine_id)
        return ready

    def start(self):
        """Start the MQTT client loop."""
        self.client.loop_start()

    def stop(self):
        """Stop the MQTT client loop."""
        self.client.loop_stop()
```
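
The sliding-window bookkeeping above can be exercised without a broker. A small sketch using `collections.deque`, whose `maxlen` argument replaces the manual append/pop logic (window and sensor counts are shrunk here purely for illustration):

```python
from collections import deque

import numpy as np

WINDOW_SIZE = 5   # the service above uses 50
NUM_SENSORS = 3   # the service above expects 17

# deque(maxlen=...) drops the oldest reading automatically,
# replacing the explicit len() check and pop(0)
buffer = deque(maxlen=WINDOW_SIZE)

for t in range(8):  # simulate 8 incoming readings
    buffer.append({'timestamp': t, 'values': [float(t)] * NUM_SENSORS})

# A window is only "ready" once the buffer is full
assert len(buffer) == WINDOW_SIZE
window = np.array([r['values'] for r in buffer], dtype=np.float32)
print(window.shape)                  # (5, 3)
print(window[0, 0], window[-1, 0])   # 3.0 7.0 -- oldest surviving reading is t=3
```

`deque.pop(0)`-free windowing is also O(1) per reading, which matters at high message rates.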

Data Validation

```python
import numpy as np
import pandas as pd


class DataValidator:
    """Validate and clean sensor data before inference."""

    def __init__(self, num_features=17, window_size=50):
        self.num_features = num_features
        self.window_size = window_size
        # Expected ranges for each sensor (from training data statistics)
        self.sensor_ranges = {
            0: (0, 100),    # Sensor 1 range
            1: (500, 700),  # Sensor 2 range
            # ... define for all 17 sensors
        }

    def validate(self, window):
        """
        Validate a sensor window.

        Returns:
            Tuple of (is_valid, cleaned_window, issues)
        """
        issues = []

        # Check shape
        if window.shape != (self.window_size, self.num_features):
            issues.append(f"Invalid shape: {window.shape}")
            return False, None, issues

        # Check for NaN/Inf
        if np.isnan(window).any():
            nan_count = int(np.isnan(window).sum())
            issues.append(f"Contains {nan_count} NaN values")

            # Attempt to fix: forward fill
            window = self._forward_fill(window)

        if np.isinf(window).any():
            issues.append("Contains Inf values")
            window = np.clip(window, -1e6, 1e6)

        # Check for out-of-range values
        for sensor_idx, (min_val, max_val) in self.sensor_ranges.items():
            sensor_values = window[:, sensor_idx]
            if (sensor_values < min_val * 0.5).any() or (sensor_values > max_val * 2).any():
                issues.append(f"Sensor {sensor_idx} out of expected range")

        is_valid = len(issues) == 0
        return is_valid, window, issues

    def _forward_fill(self, window):
        """Forward fill NaN values, back-filling any leading gaps."""
        df = pd.DataFrame(window)
        df = df.ffill().bfill()
        return df.values.astype(np.float32)
```

Missing Data Handling

Industrial sensors frequently experience intermittent failures. Forward-filling is a safe default for short gaps (<5 timesteps). For longer gaps, consider flagging the prediction as low-confidence.
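
One way to operationalize this rule is to measure the longest consecutive gap per sensor before deciding whether to fill. A minimal sketch; the helper names (`max_nan_gap`, `assess_window`) and the 5-timestep threshold are illustrative:

```python
import numpy as np

def max_nan_gap(column):
    """Length of the longest consecutive run of NaNs in a 1-D array."""
    isnan = np.isnan(column)
    longest = current = 0
    for flag in isnan:
        current = current + 1 if flag else 0
        longest = max(longest, current)
    return longest

def assess_window(window, max_fill_gap=5):
    """Return (safe_to_fill, low_confidence_sensors) for a (T, F) window."""
    low_conf = [i for i in range(window.shape[1])
                if max_nan_gap(window[:, i]) >= max_fill_gap]
    return len(low_conf) == 0, low_conf

w = np.ones((50, 3), dtype=np.float32)
w[10:13, 0] = np.nan   # 3-step gap: safe to forward-fill
w[20:28, 2] = np.nan   # 8-step gap: flag sensor 2 as low-confidence
print(assess_window(w))   # (False, [2])
```

Predictions made from windows with flagged sensors can then be tagged low-confidence downstream instead of being silently filled.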


Inference Service

The inference service runs AMNL predictions with dynamic batching for optimal throughput.

Batched Inference Service

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple

import numpy as np


class BatchedInferenceService:
    """
    High-performance inference service with dynamic batching.
    """

    def __init__(
        self,
        model_path: str,
        max_batch_size: int = 256,
        max_wait_ms: int = 10,
        num_workers: int = 2
    ):
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms

        # Load model (ONNX or TensorRT)
        self.predictor = self._load_model(model_path)

        # Request queue: (engine_id, window, future)
        self.request_queue = asyncio.Queue()

        # Thread pool for inference
        self.executor = ThreadPoolExecutor(max_workers=num_workers)

        self.running = True

    def _load_model(self, model_path):
        """Load ONNX or TensorRT model."""
        if model_path.endswith('.onnx'):
            import onnxruntime as ort
            return ort.InferenceSession(model_path)
        elif model_path.endswith('.engine'):
            # Load TensorRT engine (TensorRTPredictor is defined elsewhere)
            return TensorRTPredictor(model_path)
        else:
            raise ValueError(f"Unknown model format: {model_path}")

    async def predict(self, engine_id: str, window: np.ndarray) -> Dict:
        """
        Submit a prediction request and wait for the result.

        Args:
            engine_id: Unique engine identifier
            window: Sensor window of shape (50, 17)

        Returns:
            Dict with RUL prediction and health classification
        """
        future = asyncio.get_running_loop().create_future()
        await self.request_queue.put((engine_id, window, future))
        return await future

    async def _batch_processor(self):
        """Continuously process batches from the queue."""
        while self.running:
            batch = []
            start_time = time.time()

            # Collect requests until the batch is full or the timeout elapses
            while len(batch) < self.max_batch_size:
                try:
                    elapsed_ms = (time.time() - start_time) * 1000
                    timeout = max(0.001, (self.max_wait_ms - elapsed_ms) / 1000)

                    request = await asyncio.wait_for(
                        self.request_queue.get(),
                        timeout=timeout
                    )
                    batch.append(request)
                except asyncio.TimeoutError:
                    break  # Timeout reached, process current batch

            if not batch:
                continue

            # Run batch inference
            await self._process_batch(batch)

    async def _process_batch(self, batch: List[Tuple]):
        """Process a batch of requests."""
        engine_ids = [req[0] for req in batch]
        windows = np.stack([req[1] for req in batch])
        futures = [req[2] for req in batch]

        # Run inference in the thread pool so the event loop stays responsive
        loop = asyncio.get_running_loop()
        rul_preds, health_preds = await loop.run_in_executor(
            self.executor,
            self._run_inference,
            windows
        )

        # Distribute results
        for i, future in enumerate(futures):
            result = {
                'engine_id': engine_ids[i],
                'rul_prediction': float(rul_preds[i, 0]),
                'health_state': int(np.argmax(health_preds[i])),
                'health_probabilities': health_preds[i].tolist(),
                'timestamp': time.time()
            }
            future.set_result(result)

    def _run_inference(self, windows: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Run model inference on a batch."""
        if hasattr(self.predictor, 'run'):
            # ONNX Runtime
            outputs = self.predictor.run(None, {'sensor_data': windows})
            return outputs[0], outputs[1]
        else:
            # TensorRT
            return self.predictor.predict(windows)

    async def start(self):
        """Start the batch processing loop."""
        self._processor_task = asyncio.create_task(self._batch_processor())

    def stop(self):
        """Stop the service."""
        self.running = False
```
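
The heart of `_batch_processor` is the coalescing loop: block for the first request, then keep draining the queue until the batch is full or the wait budget is spent. The same pattern in isolation, with integers standing in for requests (`collect_batch` is an illustrative name, not part of the service above):

```python
import asyncio
import time

async def collect_batch(queue, max_batch_size=4, max_wait_ms=50):
    """Drain up to max_batch_size items, waiting at most max_wait_ms total."""
    batch = [await queue.get()]              # block until at least one item
    deadline = time.monotonic() + max_wait_ms / 1000
    while len(batch) < max_batch_size:
        timeout = deadline - time.monotonic()
        if timeout <= 0:
            break                            # wait budget exhausted
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout))
        except asyncio.TimeoutError:
            break                            # flush a partial batch
    return batch

async def demo():
    q = asyncio.Queue()
    for i in range(10):
        q.put_nowait(i)
    first = await collect_batch(q)           # full batch of 4
    second = await collect_batch(q)          # next 4
    return first, second

print(asyncio.run(demo()))                   # ([0, 1, 2, 3], [4, 5, 6, 7])
```

Blocking on the first `queue.get()` (instead of polling with a tiny timeout) also keeps the loop from spinning when the queue is idle.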

FastAPI Inference Endpoint

```python
from typing import List

import numpy as np
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="AMNL Inference API")

# Initialize inference service
inference_service = BatchedInferenceService("amnl_model.onnx")


class PredictionRequest(BaseModel):
    engine_id: str
    sensor_readings: List[List[float]]  # Shape: (50, 17)


class PredictionResponse(BaseModel):
    engine_id: str
    rul_prediction: float
    health_state: int
    health_probabilities: List[float]
    timestamp: float


@app.on_event("startup")
async def startup():
    await inference_service.start()


@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Get RUL prediction for an engine."""
    # Validate input shape
    window = np.array(request.sensor_readings, dtype=np.float32)
    if window.shape != (50, 17):
        raise HTTPException(
            status_code=400,
            detail=f"Expected shape (50, 17), got {window.shape}"
        )

    # Run prediction
    result = await inference_service.predict(request.engine_id, window)

    return PredictionResponse(**result)


@app.get("/health")
async def health_check():
    """API health check endpoint."""
    return {"status": "healthy", "model": "amnl_v1.0"}

# Run with: uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
```
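
On the client side, a request body only has to match the `PredictionRequest` schema. A hypothetical payload builder (`build_payload` is illustrative, not part of the API):

```python
import json

import numpy as np

def build_payload(engine_id, window):
    """Serialize an engine window into the /predict request body."""
    window = np.asarray(window, dtype=np.float32)
    if window.shape != (50, 17):
        # Mirror the server-side check so bad requests fail fast locally
        raise ValueError(f"Expected (50, 17), got {window.shape}")
    return json.dumps({
        "engine_id": engine_id,
        "sensor_readings": window.tolist(),
    })

payload = build_payload("E001", np.zeros((50, 17)))
decoded = json.loads(payload)
print(decoded["engine_id"], len(decoded["sensor_readings"]))  # E001 50
```

The resulting JSON can then be POSTed to `/predict` with any HTTP client, e.g. `requests.post("http://localhost:8000/predict", data=payload, headers={"Content-Type": "application/json"})`.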

Dynamic Batching Benefits

Dynamic batching groups requests arriving within a short window (10 ms here) into a single batch, dramatically improving GPU utilization. A batch of 256 requests completes in roughly 8 ms, so the marginal cost per request drops to about 31 Β΅sβ€”far cheaper than running 256 single-sample inferences back to back.
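
The batching arithmetic, under the assumed figures above (256 requests per batch, ~8 ms per batch):

```python
batch_size = 256
batch_latency_ms = 8.0          # assumed end-to-end time for one batch

# Marginal cost of one request inside a full batch
per_request_ms = batch_latency_ms / batch_size

# Sustained throughput if batches run back to back
throughput_per_sec = batch_size / (batch_latency_ms / 1000)

print(f"{per_request_ms * 1000:.0f} us per request")   # 31 us per request
print(f"{throughput_per_sec:,.0f} predictions/sec")    # 32,000 predictions/sec
```

This is where the summary table's throughput target of 31K+ predictions per second per GPU comes from.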


Scaling Strategies

As the number of monitored engines grows, the pipeline must scale accordingly.

Horizontal Scaling Architecture

Kubernetes Deployment

```yaml
# kubernetes/amnl-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: amnl-inference
spec:
  replicas: 4  # Scale based on load
  selector:
    matchLabels:
      app: amnl-inference
  template:
    metadata:
      labels:
        app: amnl-inference
    spec:
      containers:
      - name: amnl-inference
        image: amnl-inference:v1.0
        ports:
        - containerPort: 8000
        resources:
          limits:
            nvidia.com/gpu: 1
            memory: "4Gi"
            cpu: "4"
          requests:
            nvidia.com/gpu: 1
            memory: "2Gi"
            cpu: "2"
        env:
        - name: MODEL_PATH
          value: "/models/amnl_model.engine"
        - name: MAX_BATCH_SIZE
          value: "256"
        volumeMounts:
        - name: model-volume
          mountPath: /models
      volumes:
      - name: model-volume
        persistentVolumeClaim:
          claimName: amnl-models-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: amnl-inference-service
spec:
  selector:
    app: amnl-inference
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer
```

Scaling Recommendations

| Fleet Size      | Update Frequency | GPUs Required | Architecture           |
|-----------------|------------------|---------------|------------------------|
| 1,000 engines   | 1 Hz             | 1 GPU         | Single node            |
| 10,000 engines  | 1 Hz             | 1 GPU         | Single node + replica  |
| 10,000 engines  | 10 Hz            | 4 GPUs        | Kubernetes cluster     |
| 100,000 engines | 1 Hz             | 4 GPUs        | Kubernetes cluster     |
| 100,000 engines | 10 Hz            | 32 GPUs       | Multi-region cluster   |
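
The GPU counts above follow from simple capacity arithmetic. A back-of-the-envelope sketch, assuming roughly 32,000 batched predictions per second per GPU (consistent with the dynamic-batching figures earlier; `gpus_required` is an illustrative helper):

```python
import math

PREDICTIONS_PER_GPU_PER_SEC = 32_000  # assumed batched throughput per GPU

def gpus_required(fleet_size, update_hz):
    """Minimum GPUs to sustain fleet_size * update_hz predictions per second."""
    load = fleet_size * update_hz
    return max(1, math.ceil(load / PREDICTIONS_PER_GPU_PER_SEC))

# Reproduces the table: 1, 1, 4, 4, 32 GPUs respectively
for fleet, hz in [(1_000, 1), (10_000, 1), (10_000, 10),
                  (100_000, 1), (100_000, 10)]:
    print(f"{fleet:>7,} engines @ {hz:>2} Hz -> {gpus_required(fleet, hz)} GPU(s)")
```

Real deployments add headroom on top of this floor: replicas for availability (the 10,000-engine row) and extra capacity for traffic spikes.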

Summary

Real-Time Inference Pipeline - Summary:

  1. Modular architecture: Separate ingestion, preprocessing, inference, and results
  2. Dynamic batching: Group requests for efficient GPU utilization
  3. Data validation: Handle missing data and sensor failures gracefully
  4. Async processing: Use async/await for high concurrency
  5. Kubernetes scaling: Horizontally scale with GPU-enabled pods

| Metric             | Definition             | Target      |
|--------------------|------------------------|-------------|
| End-to-end latency | Request to response    | <100 ms     |
| Throughput         | Predictions per second | 31K+ per GPU |
| Availability       | Uptime                 | 99.9%       |
| Data freshness     | Max age of prediction  | <2 seconds  |

Key Insight: A production inference pipeline is more than just a modelβ€”it's a complete system handling data ingestion, validation, batching, inference, and result delivery. The dynamic batching pattern is essential for achieving high throughput while maintaining low latency. With proper architecture, a single GPU can monitor tens of thousands of engines in real-time.