Learning Objectives
By the end of this section, you will:
- Design a production inference pipeline for real-time RUL prediction
- Implement data ingestion from sensor streams
- Build a high-performance inference service with batching
- Handle edge cases like missing data and sensor failures
- Scale the pipeline for enterprise deployments
Core Insight: A production inference pipeline must handle continuous sensor streams, maintain low latency, and gracefully degrade under failure conditions. The architecture should separate data ingestion, preprocessing, inference, and result delivery into independent, scalable components.
Pipeline Architecture
A robust real-time inference pipeline consists of several interconnected components:
High-Level Architecture
```
┌──────────────────────────────────────────────────────────┐
│                 AMNL Inference Pipeline                  │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐   │
│  │ Sensors │──▶│Ingestion│──▶│ Preproc │──▶│ Batcher │   │
│  │ (MQTT)  │   │ Service │   │ Service │   │         │   │
│  └─────────┘   └─────────┘   └─────────┘   └────┬────┘   │
│                                                 │        │
│                                                 ▼        │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐   │
│  │Dashboard│◀──│ Results │◀──│Inference│◀──│  Queue  │   │
│  │ / API   │   │ Service │   │ Service │   │         │   │
│  └─────────┘   └─────────┘   └─────────┘   └─────────┘   │
│                                                          │
└──────────────────────────────────────────────────────────┘
```

Component Responsibilities
| Component | Responsibility | Technology |
|---|---|---|
| Ingestion | Receive sensor data streams | MQTT, Kafka, gRPC |
| Preprocessing | Normalize, window, validate | Python, NumPy |
| Batcher | Collect samples for batch inference | Redis, In-memory |
| Inference | Run AMNL model predictions | ONNX Runtime, TensorRT |
| Results | Store and distribute predictions | PostgreSQL, Redis |
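The Preprocessing component in the table above can be sketched as a small helper that z-score normalizes each sensor channel against statistics saved from training and slices the stream into fixed-length windows. This is a minimal sketch, not the section's actual preprocessing service: the `Preprocessor` class name and its methods are illustrative, while the 17-sensor count and 50-step window come from the examples below.

```python
import numpy as np

class Preprocessor:
    """Normalize and window raw sensor streams (illustrative sketch)."""

    def __init__(self, means, stds, window_size=50):
        # means/stds: per-sensor statistics computed on the training set
        self.means = np.asarray(means, dtype=np.float32)
        self.stds = np.asarray(stds, dtype=np.float32)
        self.window_size = window_size

    def normalize(self, readings):
        """Z-score normalize a (timesteps, num_sensors) array."""
        x = np.asarray(readings, dtype=np.float32)
        return (x - self.means) / (self.stds + 1e-8)  # epsilon avoids div-by-zero

    def latest_window(self, readings):
        """Return the most recent full window, or None if too short."""
        x = self.normalize(readings)
        if x.shape[0] < self.window_size:
            return None
        return x[-self.window_size:]

# Example: 60 timesteps of 17 sensors yield one (50, 17) window
stream = np.random.default_rng(0).normal(size=(60, 17))
prep = Preprocessor(means=np.zeros(17), stds=np.ones(17))
window = prep.latest_window(stream)
print(window.shape)  # (50, 17)
```

Keeping the normalization statistics frozen from training is what makes the component stateless and trivially replicable, which matters for the horizontal scaling discussed later.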
Data Ingestion
The ingestion layer receives real-time sensor data and prepares it for the inference pipeline.
MQTT Sensor Ingestion
```python
import paho.mqtt.client as mqtt
import json
import numpy as np
from collections import defaultdict, deque
from threading import Lock

class SensorIngestionService:
    """
    Ingest sensor data from MQTT and maintain sliding windows.
    """

    def __init__(self, broker_host, broker_port, window_size=50):
        self.window_size = window_size
        # One bounded buffer per engine; deque(maxlen=...) drops the
        # oldest reading automatically once the window is full.
        self.sensor_buffers = defaultdict(lambda: deque(maxlen=window_size))
        self.buffer_lock = Lock()

        # MQTT client setup (paho-mqtt 1.x callback API)
        self.client = mqtt.Client()
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.connect(broker_host, broker_port, 60)

    def _on_connect(self, client, userdata, flags, rc):
        """Subscribe to sensor topics on connection."""
        client.subscribe("sensors/+/readings")
        print("Connected to MQTT broker, subscribed to sensor topics")

    def _on_message(self, client, userdata, msg):
        """Process an incoming sensor message."""
        try:
            # Parse message: {"engine_id": "E001", "timestamp": ..., "sensors": [...]}
            data = json.loads(msg.payload.decode())
            engine_id = data['engine_id']
            sensor_readings = data['sensors']  # 17 sensor values

            with self.buffer_lock:
                # Append to the sliding window; the deque enforces the size
                self.sensor_buffers[engine_id].append({
                    'timestamp': data['timestamp'],
                    'values': sensor_readings
                })
        except Exception as e:
            print(f"Error processing message: {e}")

    def get_inference_window(self, engine_id):
        """
        Get the current sliding window for an engine.

        Returns:
            numpy array of shape (window_size, 17), or None if
            insufficient data has arrived yet
        """
        with self.buffer_lock:
            buffer = self.sensor_buffers.get(engine_id)
            if buffer is None or len(buffer) < self.window_size:
                return None  # Not enough data yet

            # Copy the values out while holding the lock, so a concurrent
            # append cannot mutate the buffer mid-read
            window = np.array([reading['values'] for reading in buffer])

        return window.astype(np.float32)

    def get_all_ready_engines(self):
        """Get all engines with complete windows."""
        with self.buffer_lock:
            return [
                engine_id
                for engine_id, buffer in self.sensor_buffers.items()
                if len(buffer) >= self.window_size
            ]

    def start(self):
        """Start the MQTT client loop."""
        self.client.loop_start()

    def stop(self):
        """Stop the MQTT client loop."""
        self.client.loop_stop()
```

Data Validation
```python
import numpy as np
import pandas as pd

class DataValidator:
    """Validate and clean sensor data before inference."""

    def __init__(self, num_features=17, window_size=50):
        self.num_features = num_features
        self.window_size = window_size
        # Expected ranges for each sensor (from training data statistics)
        self.sensor_ranges = {
            0: (0, 100),    # Sensor 1 range
            1: (500, 700),  # Sensor 2 range
            # ... define for all 17 sensors
        }

    def validate(self, window):
        """
        Validate a sensor window.

        Returns:
            Tuple of (is_valid, cleaned_window, issues)
        """
        issues = []

        # Check shape
        if window.shape != (self.window_size, self.num_features):
            issues.append(f"Invalid shape: {window.shape}")
            return False, None, issues

        # Check for NaN values
        if np.isnan(window).any():
            nan_count = int(np.isnan(window).sum())
            issues.append(f"Contains {nan_count} NaN values")

            # Attempt to fix: forward fill
            window = self._forward_fill(window)

        # Check for Inf values
        if np.isinf(window).any():
            issues.append("Contains Inf values")
            window = np.clip(window, -1e6, 1e6)

        # Check for out-of-range values
        for sensor_idx, (min_val, max_val) in self.sensor_ranges.items():
            sensor_values = window[:, sensor_idx]
            if (sensor_values < min_val * 0.5).any() or (sensor_values > max_val * 2).any():
                issues.append(f"Sensor {sensor_idx} out of expected range")

        is_valid = len(issues) == 0
        return is_valid, window, issues

    def _forward_fill(self, window):
        """Forward fill NaN values (backward fill any leading NaNs)."""
        df = pd.DataFrame(window)
        df = df.ffill().bfill()
        return df.to_numpy(dtype=np.float32)
```

Missing Data Handling
Industrial sensors frequently experience intermittent failures. Forward-filling is a safe default for short gaps (<5 timesteps). For longer gaps, consider flagging the prediction as low-confidence.
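One way to implement that policy is to measure each sensor's longest consecutive-NaN run before filling: short gaps are filled silently, while any run longer than the threshold downgrades the prediction's confidence. A sketch under this section's assumptions (17 sensors, 50-step windows, 5-step threshold); the `fill_with_confidence` name and `max_gap` parameter are illustrative.

```python
import numpy as np
import pandas as pd

def fill_with_confidence(window, max_gap=5):
    """Forward/backward fill NaNs; flag windows with long gaps.

    Returns (filled_window, low_confidence) where low_confidence is True
    if any sensor had a NaN run longer than max_gap timesteps.
    """
    df = pd.DataFrame(window)
    low_confidence = False
    for col in df.columns:
        isna = df[col].isna()
        # Length of the longest consecutive-NaN run in this column:
        # (~isna).cumsum() is constant within each NaN run, so grouping
        # by it and summing the NaN mask yields per-run lengths
        runs = isna.groupby((~isna).cumsum()).sum()
        if len(runs) and runs.max() > max_gap:
            low_confidence = True
    filled = df.ffill().bfill().to_numpy(dtype=np.float32)
    return filled, low_confidence

# Example: a 3-step gap is filled silently; a 7-step gap is flagged
w = np.ones((50, 17), dtype=np.float32)
w[10:13, 0] = np.nan            # short gap
filled, flag = fill_with_confidence(w)
print(flag)                     # False
w[20:27, 1] = np.nan            # long gap (7 steps)
_, flag = fill_with_confidence(w)
print(flag)                     # True
```

The flag can then be attached to the prediction result so downstream consumers (dashboards, alerting) can display or suppress low-confidence values.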
Inference Service
The inference service runs AMNL predictions with dynamic batching for optimal throughput.
Batched Inference Service
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
import numpy as np
from typing import Dict, List, Tuple
import time

class BatchedInferenceService:
    """
    High-performance inference service with dynamic batching.
    """

    def __init__(
        self,
        model_path: str,
        max_batch_size: int = 256,
        max_wait_ms: int = 10,
        num_workers: int = 2
    ):
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms

        # Load model (ONNX or TensorRT)
        self.predictor = self._load_model(model_path)

        # Request queue: (engine_id, window, future)
        self.request_queue = asyncio.Queue()

        # Thread pool for inference
        self.executor = ThreadPoolExecutor(max_workers=num_workers)

        self.running = True
        self._processor_task = None

    def _load_model(self, model_path):
        """Load an ONNX or TensorRT model."""
        if model_path.endswith('.onnx'):
            import onnxruntime as ort
            return ort.InferenceSession(model_path)
        elif model_path.endswith('.engine'):
            # TensorRTPredictor is a user-provided wrapper around a
            # serialized TensorRT engine
            return TensorRTPredictor(model_path)
        else:
            raise ValueError(f"Unknown model format: {model_path}")

    async def predict(self, engine_id: str, window: np.ndarray) -> Dict:
        """
        Submit a prediction request and wait for the result.

        Args:
            engine_id: Unique engine identifier
            window: Sensor window of shape (50, 17)

        Returns:
            Dict with RUL prediction and health classification
        """
        future = asyncio.get_running_loop().create_future()
        await self.request_queue.put((engine_id, window, future))
        return await future

    async def _batch_processor(self):
        """
        Continuously process batches from the queue.
        """
        while self.running:
            batch = []
            start_time = time.time()

            # Collect requests until the batch is full or the wait
            # budget (max_wait_ms) is exhausted
            while len(batch) < self.max_batch_size:
                try:
                    elapsed_ms = (time.time() - start_time) * 1000
                    timeout = max(0.001, (self.max_wait_ms - elapsed_ms) / 1000)

                    request = await asyncio.wait_for(
                        self.request_queue.get(),
                        timeout=timeout
                    )
                    batch.append(request)
                except asyncio.TimeoutError:
                    break  # Timeout reached, process current batch

            if not batch:
                continue

            # Run batch inference
            await self._process_batch(batch)

    async def _process_batch(self, batch: List[Tuple]):
        """Process a batch of requests."""
        engine_ids = [req[0] for req in batch]
        windows = np.stack([req[1] for req in batch])
        futures = [req[2] for req in batch]

        # Run inference in the thread pool so the event loop stays free
        loop = asyncio.get_running_loop()
        rul_preds, health_preds = await loop.run_in_executor(
            self.executor,
            self._run_inference,
            windows
        )

        # Distribute results
        for i, future in enumerate(futures):
            result = {
                'engine_id': engine_ids[i],
                'rul_prediction': float(rul_preds[i, 0]),
                'health_state': int(np.argmax(health_preds[i])),
                'health_probabilities': health_preds[i].tolist(),
                'timestamp': time.time()
            }
            future.set_result(result)

    def _run_inference(self, windows: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Run model inference on a batch."""
        if hasattr(self.predictor, 'run'):
            # ONNX Runtime; the input name must match the exported model
            outputs = self.predictor.run(None, {'sensor_data': windows})
            return outputs[0], outputs[1]
        else:
            # TensorRT
            return self.predictor.predict(windows)

    async def start(self):
        """Start the batch processor."""
        self._processor_task = asyncio.create_task(self._batch_processor())

    def stop(self):
        """Stop the service."""
        self.running = False
```

FastAPI Inference Endpoint
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
import numpy as np

app = FastAPI(title="AMNL Inference API")

# Initialize inference service
inference_service = BatchedInferenceService("amnl_model.onnx")

class PredictionRequest(BaseModel):
    engine_id: str
    sensor_readings: List[List[float]]  # Shape: (50, 17)

class PredictionResponse(BaseModel):
    engine_id: str
    rul_prediction: float
    health_state: int
    health_probabilities: List[float]
    timestamp: float

@app.on_event("startup")
async def startup():
    await inference_service.start()

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """
    Get an RUL prediction for an engine.
    """
    # Validate input shape
    window = np.array(request.sensor_readings, dtype=np.float32)
    if window.shape != (50, 17):
        raise HTTPException(
            status_code=400,
            detail=f"Expected shape (50, 17), got {window.shape}"
        )

    # Run prediction
    result = await inference_service.predict(request.engine_id, window)

    return PredictionResponse(**result)

@app.get("/health")
async def health_check():
    """API health check endpoint."""
    return {"status": "healthy", "model": "amnl_v1.0"}

# Run with: uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
```

Dynamic Batching Benefits
Dynamic batching groups requests arriving within a short window (10 ms in the configuration above) into a single batch, dramatically improving GPU utilization. A forward pass costs roughly the same whether it processes 1 sample or 256; batching 256 requests into one ~8 ms pass amortizes that cost to roughly 31 μs per prediction, improving throughput by more than two orders of magnitude over one-at-a-time inference.
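The arithmetic behind that claim can be checked directly: with a fixed per-pass cost, amortized per-request cost falls linearly with batch size. The ~8 ms pass time is the figure assumed in this section.

```python
def amortized_latency_us(pass_time_ms, batch_size):
    """Per-request GPU cost when batch_size requests share one forward pass."""
    return pass_time_ms * 1000.0 / batch_size

# One ~8 ms forward pass shared by 256 requests:
per_request = amortized_latency_us(8.0, 256)
print(f"{per_request:.2f} us per prediction")   # 31.25 us
throughput = 256 / (8.0 / 1000.0)
print(f"{throughput:.0f} predictions/sec")      # 32000
```

Note that each individual request still waits up to `max_wait_ms` plus the pass time, so dynamic batching trades a small, bounded latency increase per request for a large throughput gain.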
Scaling Strategies
As the number of monitored engines grows, the pipeline must scale accordingly.
Horizontal Scaling Architecture
Kubernetes Deployment
```yaml
# kubernetes/amnl-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: amnl-inference
spec:
  replicas: 4  # Scale based on load
  selector:
    matchLabels:
      app: amnl-inference
  template:
    metadata:
      labels:
        app: amnl-inference
    spec:
      containers:
      - name: amnl-inference
        image: amnl-inference:v1.0
        ports:
        - containerPort: 8000
        resources:
          limits:
            nvidia.com/gpu: 1
            memory: "4Gi"
            cpu: "4"
          requests:
            nvidia.com/gpu: 1
            memory: "2Gi"
            cpu: "2"
        env:
        - name: MODEL_PATH
          value: "/models/amnl_model.engine"
        - name: MAX_BATCH_SIZE
          value: "256"
        volumeMounts:
        - name: model-volume
          mountPath: /models
      volumes:
      - name: model-volume
        persistentVolumeClaim:
          claimName: amnl-models-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: amnl-inference-service
spec:
  selector:
    app: amnl-inference
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer
```

Scaling Recommendations
| Fleet Size | Update Frequency | GPUs Required | Architecture |
|---|---|---|---|
| 1,000 engines | 1 Hz | 1 GPU | Single node |
| 10,000 engines | 1 Hz | 1 GPU | Single node + replica |
| 10,000 engines | 10 Hz | 4 GPUs | Kubernetes cluster |
| 100,000 engines | 1 Hz | 4 GPUs | Kubernetes cluster |
| 100,000 engines | 10 Hz | 32 GPUs | Multi-region cluster |
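The GPU counts in the table follow from simple throughput math: required predictions per second equals fleet size times update frequency, divided by per-GPU throughput (~32K predictions/s from the batching figures above). A sketch that reproduces the table's values; the 32K/s figure is an assumption carried over from this section, and real deployments would add a headroom factor.

```python
import math

def gpus_required(fleet_size, update_hz, per_gpu_pps=32_000, headroom=1.0):
    """Estimate GPUs needed for a fleet at a given update frequency.

    headroom > 1.0 (e.g. 1.5) reserves capacity for load spikes.
    """
    required_pps = fleet_size * update_hz
    return max(1, math.ceil(required_pps * headroom / per_gpu_pps))

for fleet, hz in [(1_000, 1), (10_000, 1), (10_000, 10), (100_000, 1), (100_000, 10)]:
    print(f"{fleet:>7} engines @ {hz:>2} Hz -> {gpus_required(fleet, hz)} GPU(s)")
```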
Summary
Real-Time Inference Pipeline - Summary:
- Modular architecture: Separate ingestion, preprocessing, inference, and results
- Dynamic batching: Group requests for efficient GPU utilization
- Data validation: Handle missing data and sensor failures gracefully
- Async processing: Use async/await for high concurrency
- Kubernetes scaling: Horizontally scale with GPU-enabled pods
| Component | Key Metric | Target |
|---|---|---|
| End-to-end latency | Request to response | <100ms |
| Throughput | Predictions per second | 31K+ per GPU |
| Availability | Uptime | 99.9% |
| Data freshness | Max age of prediction | <2 seconds |
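The data-freshness target in the table can be enforced with a small staleness check before a cached prediction is served. A sketch; the 2-second threshold comes from the table, while the `is_fresh` name and the example values are illustrative.

```python
import time

MAX_PREDICTION_AGE_S = 2.0  # "Data freshness" target from the table above

def is_fresh(prediction, now=None, max_age=MAX_PREDICTION_AGE_S):
    """Return True if a prediction's timestamp is within the freshness SLO."""
    now = time.time() if now is None else now
    return (now - prediction["timestamp"]) <= max_age

# Illustrative prediction dict in the shape produced by the inference service
pred = {"engine_id": "E001", "rul_prediction": 112.4, "timestamp": 1_000.0}
print(is_fresh(pred, now=1_001.5))  # True  (1.5 s old)
print(is_fresh(pred, now=1_003.0))  # False (3.0 s old)
```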
Key Insight: A production inference pipeline is more than just a modelβit's a complete system handling data ingestion, validation, batching, inference, and result delivery. The dynamic batching pattern is essential for achieving high throughput while maintaining low latency. With proper architecture, a single GPU can monitor tens of thousands of engines in real-time.