We have built every component of the data pipeline in isolation: feature selection (§5.3), condition discovery (§5.4 / §6.2), per-condition normalisation (§6.3, §6.4), sliding-window construction (§7.1), RUL capping (§7.2), health-state discretisation (§7.3). This section unifies them into one production-grade CMAPSSFullDataset that the rest of the book consumes verbatim.
Once you have this Dataset, every training script becomes a four-input training loop. Forward (X, cond, ...); backward through (rul_loss + health_loss). Chapters 15, 20, 22 all reduce to this skeleton.
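As a rough illustration of the `rul_loss + health_loss` sum, here is a NumPy-only sketch. The choice of MSE for the RUL head, cross-entropy for the health head, and equal weighting are assumptions made for this example; `multitask_loss` is a hypothetical helper, not the book's training code.

```python
import numpy as np

def multitask_loss(rul_pred, rul_true, hs_logits, hs_true):
    """Assumed form: MSE on capped RUL plus mean cross-entropy on health logits."""
    rul_loss = np.mean((rul_pred - rul_true) ** 2)
    # Numerically stable log-softmax over the class axis.
    shifted = hs_logits - hs_logits.max(axis=1, keepdims=True)
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))
    health_loss = -np.mean(log_probs[np.arange(len(hs_true)), hs_true])
    return rul_loss + health_loss, rul_loss, health_loss

# Made-up batch of two samples.
rul_pred = np.array([120.0, 40.0], dtype=np.float32)
rul_true = np.array([125.0, 30.0], dtype=np.float32)
hs_logits = np.zeros((2, 3), dtype=np.float32)  # uniform over the 3 classes
hs_true = np.array([0, 2], dtype=np.int64)

total, rul_loss, health_loss = multitask_loss(rul_pred, rul_true, hs_logits, hs_true)
```

With uniform logits the health term equals ln 3, which makes the two components easy to tell apart when debugging.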
The Contract: One Sample, Four Outputs
Each call to ds[i] returns a 4-tuple (X_norm, c, y_rul, y_hs):

| Field | Shape | dtype | Source |
| --- | --- | --- | --- |
| X_norm | (W, 14) | float32 | §5.3 select + §6.3 normalise |
| c | (W,) | int64 | §5.4 / §6.2 cluster IDs per cycle |
| y_rul | () | float32 | §7.2 capped at R_max |
| y_hs | () | int64 | §7.3 health-state class |
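The contract above can be checked mechanically. The sketch below uses NumPy arrays as stand-ins for the tensors and assumes W = 30 (the default window length used later); `check_sample` is a hypothetical helper introduced only for illustration.

```python
import numpy as np

W, N_SENSORS = 30, 14  # assumed window length; 14 sensors from the table

def check_sample(x_norm, c, y_rul, y_hs):
    """Raise AssertionError if a sample violates the 4-tuple contract."""
    assert x_norm.shape == (W, N_SENSORS) and x_norm.dtype == np.float32
    assert c.shape == (W,) and c.dtype == np.int64
    assert y_rul.shape == () and y_rul.dtype == np.float32
    assert y_hs.shape == () and y_hs.dtype == np.int64
    return True

# A made-up sample with the right shapes and dtypes.
sample = (
    np.zeros((W, N_SENSORS), dtype=np.float32),
    np.zeros(W, dtype=np.int64),
    np.float32(125.0),
    np.int64(0),
)
ok = check_sample(*sample)
```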
Python: The Full Pipeline as One Function
One function reads train, fits everything, persists the bundle
🐍fit_full_pipeline.py
Line 1: import numpy as np, pandas as pd, joblib
Triple-import on one line. NumPy gives us fast N-dimensional arrays + the math kernels (np.minimum, np.stack). Pandas gives us the labelled DataFrame for engine/cycle/sensor tabular data. Joblib persists arbitrary Python objects (sklearn estimators, NumPy arrays) to a single .joblib file we can reload at test time.
EXECUTION STATE
numpy as np = Numerical computing — ndarray, broadcasting, vectorised math. Used here for np.minimum (RUL cap) and np.stack (per-condition matrices).
pandas as pd = Tabular data with row/column labels. Used here to read the C-MAPSS .txt file and groupby('engine_id') for per-engine cycle counting.
joblib = Serialises Python objects (especially NumPy + sklearn) to disk. Faster than pickle for arrays. Used to dump the bundle so test code only LOADS, never re-fits.
Line 2: from sklearn.cluster import KMeans
Imports sklearn's K-Means clusterer. We will fit it on the 3-D operational-setting space (op_set_1, op_set_2, op_set_3) to discover the n_conditions=6 flight regimes that the C-MAPSS FD002 / FD004 datasets contain.
EXECUTION STATE
📚 KMeans = Unsupervised algorithm that partitions points into k clusters by minimising within-cluster variance. fit() learns centroids; .labels_ gives cluster IDs; .predict() assigns new points.
Line 4: INFORMATIVE = [1, 2, 3, 6, 7, 8, 10, 11, 12, 13, 14, 16, 19, 20]
0-based indices of the 14 sensors that survived feature selection in §5.3. The 21 raw C-MAPSS sensors include 7 that are flat lines (zero variance); those were dropped. The remaining 14 carry the degradation signal.
→ why 0-based = These are list indices into the 21-sensor list, not sensor numbers. Index 0 → sensor_1 dropped, index 1 → sensor_2 kept, etc. Line 19 converts these to sensor_2..sensor_21 column names.
Line 5: R_MAX = 125
Maximum useful-life cap (cycles). From §7.2: capping RUL at 125 stops the loss from over-weighting healthy engines (where true RUL > 125 is just 'lots'). The Dataset applies the same cap when it builds y_rul.
EXECUTION STATE
R_MAX = 125 = Cycles. Any raw RUL > 125 → clamped to 125. Empirically chosen so the regression target spans a 'meaningful' degradation regime instead of unbounded healthy values.
Line 6: THR_DEGR, THR_CRIT = 80, 30
Health-state thresholds (§7.3). Tuple-unpacking assigns 80 → THR_DEGR (degrading) and 30 → THR_CRIT (critical). The Dataset converts capped RUL into a 3-class label: normal (RUL > 80) / degrading (30 < RUL ≤ 80) / critical (RUL ≤ 30).
EXECUTION STATE
THR_DEGR = 80 = Cycles. If RUL_capped ≤ 80, the engine enters DEGRADING state (class 1).
THR_CRIT = 30 = Cycles. If RUL_capped ≤ 30, the engine enters CRITICAL state (class 2). Overrides DEGRADING because the check happens after.
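Line 9: def fit_full_pipeline(train_path: str, n_conditions: int = 6, seed: int = 0):
Single function that performs ALL train-time fitting: load the whitespace-separated train file, compute RUL, fit k-means on the op_settings, compute per-condition sensor means/stds. Returns one bundle that test code can reload.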
EXECUTION STATE
⬇ input: train_path = Path to a C-MAPSS train file, e.g. 'data/raw/train_FD002.txt'. Whitespace-separated, 26 columns, no header.
⬇ input: n_conditions = 6 = Number of operating-condition clusters. FD002 / FD004 → 6 (multi-condition). FD001 / FD003 → 1 (single-condition, k-means is skipped).
⬇ input: seed = 0 = RNG seed for KMeans's random init. Same seed → reproducible cluster assignments across runs (critical for reproducible per-condition stats).
⬆ returns = dict with 5 keys: km (fitted KMeans or None), means (n_cond × 14), stds (n_cond × 14), sensor_cols (list of 14 names), n_conditions (int).
Line 10: Docstring (first line)
First line of the docstring. Communicates the function's contract: it rolls every train-time fit into one bundle that downstream code can load and apply.
Line 11: Docstring (second line)
Closing line of the docstring. Reinforces that the OUTPUT is exactly what test inference needs — no recomputation at test time.
Line 12: cols = (["engine_id", "cycle"]
Start building the 26-element column-name list. C-MAPSS files have no header row, so we must supply names ourselves. The schema is fixed: engine_id, cycle, then 3 op_settings, then 21 sensors.
EXECUTION STATE
["engine_id", "cycle"] = First two columns: engine_id (which engine) and cycle (timestep number within that engine).
Line 13: + [f"op_set_{i}" for i in range(1, 4)]
List comprehension generating the 3 operational-setting column names. range(1, 4) yields 1, 2, 3 → produces ['op_set_1', 'op_set_2', 'op_set_3']. The + concatenates this list with the previous one.
EXECUTION STATE
📚 range(start, stop) = Built-in: yields integers from start (inclusive) to stop (exclusive). range(1, 4) → 1, 2, 3.
f"op_set_{i}" = f-string formatting: substitutes the loop variable into the literal. i=1 → 'op_set_1'.
→ result fragment = ['op_set_1', 'op_set_2', 'op_set_3']
Line 14: + [f"sensor_{i}" for i in range(1, 22)])
Generates 21 sensor column names: sensor_1 .. sensor_21. range(1, 22) yields 1..21 (stop is exclusive). The closing parenthesis ends the multi-line parenthesised list expression started on line 12.
Line 15: df = pd.read_csv(train_path, sep=r"\s+", header=None, names=cols)
Loads the whitespace-separated text file into a DataFrame. C-MAPSS files have NO header row and use variable-width whitespace (not tabs, not single spaces); read_csv must be told about both quirks.
EXECUTION STATE
📚 pd.read_csv() = Pandas function: parses CSV/TSV/whitespace-separated text into a DataFrame. Handles type inference, missing values, quoting, and large files via chunking.
⬇ arg 1: train_path = File path. Pandas opens the file and streams it.
⬇ arg 2: sep=r"\s+" = Regex 'one or more whitespace chars'. Needed because C-MAPSS uses variable spaces, not a fixed delimiter. The r prefix makes it a raw string so backslashes aren't double-escaped.
⬇ arg 3: header=None = Tells pandas there is NO header row in the file — every row is data. Without this, pandas would treat row 0 as column names.
⬇ arg 4: names=cols = Provide column names explicitly. Pandas applies these to the 26 columns it parsed.
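To see the column-name construction and the read_csv arguments working together, here is a small sketch that parses two made-up rows from an in-memory buffer instead of a real C-MAPSS file. The row values are invented for illustration.

```python
import io
import pandas as pd

cols = (["engine_id", "cycle"]
        + [f"op_set_{i}" for i in range(1, 4)]
        + [f"sensor_{i}" for i in range(1, 22)])

# Two made-up rows with irregular spacing between fields.
row1 = "1  1   " + "  ".join(["0.5"] * 24)
row2 = "1 2 " + " ".join(["1.5"] * 24)
fake_file = io.StringIO(row1 + "\n" + row2 + "\n")

# No header row, one-or-more whitespace as separator, explicit names.
df = pd.read_csv(fake_file, sep=r"\s+", header=None, names=cols)
```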
Line 16: df["RUL_raw"] = df.groupby("engine_id")["cycle"].transform("max") - df["cycle"]
Computes raw remaining-useful-life per row: max_cycle_for_this_engine − current_cycle. groupby + transform broadcasts the per-engine max back to every row of that engine, so subtraction is element-wise.
EXECUTION STATE
📚 df.groupby(col) = Splits the DataFrame into groups by unique values of col. Returns a GroupBy object — lazy until you call an aggregation. Here we group by engine_id so each engine's cycles stay together.
📚 .transform("max") = Applies aggregation per group then broadcasts back to ORIGINAL row count. Unlike .max() which collapses to one row per group, transform keeps the same length so we can subtract row-wise.
→ example: engine 1 with 192 cycles = df.groupby('engine_id')['cycle'].transform('max') puts 192 in all 192 rows for engine 1. Then 192 - cycle gives RUL: 191 (cycle 1), 190 (cycle 2), ..., 0 (cycle 192).
⬆ result: df["RUL_raw"] = New column. Values like 191, 190, ..., 0 for engine 1; 286, 285, ..., 0 for engine 2 (engine 2 has 287 cycles).
Line 17: df["RUL_capped"] = np.minimum(df["RUL_raw"], R_MAX)
Caps RUL at R_MAX = 125. Every raw RUL above 125 is clamped to 125; values ≤ 125 pass through unchanged. This is the §7.2 cap; the Dataset applies the same np.minimum call to produce y_rul.
EXECUTION STATE
📚 np.minimum(a, b) = Element-wise min. For arrays a and b of equal/broadcastable shape, returns an array where out[i] = min(a[i], b[i]). Different from np.min (which reduces along an axis).
⬇ arg 1: df["RUL_raw"] = Series of raw RUL values, e.g. [191, 190, ..., 0, 286, 285, ..., 0]
⬇ arg 2: R_MAX = 125 = Scalar broadcast against the Series — every element compared against 125.
⬆ result: df["RUL_capped"] = [125, 125, ..., 125, 124, 123, ..., 0] — flat 125 plateau for healthy region, then linear ramp to 0 in last 125 cycles.
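The raw-RUL and capping steps can be tried on a toy frame. This sketch uses two made-up engines and a deliberately tiny cap of 3 (the text uses 125) so the plateau is visible in a handful of rows.

```python
import numpy as np
import pandas as pd

R_MAX = 3  # toy cap for illustration only; the pipeline uses 125

df = pd.DataFrame({
    "engine_id": [1, 1, 1, 1, 1, 2, 2, 2],
    "cycle":     [1, 2, 3, 4, 5, 1, 2, 3],
})

# Per-engine max cycle, broadcast back to every row, minus the current cycle.
df["RUL_raw"] = df.groupby("engine_id")["cycle"].transform("max") - df["cycle"]
# Element-wise clamp against the scalar cap.
df["RUL_capped"] = np.minimum(df["RUL_raw"], R_MAX)
```

Engine 1 shows the plateau (raw RUL 4 is clamped to 3); engine 2 never reaches the cap, so its column is unchanged.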
Line 19: sensor_cols = [f"sensor_{i+1}" for i in INFORMATIVE]
Converts the 0-based INFORMATIVE indices into the 14 actual column names ('sensor_2', 'sensor_3', ...) that we want to keep. The +1 maps INFORMATIVE entry 1 → sensor_2.
EXECUTION STATE
for i in INFORMATIVE = Loops i over [1, 2, 3, 6, 7, 8, 10, 11, 12, 13, 14, 16, 19, 20].
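The index-to-name mapping is easy to verify in isolation. The sketch below also derives the complementary list of dropped sensors; the `dropped` variable is added here for illustration and is not part of the pipeline.

```python
INFORMATIVE = [1, 2, 3, 6, 7, 8, 10, 11, 12, 13, 14, 16, 19, 20]

# 0-based index i corresponds to column sensor_{i+1}.
sensor_cols = [f"sensor_{i + 1}" for i in INFORMATIVE]

# The 7 indices not listed in INFORMATIVE (illustration only).
dropped = [f"sensor_{i + 1}" for i in range(21) if i not in INFORMATIVE]
```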
Line 21: if n_conditions > 1:
Branch: if more than one operating regime exists, fit k-means; otherwise (FD001/FD003) skip clustering and tag every row with cond=0. Mirrors the test-time branch in CMAPSSFullDataset.__init__.
EXECUTION STATE
n_conditions = Function parameter. 6 for FD002 → enters this branch.
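Line 22: km = KMeans(n_clusters=n_conditions, n_init=10, random_state=seed).fit(df[["op_set_1", "op_set_2", "op_set_3"]])
Constructs a K-Means estimator with n_conditions=6 clusters, then fits it on the 3-D operational-setting matrix. .fit() runs Lloyd's algorithm to find 6 centroids that (locally) minimise within-cluster variance.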
⬇ arg 1: n_clusters = 6 = How many clusters/regimes to find. Domain knowledge: C-MAPSS FD002 has 6 distinct flight conditions (combinations of altitude × Mach × throttle).
⬇ arg 2: n_init = 10 = Number of independent k-means runs with different random initial centroids. Final result = best of 10 (lowest inertia). Avoids being stuck in poor local minima.
⬇ arg 3: random_state = seed = Seeds the RNG for centroid initialisation. seed=0 → same starting state every run → identical cluster assignments → identical means/stds bundle.
📚 .fit(X) = Lloyd's algorithm: alternates assigning each point to the nearest centroid and recomputing centroids as cluster means until convergence.
⬇ fit input: df[["op_set_1", "op_set_2", "op_set_3"]] = DataFrame slice → ~53,759 × 3 matrix of (altitude, Mach, throttle) for every cycle of every engine.
⬆ result: km = Fitted KMeans. km.cluster_centers_ shape (6, 3); km.labels_ shape (53759,) with values in {0, 1, 2, 3, 4, 5}.
Line 23: df["cond"] = km.labels_
Tags every row with its cluster ID. km.labels_ is a NumPy array of length len(df), aligned to the original row order, so direct column assignment works without re-indexing.
EXECUTION STATE
km.labels_ = 1-D NumPy array, dtype int32, shape (53759,) for FD002. Each element ∈ {0, 1, 2, 3, 4, 5}.
⬆ result: df["cond"] = New column tagging each cycle with its operating regime. e.g. [3, 3, 5, 5, 1, 1, 0, 2, ...]: the regime changes from one cycle (one flight) to the next.
Line 24: else:
Single-condition branch. Reached when n_conditions = 1 (FD001 / FD003 — fixed sea-level cruise). No clustering needed.
Line 25: km = None
Set km to None in the bundle. Test code checks `if self.km is None:` to mirror this branch.
EXECUTION STATE
km = None = Sentinel value. Marks 'no clusterer fitted' so downstream code can branch correctly.
Line 26: df["cond"] = 0
Assign the scalar 0 to a new column — pandas broadcasts it to every row. Means/stds will then have shape (1, 14): one row of stats for the single regime.
EXECUTION STATE
broadcasting scalar to column = df['cond'] = 0 creates a column of length len(df) filled with 0. Equivalent to np.zeros(len(df), dtype=int).
Line 28: means = np.stack([... for c in range(n_conditions)])
Build the per-condition mean matrix. For each condition c ∈ {0, ..., 5}, filter rows where cond == c, take the mean of the 14 sensor columns → 1-D array of length 14. Stack the 6 arrays vertically into a (6, 14) matrix.
EXECUTION STATE
📚 np.stack(arrays, axis=0) = Stacks a sequence of arrays along a NEW axis. Default axis=0 makes them rows of a new array. Different from np.concatenate which joins along an EXISTING axis.
⬇ inner: df.loc[df["cond"] == c, sensor_cols] = Boolean filter + column select. Returns rows where cond == c and only the 14 sensor columns. e.g. for c=0 maybe 8,800 rows × 14 cols.
📚 .mean() = Pandas DataFrame method: column-wise mean by default. Returns a Series of length 14 (one mean per sensor).
📚 .to_numpy() = Converts the Series to a 1-D NumPy array. Drops the index. Required because np.stack expects ndarrays.
Line 29: for c in range(n_conditions)
Continuation of the means comprehension on line 28. Loops c through 0, 1, 2, 3, 4, 5 (n_conditions=6) — one iteration per regime.
LOOP TRACE · 6 iterations
c = 0
result = row 0 of means: 14-element vector of mean sensor values for regime 0
c = 1
result = row 1 of means: regime 1 means (different altitude/Mach combination)
c = 2
result = row 2 of means
c = 3
result = row 3 of means
c = 4
result = row 4 of means
c = 5
result = row 5 of means — last iteration; np.stack then assembles all 6 rows
⬆ means.shape = (6, 14) — 6 regimes × 14 sensors
Line 30: stds = np.stack([... for c in range(n_conditions)])
Same pattern as means, but with .std() (sample standard deviation). Used at test time as the σ in the per-condition Z-score: x_norm = (x − μ_cond) / (σ_cond + 1e-8).
EXECUTION STATE
📚 .std() = Pandas DataFrame method: column-wise sample std (ddof=1 by default — divides by N-1). Returns a Series of length 14.
→ why ddof=1? = Pandas defaults to the unbiased sample estimator (divide by N−1); NumPy defaults to the population estimator (ddof=0, divide by N). The two differ by a factor of √(N/(N−1)), which is negligible at the thousands of rows per condition seen here.
⬆ stds.shape = (6, 14) — same shape as means, indexed identically by cluster ID.
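A toy version of the per-condition statistics, with two conditions and two sensors, makes the shapes and the ddof behaviour concrete. All values are made up; the stacking expressions are the same pattern as lines 28 to 31.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "cond":     [0, 0, 0, 1, 1, 1],
    "sensor_2": [1.0, 2.0, 3.0, 10.0, 20.0, 30.0],
    "sensor_3": [5.0, 5.0, 8.0, 100.0, 100.0, 130.0],
})
sensor_cols = ["sensor_2", "sensor_3"]
n_conditions = 2

means = np.stack([df.loc[df["cond"] == c, sensor_cols].mean().to_numpy()
                  for c in range(n_conditions)])
stds = np.stack([df.loc[df["cond"] == c, sensor_cols].std().to_numpy()
                 for c in range(n_conditions)])

# Same rows through NumPy: ddof=0 by default, ddof=1 matches pandas.
cond0 = df.loc[df["cond"] == 0, sensor_cols].to_numpy()
np_population = cond0.std(axis=0)
np_sample = cond0.std(axis=0, ddof=1)
```

With only three rows per condition the two estimators differ visibly; with thousands of rows the gap shrinks to a rounding-level effect.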
Line 31: for c in range(n_conditions)
Continuation of the stds comprehension. Same 0..5 loop as means, producing one std vector per regime.
Line 33: return {"km": km, "means": means, "stds": stds, "sensor_cols": sensor_cols, "n_conditions": n_conditions}
Pack everything the test pipeline needs into a single dict. The bundle contains the FITTED estimator (km), the per-condition statistics (means, stds), the ordered column-name list (sensor_cols) and the cluster count (n_conditions).
→ why a dict? = Single object — joblib.dump can serialise it in one shot. Test code only needs to call joblib.load(bundle_path) to get everything back.
Line 36: # Save once
Comment marking the start of the script-level work. Train fits ONCE and persists; test never refits.
Line 38: joblib.dump(bundle, "fd002_full_pipeline.joblib")
Serialises the bundle to disk in joblib format. One file ↔ one bundle. Test code uses joblib.load(...) to round-trip back to the same dict.
EXECUTION STATE
📚 joblib.dump(value, filename) = Pickles `value` and writes it to `filename`. Handles large NumPy arrays efficiently; joblib.load can memory-map them on read via its mmap_mode argument.
⬇ arg 1: bundle = The dict to persist.
⬇ arg 2: "fd002_full_pipeline.joblib" = Output path. Convention: include the dataset name + 'pipeline' so multiple bundles don't collide.
→ side effect = Writes a binary file ~few hundred KB. Reproducibility guarantee: anyone with this file gets the EXACT same means/stds/cluster centroids.
Line 40: print("means.shape :", bundle["means"].shape)
Prints the means array's shape. Tuple printed as (rows, cols).
EXECUTION STATE
bundle["means"].shape = (6, 14) — accessed via dict key, then ndarray .shape attribute.
⬆ stdout = means.shape : (6, 14)
Line 41: print("stds.shape :", bundle["stds"].shape)
Same shape check for stds. Should match means exactly — they were both built from the same per-condition row filters.
Line 42: print("sensor_cols :", bundle["sensor_cols"][:5], "...")
Slice [:5] takes the first 5 names; '...' is a literal string (not Python's Ellipsis) for visual brevity. Confirms that the column-name list is present in the bundle and in the expected order.
EXECUTION STATE
bundle["sensor_cols"][:5] = List slice — first 5 elements of the 14-element list.
import numpy as np, pandas as pd, joblib
from sklearn.cluster import KMeans

INFORMATIVE = [1, 2, 3, 6, 7, 8, 10, 11, 12, 13, 14, 16, 19, 20]  # §5.3
R_MAX = 125  # §7.2
THR_DEGR, THR_CRIT = 80, 30  # §7.3


def fit_full_pipeline(train_path: str, n_conditions: int = 6, seed: int = 0):
    """One function. Reads train; fits k-means; computes per-cond stats;
    selects 14 sensors; returns the bundle that test inference needs."""
    cols = (["engine_id", "cycle"]
            + [f"op_set_{i}" for i in range(1, 4)]
            + [f"sensor_{i}" for i in range(1, 22)])
    df = pd.read_csv(train_path, sep=r"\s+", header=None, names=cols)
    df["RUL_raw"] = df.groupby("engine_id")["cycle"].transform("max") - df["cycle"]
    df["RUL_capped"] = np.minimum(df["RUL_raw"], R_MAX)

    sensor_cols = [f"sensor_{i+1}" for i in INFORMATIVE]  # 14 sensors

    if n_conditions > 1:
        km = KMeans(n_clusters=n_conditions, n_init=10, random_state=seed).fit(df[["op_set_1", "op_set_2", "op_set_3"]])
        df["cond"] = km.labels_
    else:
        km = None
        df["cond"] = 0

    means = np.stack([df.loc[df["cond"] == c, sensor_cols].mean().to_numpy()
                      for c in range(n_conditions)])
    stds = np.stack([df.loc[df["cond"] == c, sensor_cols].std().to_numpy()
                     for c in range(n_conditions)])

    return {"km": km, "means": means, "stds": stds, "sensor_cols": sensor_cols, "n_conditions": n_conditions}


# Save once
bundle = fit_full_pipeline("data/raw/train_FD002.txt", n_conditions=6)
joblib.dump(bundle, "fd002_full_pipeline.joblib")

print("means.shape :", bundle["means"].shape)  # (6, 14)
print("stds.shape :", bundle["stds"].shape)  # (6, 14)
print("sensor_cols :", bundle["sensor_cols"][:5], "...")  # first 5 names
PyTorch: The Production CMAPSSFullDataset
The unifying class. __init__ caches per-engine arrays once, so __getitem__ only slices a window and applies the per-condition Z-score, keeping training-loop overhead minimal.
Production Dataset - 4-tuple per sample, all transforms applied
🐍cmapss_full_dataset.py
Line 1: import joblib, numpy as np, pandas as pd, torch
Four imports on one line. joblib loads the bundle that fit_full_pipeline saved. numpy + pandas reproduce the train-time loading logic. torch provides the Tensor type and the Dataset base class.
EXECUTION STATE
joblib = Same library used to dump the bundle. joblib.load(path) deserialises it back into a dict.
numpy as np = Used here for np.minimum (RUL cap) and np.zeros (state array).
pandas as pd = Reads the CSV, exposes groupby for sliding-window iteration over engines.
torch = PyTorch core. Used for torch.from_numpy (zero-copy ndarray → Tensor) and torch.tensor (scalar wrappers).
Line 2: from torch.utils.data import Dataset
Imports PyTorch's abstract Dataset base class. Subclasses must implement __len__ (sample count) and __getitem__ (index → sample). Once that contract is satisfied, DataLoader can wrap the Dataset to provide batching, shuffling and multi-worker loading.
EXECUTION STATE
📚 Dataset = Abstract base class in torch.utils.data. The 'map-style' contract: a dataset is anything with len(ds) and ds[i]. PyTorch builds DataLoader on top of this two-method API.
Line 4: R_MAX, THR_DEGR, THR_CRIT = 125, 80, 30
Tuple-unpack the §7.2 cap and §7.3 thresholds. R_MAX caps the regression target; THR_DEGR/THR_CRIT discretise the capped RUL into the 3-class health-state label.
EXECUTION STATE
R_MAX = 125 = Cap for capped RUL (regression target).
THR_DEGR = 80 = RUL ≤ 80 → DEGRADING (class 1).
THR_CRIT = 30 = RUL ≤ 30 → CRITICAL (class 2). Overrides DEGRADING because the assignment happens after.
Line 7: class CMAPSSFullDataset(Dataset):
Production Dataset. Inherits from torch.utils.data.Dataset. Integrates feature selection (§5.3), condition discovery (§5.4 / §6.2), per-condition normalisation (§6.3 / §6.4), sliding-window construction (§7.1), RUL cap (§7.2), and health-state labels (§7.3). Every model in Parts V–VII consumes exactly this class.
EXECUTION STATE
(Dataset) = Inheriting from torch.utils.data.Dataset registers this class as a map-style dataset. DataLoader will accept it.
Line 8: Docstring (line 1)
Opening line of the docstring — short purpose statement. Visible via help(CMAPSSFullDataset) at runtime.
Line 9: Docstring (line 2)
Closing line listing every concern this class subsumes. The promise: nothing else in the codebase has to know about RUL caps or per-condition stats — they all live here.
Two-file constructor. Loads the bundle saved by fit_full_pipeline AND the data CSV, applies all derived columns, and pre-computes the (engine_id, end_cycle) sample index. After __init__ returns, ds[i] needs only a list lookup, a dict lookup and an O(W) slice of cached arrays.
EXECUTION STATE
⬇ input: self = The instance being constructed. Will receive .means, .stds, .km, .window, .samples, .engines attributes.
⬇ input: csv_path = Path to the data file (train_FD00x.txt or test_FD00x.txt). 26-column whitespace-separated text.
⬇ input: bundle_path = Path to the .joblib file written by fit_full_pipeline. Provides means, stds, km, sensor_cols.
⬇ input: window = 30 = Sliding-window length in cycles (§7.1). Each sample is W cycles long. Default 30 — short enough for early-life prediction, long enough to capture trends.
⬆ returns = None (constructor). Side effect: instance now has .means (Tensor), .stds (Tensor), .km, .window, .samples (list of (eid, end)), .engines (dict eid → 4-tuple).
Line 12: bundle = joblib.load(bundle_path)
Inverse of joblib.dump. Reads the binary file and reconstructs the dict in memory. The fitted KMeans, the (n_cond, 14) means/stds, and the sensor_cols list all come back identical to what was saved.
EXECUTION STATE
📚 joblib.load(path) = Reads a joblib-pickled file. For NumPy arrays it can mmap, but here we let it materialise.
⬇ arg: bundle_path = e.g. 'fd002_full_pipeline.joblib'.
Convert the (6, 14) NumPy means to a float32 PyTorch tensor and stash on self. We need a tensor (not ndarray) so __getitem__ can index it with another tensor (c_seq) and yield a tensor in one step.
EXECUTION STATE
📚 torch.from_numpy(arr) = Zero-copy bridge: shares memory with the NumPy array. Mutating one mutates the other. The dtype is preserved (float64 → torch.float64).
📚 .float() = Tensor method: cast to torch.float32. Required because models train in float32 by default and dtype mismatches raise errors.
→ why float32 not float64? = float32 halves memory, and GPUs generally run float32 much faster than float64. The precision is ample for sensor statistics.
Same conversion for stds. After this both self.means and self.stds are float32 tensors of identical shape — ready for advanced indexing in __getitem__.
Stash the fitted KMeans estimator (or None) on self. We DO NOT convert it — sklearn estimators stay as sklearn objects. Used below to predict cluster IDs for the data CSV.
EXECUTION STATE
self.km = Either a fitted KMeans (FD002/FD004) or None (FD001/FD003). Used in the conditional on line 28.
Line 16: sensor_cols = bundle["sensor_cols"]
Local-scope list of 14 sensor column names. Not stashed on self because we only need it during __init__ to slice the DataFrame; after that, sensor matrices are stored per engine in self.engines.
Start of the 26-column-name list — same construction as fit_full_pipeline. We can't import this from there cheaply, so we duplicate it locally for clarity.
Line 19: + [f"op_set_{i}" for i in range(1, 4)]
Generates op_set_1 / op_set_2 / op_set_3 via list comprehension. Concatenation continues the list expression.
Line 20: + [f"sensor_{i}" for i in range(1, 22)])
Adds sensor_1..sensor_21 (range(1, 22) yields 21 ints). Closing parenthesis ends the multi-line list. Result has length 2 + 3 + 21 = 26.
First arg of np.minimum: the raw RUL Series. groupby + transform('max') computes each engine's max cycle and broadcasts it back to original row count, so we can subtract row-by-row.
EXECUTION STATE
📚 .groupby(col).transform(fn) = Per-group aggregation that BROADCASTS back to original length. Unlike .agg() which collapses to one row per group.
→ example = Engine 1 has 192 cycles, max cycle = 192. transform fills 192 in all 192 of engine 1's rows. Subtracting cycle [1, 2, ..., 192] gives raw RUL [191, 190, ..., 0].
⬇ arg 1 to np.minimum = Series of raw RUL: [191, 190, ..., 0, 286, ..., 0, ...] across all engines.
Line 24: R_MAX,
Second arg: scalar 125. Broadcast against the raw RUL Series, every element compared against 125.
EXECUTION STATE
⬇ arg 2 to np.minimum = R_MAX = 125 (scalar). NumPy broadcasts to match the Series shape.
Line 25: )
Closing paren of np.minimum. The result is the capped RUL Series, assigned to df['RUL_capped']. Values: 125 plateau for healthy region, then linear decline to 0 in the last 125 cycles.
EXECUTION STATE
⬆ df["RUL_capped"] = Series, length len(df). e.g. for engine 1: [125, 125, ..., 125, 124, 123, ..., 0].
Line 27: # Condition assignment using the FITTED km from train
Comment marking the start of the condition-tagging block. Critical discipline: we LOAD the fitted clusterer; we never re-fit on test data (would leak test info into the cluster centroids).
Line 28: if self.km is None:
Branch on whether a clusterer was saved. None → single-condition (FD001/FD003); not None → multi-condition (FD002/FD004).
Line 29: df["cond"] = 0
Single-condition path. Assign scalar 0; pandas broadcasts to a column of zeros.
Use the FITTED clusterer to assign cluster IDs to the new data. NEVER call .fit() here — that would compute new centroids and the means/stds in the bundle would be misaligned.
EXECUTION STATE
📚 KMeans.predict(X) = Assigns each row of X to the nearest centroid. Returns an int array of cluster IDs in {0, ..., k-1}. O(n × k × d) time. PURE prediction — does NOT modify the estimator.
⬇ arg: df[["op_set_1", "op_set_2", "op_set_3"]] = DataFrame slice → ~53,759 × 3 array. Each row is one cycle's (altitude, Mach, throttle).
→ why predict not fit_predict? = fit_predict refits AND assigns. Would overwrite the train-fit centroids and silently leak test info. Use predict EVERY time after the first fit.
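What predict computes can be sketched without scikit-learn: assign each point to its nearest stored centroid and leave the centroids untouched. The NumPy function below is a stand-in written for illustration, not the library's implementation, and the centroid values are invented.

```python
import numpy as np

def assign_to_nearest(points, centroids):
    """Return the index of the closest centroid for each row of points."""
    # (n, 1, d) - (1, k, d) -> (n, k, d); squared distances summed over d.
    d2 = ((points[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
    return d2.argmin(axis=1)

# Hypothetical centroids "learned on train" (made-up values).
centroids = np.array([[0.0, 0.0, 100.0],
                      [10.0, 0.25, 100.0],
                      [42.0, 0.84, 100.0]])

# Made-up operating settings from unseen data.
test_points = np.array([[41.9, 0.84, 100.0],
                        [0.1, 0.0, 100.0],
                        [10.2, 0.25, 100.0]])

before = centroids.copy()
labels = assign_to_nearest(test_points, centroids)
```

Because the centroids are read-only here, the cluster IDs stay aligned with the rows of the saved means/stds, which is the property a refit would break.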
Line 33: # Health state from capped RUL
Comment for the §7.3 discretisation block: convert the continuous capped-RUL signal into a 3-class health-state label.
Line 34: state = np.zeros(len(df), dtype=np.int64)
Allocate the health-state array, initialised to zeros (class 0 = NORMAL). We will overwrite the rows where RUL drops below thresholds.
EXECUTION STATE
📚 np.zeros(shape, dtype) = Allocate a NumPy array of given shape filled with 0. Memory-allocates a contiguous buffer.
⬇ arg 1: shape = len(df) = Length of the DataFrame, e.g. 53,759. 1-D output.
⬇ arg 2: dtype = np.int64 = 64-bit signed integer. PyTorch's CrossEntropyLoss requires int64 (long) class indices, so we match that here to avoid a later cast.
⬆ state = ndarray, shape (53759,), dtype int64, all zeros.
Line 35: state[df["RUL_capped"] <= THR_DEGR] = 1
Boolean-mask assignment. The expression on the left builds a length-N boolean mask; assigning 1 sets every True position to 1 (DEGRADING).
EXECUTION STATE
df["RUL_capped"] <= THR_DEGR = Element-wise comparison Series → boolean Series. True where RUL ≤ 80.
→ example mask = [False, False, ..., True, True, True, True]; True for the final 81 rows of each engine (RUL 80 down to 0).
state[mask] = 1 = Boolean fancy indexing. Every True position set to 1. False positions stay 0.
Line 36: state[df["RUL_capped"] <= THR_CRIT] = 2
Same pattern, threshold 30, value 2 (CRITICAL). Order matters: this runs AFTER the THR_DEGR assignment so the last 30 cycles get OVERWRITTEN from 1 → 2. Final encoding: 0=normal, 1=degrading, 2=critical.
EXECUTION STATE
→ why ≤ 30 overrides ≤ 80? = Every row where RUL ≤ 30 also has RUL ≤ 80 (so it was set to 1 first). The second assignment overwrites those rows to 2. Net effect: 0/1/2 partition.
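⬆ state distribution = Most rows = 0 (engines spend most of their life NORMAL). Rows with 30 < RUL ≤ 80 = 1 (DEGRADING, 50 cycles for an engine that runs to failure). Rows with RUL ≤ 30 = 2 (CRITICAL, the final 31 cycles, RUL 30 down to 0).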
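The two-step overwrite can be checked on one synthetic engine. The sketch assumes a run-to-failure engine of 192 cycles (the length used in the examples) and counts how many rows land in each class.

```python
import numpy as np

R_MAX, THR_DEGR, THR_CRIT = 125, 80, 30

n_cycles = 192                               # toy engine length
rul_raw = np.arange(n_cycles - 1, -1, -1)    # 191, 190, ..., 0
rul_capped = np.minimum(rul_raw, R_MAX)

state = np.zeros(n_cycles, dtype=np.int64)   # 0 = normal
state[rul_capped <= THR_DEGR] = 1            # 1 = degrading
state[rul_capped <= THR_CRIT] = 2            # 2 = critical (overwrites 1)

# Rows per class: [normal, degrading, critical].
counts = np.bincount(state, minlength=3)
```

Swapping the two assignments would leave every row with RUL ≤ 80 in class 1, so the order of the mask writes is part of the encoding.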
Line 38: self.window = window
Stash the window length on self so __getitem__ can use it.
EXECUTION STATE
self.window = 30 — sliding-window length in cycles.
Line 39: self.samples, self.engines = [], {}
Tuple-unpack assignment. self.samples will collect (engine_id, end_cycle) tuples — one per valid sliding window. self.engines maps engine_id → 4-tuple of pre-extracted arrays.
EXECUTION STATE
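self.samples = list[tuple[int, int]]. After the loop it holds one entry per valid window, i.e. the sum over engines of (N_e − W + 1). For FD002 train (53,759 rows, 260 engines, W = 30) that is 53,759 − 260 × 29 = 46,219 entries.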
Iterate engine-by-engine. groupby returns (group_key, sub_DataFrame) pairs. sub is a DataFrame of all rows belonging to engine eid, in cycle order.
LOOP TRACE · 4 iterations
eid=1
sub = DataFrame, ~192 rows × 26 cols, all cycles for engine 1
eid=2
sub = DataFrame, ~287 rows for engine 2 (engines have different lifetimes)
eid=3
sub = DataFrame for engine 3
...
... = Continues for engines 4 through 260 (FD002 has 260 train engines). Each engine's sub gets its own 4-tuple in self.engines and contributes (len(sub) - W + 1) windows.
Extract the 14 informative sensor columns as a float32 NumPy matrix. float32 chosen because models train in float32 and we want zero-copy via torch.from_numpy.
EXECUTION STATE
sub[sensor_cols] = DataFrame slice → 14 columns, ~N_e rows. Order matches the means/stds columns.
📚 .to_numpy(dtype=...) = Pandas method: returns the underlying ndarray, optionally casting dtype. Cheap for already-numeric columns.
⬆ sensors = ndarray (N_e, 14), dtype float32. e.g. (192, 14) for engine 1.
Line 42: cond = sub["cond"].to_numpy(dtype=np.int64)
Per-cycle cluster IDs as int64. dtype matters because these values are used as INDICES into self.means / self.stds in __getitem__, and int64 (long) is PyTorch's standard dtype for index tensors.
EXECUTION STATE
⬆ cond = ndarray (N_e,), dtype int64. e.g. [3, 3, 5, 5, 1, 0, 0, 2, ...] — engine cycles through different operating regimes.
Capped RUL as float32 (regression target). MSE/Huber losses operate in float; matches the model's output dtype.
EXECUTION STATE
⬆ rul = ndarray (N_e,), dtype float32. Plateau at 125 then linear decline. e.g. [125, 125, ..., 125, 124, ..., 0] for engine 1.
Line 44: hs = state[sub.index]
Pull this engine's health-state values out of the global `state` array. sub.index gives the original row positions of these rows in df, so state[sub.index] returns them in the same cycle order.
EXECUTION STATE
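sub.index = Pandas integer Index holding this engine's row labels. The labels equal positional indices because read_csv assigned the default 0..N−1 index and df was never filtered, re-sorted or re-indexed. e.g. for engine 2: [192, 193, ..., 478].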
state[sub.index] = Fancy indexing: returns elements at those positions, length N_e.
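The label-equals-position assumption behind state[sub.index] can be demonstrated on a toy frame. The sketch also shows a positional lookup via Index.get_indexer, offered here as one possible safeguard if the frame were ever filtered or re-indexed; it is not part of the original class.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "engine_id": [1, 1, 1, 2, 2],
    "cycle":     [1, 2, 3, 1, 2],
})
state = np.array([0, 1, 2, 1, 2], dtype=np.int64)  # made-up per-row labels

per_engine = {}
for eid, sub in df.groupby("engine_id"):
    # Explicit positions of this engine's rows within df.
    positions = df.index.get_indexer(sub.index)
    # With the default 0..N-1 index both lookups return the same values.
    per_engine[eid] = (state[sub.index], state[positions])
```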
Cache the 4-tuple per engine. __getitem__ does NOT re-slice the DataFrame — it just dict-looks-up self.engines[eid] and slices the cached arrays. This makes per-sample retrieval O(W) instead of O(N_e).
EXECUTION STATE
tuple ordering convention = (sensors, cond, rul, hs) — same order across train/test/dataset code. Important: __getitem__ unpacks in this exact order.
Line 46: for end in range(window, len(sub) + 1):
Generate every valid sliding-window endpoint. The window covers cycles [end - W, end), so end ranges from W (first complete window) up to len(sub) inclusive (last window ends at the failure cycle).
→ example for engine 1 (192 cycles, W=30) = range(30, 193) yields 30, 31, ..., 192 — 163 valid windows for this engine.
→ why len(sub) + 1? = We want end to take the value len(sub) so the slice [end-W : end] = [len(sub)-W : len(sub)] reaches the last cycle. range stop is exclusive, hence +1.
LOOP TRACE · 5 iterations
end = 30
window covers cycles = [0, 30) — first 30 cycles, target = RUL at cycle 29
end = 31
window covers cycles = [1, 31) — slid forward by one
end = 32
window covers cycles = [2, 32)
...
... = Each step advances end by 1 until end = len(sub). For engine 1 with 192 cycles → 163 iterations.
end = len(sub)
window covers cycles = [len(sub)-30, len(sub)) — final window, ends at failure. RUL = 0, health-state = CRITICAL.
Line 47: self.samples.append((eid, end))
Record this (engine_id, end_cycle) as one DataLoader sample. __getitem__(idx) will look up samples[idx] to recover (eid, end) and slice the cached arrays accordingly.
EXECUTION STATE
📚 list.append(x) = In-place: appends x to the end of the list in O(1) amortised.
⬆ self.samples grows = After the inner loop finishes for engine 1 (163 windows), self.samples has 163 entries. After all 260 FD002 train engines: 46,219 entries.
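The endpoint enumeration can be reproduced with nothing but engine lengths. In this sketch the lengths are toy values (the third engine is deliberately shorter than the window) and the index is built the same way as the loop on lines 46 to 47.

```python
W = 30
engine_lengths = {1: 192, 2: 287, 3: 20}  # toy cycle counts per engine

samples = []
for eid, n in engine_lengths.items():
    # end is exclusive: the window covers rows [end - W, end).
    for end in range(W, n + 1):
        samples.append((eid, end))

# Closed form for the same count; engines shorter than W contribute nothing.
per_engine = {eid: max(0, n - W + 1) for eid, n in engine_lengths.items()}
```

An engine with fewer than W cycles yields an empty range, so it silently drops out of the index rather than raising an error.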
Line 49: def __len__(self): return len(self.samples)
PyTorch DataLoader contract method. Defines how many samples the dataset has. Single-line because nothing else is needed — len() just delegates to the samples list.
EXECUTION STATE
⬆ returns = int: total number of (engine, window) pairs across all engines. 46,219 for FD002 train with W = 30.
Line 51: def __getitem__(self, idx) → tuple of 4 tensors
PyTorch DataLoader contract method. Given an integer index, return ONE training sample. Called by DataLoader's worker processes — should be lightweight (no heavy I/O).
⬇ input: idx = Integer in [0, len(self)). DataLoader chooses these (shuffled or sequential).
⬆ returns = tuple of 4 tensors: (x_norm, c_seq, y_rul, y_hs) — described in the return-line annotation.
52eid, end = self.samples[idx]
Look up which (engine, window endpoint) this idx maps to. O(1) list indexing + tuple unpacking.
EXECUTION STATE
→ example: idx = 0 = self.samples[0] = (1, 30) for engine 1's first window. eid=1, end=30.
53sensors, cond, rul, hs = self.engines[eid]
Dict-lookup that engine's 4-tuple of cached arrays. Tuple-unpack into the 4 names. No copy — these are still ndarray references.
EXECUTION STATE
self.engines[eid] = (sensors_ndarray, cond_ndarray, rul_ndarray, hs_ndarray) — pre-computed in __init__.
54s, e = end - self.window, end
Compute the window start and end. self.window=30, so for end=30 → s=0, e=30 → covers cycles [0, 30).
EXECUTION STATE
→ example = end = 30, window = 30 → s = 0, e = 30. Window slice covers cycles 0 through 29 (30 cycles).
56# Apply per-condition Z-score
Comment marking the §6.3 / §6.4 normalisation block. The 4 lines below produce the Z-scored sensor window using the per-cycle's condition-specific (μ, σ).
57x_raw = torch.from_numpy(sensors[s:e])
Slice the engine's full sensor matrix to just this window of W cycles, then bridge to a Tensor (zero-copy view of the same memory).
📚 torch.from_numpy(arr) = Bridge: shares memory with arr. Same dtype, same shape. Becomes a Tensor we can do PyTorch ops on.
⬆ x_raw = torch.Tensor shape (30, 14), dtype float32 — raw sensor readings for the window.
58c_seq = torch.from_numpy(cond[s:e])
Same pattern for the condition sequence. Shape (30,), int64 — one cluster ID per cycle in the window.
EXECUTION STATE
⬆ c_seq = torch.Tensor shape (30,), dtype int64. e.g. tensor([3, 3, 3, 5, 5, 1, 0, 0, 2, 2, ...]) — engine drifted through regimes 3→5→1→0→2 in this window.
59mu = self.means[c_seq]
PyTorch ADVANCED INDEXING. self.means is (6, 14); c_seq is (30,) of int64. The result picks rows: for each of the 30 cluster IDs, fetch the corresponding row of self.means → shape (30, 14).
EXECUTION STATE
📚 tensor[index_tensor] = Advanced indexing: when index is a LongTensor, it gathers rows. self.means[c_seq] = stack of self.means[c_seq[i]] for each i.
→ example = If c_seq[0] = 3 → mu[0] = self.means[3] (the 14-element row of regime-3 means). Repeated for 30 cycles → (30, 14).
⬆ mu = torch.Tensor shape (30, 14), dtype float32. Per-cycle mean — different cycle, different row of stats depending on its regime.
60sigma = self.stds [c_seq]
Same advanced indexing on self.stds. Per-cycle σ matched to the same regime sequence. The space before [c_seq] is a stylistic alignment, not Python syntax.
61x_norm = (x_raw - mu) / (sigma + 1e-8)
Per-condition Z-score, computed cycle-by-cycle. Each cycle of x_raw is normalised by its OWN regime's μ and σ. Adding 1e-8 to sigma is the standard numerical-stability trick (prevents divide-by-zero if some sensor is constant within a regime).
EXECUTION STATE
(x_raw - mu) = Element-wise subtraction. Both (30, 14). Centres the data on each cycle's regime mean.
(sigma + 1e-8) = Element-wise add of a tiny constant. Numerical safety: never divide by exactly 0.
→ why per-condition Z-score? = A 'high' temperature reading at cruise altitude is normal but at takeoff it's anomalous. Per-regime normalisation removes the regime offset/scale so the model can focus on degradation, not flight conditions.
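The gather-then-normalise step can be tried on a tiny example. The sketch below uses NumPy instead of PyTorch so it runs without a GPU stack; integer-array indexing picks rows the same way for this case. All numbers (3 regimes, 2 sensors, a 4-cycle window) are made up for illustration.

```python
import numpy as np

# Hypothetical per-regime statistics: 3 regimes x 2 sensors.
means = np.array([[10.0, 100.0],
                  [20.0, 200.0],
                  [30.0, 300.0]], dtype=np.float32)
stds = np.array([[1.0, 10.0],
                 [2.0, 20.0],
                 [0.0, 30.0]], dtype=np.float32)  # regime 2, sensor 0 is constant

# A 4-cycle window that visits regimes 0, 2, 2, 1.
c_seq = np.array([0, 2, 2, 1], dtype=np.int64)
x_raw = np.array([[11.0, 110.0],
                  [30.0, 330.0],
                  [30.0, 270.0],
                  [16.0, 200.0]], dtype=np.float32)

mu = means[c_seq]      # (4, 2): one row of means per cycle
sigma = stds[c_seq]    # (4, 2): one row of stds per cycle
x_norm = (x_raw - mu) / (sigma + 1e-8)

print(mu.shape, sigma.shape)   # (4, 2) (4, 2)
print(x_norm)
```

The result is [[1, 1], [0, 1], [0, -1], [-2, 0]]. The two zeros in the first column come from the constant sensor in regime 2: the numerator is exactly zero and the 1e-8 term keeps the division finite.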
63return (
Begin the 4-element return tuple. Lines 64–67 are its components. Returning a tuple lets DataLoader's default collate stack each element into a batch tensor independently.
64x_norm,
Element 1: the normalised input window. Shape (30, 14), float32. After collate by DataLoader: (B, 30, 14).
EXECUTION STATE
⬆ x_norm = torch.Size([30, 14]), float32 — the model's INPUT.
65c_seq,
Element 2: the condition sequence. Used by the per-condition normaliser HEAD (not main loss). Shape (30,), int64.
66torch.tensor(float(rul[end - 1])),
Element 3: the regression target, i.e. the capped RUL at the LAST cycle of the window. We wrap it as a 0-dim tensor (scalar) so DataLoader can stack across the batch into a (B,) tensor.
EXECUTION STATE
rul[end - 1] = ndarray scalar — capped RUL at cycle (end - 1). For end=30, that's cycle 29 (the last in this window).
📚 torch.tensor(value) = Constructs a Tensor from a Python scalar/list. Default dtype follows the value: float → float32 (with the float() above making it explicit).
→ why scalar at end-1? = Sequence-to-one regression: read 30 cycles, predict ONE value (RUL at the most recent cycle). Future chapters explore sequence-to-sequence variants.
67torch.tensor(int(hs[end - 1])),
Element 4: the classification target, i.e. the health state at the last window cycle. int() forces Python int → torch.tensor produces dtype int64 (long), which CrossEntropyLoss requires.
EXECUTION STATE
hs[end - 1] = Class index ∈ {0, 1, 2} for the last cycle in the window.
📚 int(x) = Built-in: cast to Python int. Forces torch.tensor to use int64 (otherwise NumPy int types might flow through).
→ CrossEntropyLoss requirement = PyTorch's nn.CrossEntropyLoss expects target shape (B,) of dtype int64. Float or int32 raises a runtime error.
68)
Closing paren of the return tuple. Final 4-tuple shape summary: (x_norm: (30, 14) float32, c_seq: (30,) int64, y_rul: () float32, y_hs: () int64).
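The scalar-target dtypes described above can be confirmed directly. This is a small sketch that assumes PyTorch's default dtype has not been changed with torch.set_default_dtype; the values 117.0, 0 and 42.0 are arbitrary.

```python
import torch

# Python float -> default float dtype (float32); Python int -> int64.
y_rul = torch.tensor(float(117.0))
y_hs = torch.tensor(int(0))

print(y_rul.shape, y_rul.dtype)   # torch.Size([]) torch.float32
print(y_hs.shape, y_hs.dtype)     # torch.Size([]) torch.int64

# Stacking 0-dim tensors is what turns B scalars into a (B,) batch.
batch = torch.stack([y_rul, torch.tensor(42.0)])
print(batch.shape)                # torch.Size([2])
```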
71# Use it
Comment marking the demo block — instantiate the Dataset and pull a sample to verify shapes/dtypes.
72ds = CMAPSSFullDataset(
Open the constructor call. Multi-line because we want kwargs visible. Triggers __init__: load bundle → load CSV → derive RUL/cond/state → build samples + engines.
73csv_path="data/raw/train_FD002.txt",
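First kwarg: the data file. The demo points at the training file train_FD002.txt. Evaluation uses test_FD002.txt together with RUL_FD002.txt; note that this constructor takes no RUL file and derives RUL from each engine's last recorded cycle, which is only valid for run-to-failure training data.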
74bundle_path="fd002_full_pipeline.joblib",
Second kwarg: the bundle path. The .joblib file produced by fit_full_pipeline above. Mismatched bundles silently misalign means/stds — keep the naming convention strict.
75window=30,
Third kwarg: the sliding-window length. 30 cycles is the standard choice across this book and most C-MAPSS literature.
76)
Closing paren of the constructor call. After this, ds is a fully-initialised CMAPSSFullDataset with ~50,000 samples ready for DataLoader.
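For evaluation the labels need one extra step, because C-MAPSS test trajectories stop before failure and the RUL file supplies the true RUL at each engine's last observed cycle. The sketch below shows one way to fold that offset in. The miniature DataFrame and the `final_rul` values are invented for illustration, and this logic is not part of the constructor shown in this section.

```python
import numpy as np
import pandas as pd

R_MAX = 125

# Hypothetical miniature test set: engine 1 observed for 4 cycles, engine 2 for 3.
df = pd.DataFrame({
    "engine_id": [1, 1, 1, 1, 2, 2, 2],
    "cycle":     [1, 2, 3, 4, 1, 2, 3],
})
# One true RUL per engine at its LAST observed cycle (the role of RUL_FD002.txt).
final_rul = pd.Series({1: 112, 2: 150})

# Cycles left until the end of the recording, plus the per-engine offset.
remaining = df.groupby("engine_id")["cycle"].transform("max") - df["cycle"]
offset = df["engine_id"].map(final_rul)
df["RUL_capped"] = np.minimum(remaining + offset, R_MAX)

print(df["RUL_capped"].tolist())   # [115, 114, 113, 112, 125, 125, 125]
```

Without the offset, the last observed cycle of every test engine would be labelled RUL = 0, which is wrong for truncated trajectories.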
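78X, c, y_rul, y_hs = ds[0]
Index the Dataset once and unpack the 4-tuple. This calls __getitem__(0).
EXECUTION STATE
y_hs = torch.Tensor (), int64: health state at cycle 29 = 0 (NORMAL).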
79print("X.shape :", tuple(X.shape))
Print the input shape. tuple(X.shape) coerces torch.Size into a plain Python tuple for cleaner printing.
EXECUTION STATE
📚 tuple(t.shape) = torch.Size is a subclass of tuple but prints as 'torch.Size([30, 14])'. tuple() unwraps it to (30, 14).
⬆ stdout = X.shape : (30, 14)
80print("c.shape :", tuple(c.shape))
Verify the condition sequence is 1-D of length W=30.
EXECUTION STATE
⬆ stdout = c.shape : (30,)
81print("y_rul.dtype:", y_rul.dtype)
Confirm the RUL target is float32 — the dtype the regression loss (MSE / Huber) needs.
EXECUTION STATE
⬆ stdout = y_rul.dtype: torch.float32
82print("y_hs.dtype :", y_hs.dtype)
Confirm the health-state target is int64 — the dtype CrossEntropyLoss requires for class indices.
EXECUTION STATE
⬆ stdout = y_hs.dtype : torch.int64
import joblib, numpy as np, pandas as pd, torch
from torch.utils.data import Dataset

R_MAX, THR_DEGR, THR_CRIT = 125, 80, 30


class CMAPSSFullDataset(Dataset):
    """The production Dataset that integrates feature selection,
    condition discovery, RUL cap, health labels, and sliding windows."""

    def __init__(self, csv_path: str, bundle_path: str, window: int = 30):
        bundle = joblib.load(bundle_path)
        self.means = torch.from_numpy(bundle["means"]).float()
        self.stds = torch.from_numpy(bundle["stds"]).float()
        self.km = bundle["km"]
        sensor_cols = bundle["sensor_cols"]

        cols = (["engine_id", "cycle"]
                + [f"op_set_{i}" for i in range(1, 4)]
                + [f"sensor_{i}" for i in range(1, 22)])
        df = pd.read_csv(csv_path, sep=r"\s+", header=None, names=cols)
        df["RUL_capped"] = np.minimum(
            df.groupby("engine_id")["cycle"].transform("max") - df["cycle"],
            R_MAX,
        )

        # Condition assignment using the FITTED km from train
        if self.km is None:
            df["cond"] = 0
        else:
            df["cond"] = self.km.predict(df[["op_set_1", "op_set_2", "op_set_3"]])

        # Health state from capped RUL
        state = np.zeros(len(df), dtype=np.int64)
        state[df["RUL_capped"] <= THR_DEGR] = 1
        state[df["RUL_capped"] <= THR_CRIT] = 2

        self.window = window
        self.samples, self.engines = [], {}
        for eid, sub in df.groupby("engine_id"):
            sensors = sub[sensor_cols].to_numpy(dtype=np.float32)
            cond = sub["cond"].to_numpy(dtype=np.int64)
            rul = sub["RUL_capped"].to_numpy(dtype=np.float32)
            hs = state[sub.index]
            self.engines[eid] = (sensors, cond, rul, hs)
            for end in range(window, len(sub) + 1):
                self.samples.append((eid, end))

    def __len__(self): return len(self.samples)

    def __getitem__(self, idx):
        eid, end = self.samples[idx]
        sensors, cond, rul, hs = self.engines[eid]
        s, e = end - self.window, end

        # Apply per-condition Z-score
        x_raw = torch.from_numpy(sensors[s:e])   # (W, 14)
        c_seq = torch.from_numpy(cond[s:e])      # (W,)
        mu    = self.means[c_seq]                # (W, 14)
        sigma = self.stds [c_seq]                # (W, 14)
        x_norm = (x_raw - mu) / (sigma + 1e-8)

        return (
            x_norm,                              # (W, 14) normalised input
            c_seq,                               # (W,)    condition labels
            torch.tensor(float(rul[end - 1])),   # ()      scalar capped RUL
            torch.tensor(int(hs[end - 1])),      # ()      scalar health state (0/1/2)
        )


# Use it
ds = CMAPSSFullDataset(
    csv_path="data/raw/train_FD002.txt",
    bundle_path="fd002_full_pipeline.joblib",
    window=30,
)

X, c, y_rul, y_hs = ds[0]
print("X.shape    :", tuple(X.shape))    # (30, 14)
print("c.shape    :", tuple(c.shape))    # (30,)
print("y_rul.dtype:", y_rul.dtype)       # torch.float32
print("y_hs.dtype :", y_hs.dtype)        # torch.int64
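To see what the RUL cap and the two thresholds produce, here is a worked example on a single run-to-failure engine. The constants are the ones in the listing; the 192-cycle engine is a hypothetical stand-in.

```python
import numpy as np

R_MAX, THR_DEGR, THR_CRIT = 125, 80, 30   # constants from the listing

# One hypothetical training engine that runs for 192 cycles, then fails.
cycle = np.arange(1, 193)
rul_capped = np.minimum(cycle.max() - cycle, R_MAX)

# The second assignment overwrites part of the first, so the order matters.
state = np.zeros(len(cycle), dtype=np.int64)
state[rul_capped <= THR_DEGR] = 1
state[rul_capped <= THR_CRIT] = 2

print(int(rul_capped[0]), int(rul_capped[-1]))   # 125 0
print(np.bincount(state).tolist())               # [111, 50, 31]
```

Of the 192 cycles, 111 are labelled class 0 (RUL above 80), 50 are class 1 (RUL 31 to 80) and 31 are class 2 (RUL 0 to 30), which illustrates how unbalanced the health-state classes are for a single engine.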
The DataLoader on Top
DataLoader configuration is two lines once the Dataset is in place. Six production knobs matter:

| Argument | Value (this book) | Why |
|---|---|---|
| batch_size | 256 | Standard for 3.5M-param models on a single 16 GB GPU |
| shuffle | True (train), False (test/eval) | SGD needs random order in train; eval needs deterministic order |
| num_workers | 4 | Parallel data-loading worker processes. 4 keeps the GPU fed without overwhelming the CPU |
| pin_memory | True | Pinned host memory makes .to('cuda', non_blocking=True) faster |
| drop_last | False (train and eval) | The final partial batch is kept, so every window is seen once per epoch |
| persistent_workers | True | Avoids re-spawning workers between epochs; shaves seconds per epoch |
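The settings in the table can be wrapped in one helper. The sketch below is an assumption-laden illustration, not the book's training code: `ToyWindows` is a hypothetical random-data stand-in for CMAPSSFullDataset, `make_loader` is a made-up name, and pinning memory only when CUDA is available is a choice made here so the snippet also runs on CPU-only machines.

```python
import torch
from torch.utils.data import DataLoader, Dataset


class ToyWindows(Dataset):
    """Hypothetical stand-in that mimics the 4-tuple contract with random data."""

    def __init__(self, n: int = 600, window: int = 30, n_sensors: int = 14):
        self.x = torch.randn(n, window, n_sensors)
        self.c = torch.randint(0, 6, (n, window))
        self.y_rul = torch.rand(n) * 125
        self.y_hs = torch.randint(0, 3, (n,))

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return self.x[idx], self.c[idx], self.y_rul[idx], self.y_hs[idx]


def make_loader(ds: Dataset, train: bool, num_workers: int = 4) -> DataLoader:
    return DataLoader(
        ds,
        batch_size=256,
        shuffle=train,                          # random order only for training
        num_workers=num_workers,
        pin_memory=torch.cuda.is_available(),   # assumption: pin only with a GPU
        drop_last=False,
        persistent_workers=num_workers > 0,     # PyTorch rejects True with 0 workers
    )


# num_workers=0 keeps this demo single-process.
loader = make_loader(ToyWindows(), train=True, num_workers=0)
shapes = [tuple(t.shape) for t in next(iter(loader))]
print(shapes)        # [(256, 30, 14), (256, 30), (256,), (256,)]
print(len(loader))   # 3
```

With 600 samples and a batch size of 256, the loader yields three batches (256, 256 and 88), since drop_last=False keeps the final partial batch.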
Every model in Parts V-VII trains with the SAME DataLoader config. Only the loss function (AMNL / GABA / GRACE) varies. Chapter 15 walks through the unified training loop end to end.
Pipeline Patterns Across ML
| Domain | Equivalent of CMAPSSFullDataset | Bundle persistence |
|---|---|---|
| RUL (this book) | CMAPSSFullDataset | joblib (sklearn + means/stds) |
| NLP | HuggingFace tokenizer + DataCollator | tokenizer.json |
| Vision | torchvision Dataset + transforms | Pillow + PyTorch state_dict |
| Speech | Kaldi feature extractor | .ark / .scp pairs |
| Recommender | Per-user / per-item embedding lookups | Faiss index + embedding tensors |
Three Pipeline Pitfalls
Pitfall 1: Using TEST means/stds. Section 6.4 warned about this. The bundle is fit on TRAIN only. Re-fitting on test silently leaks; the test pipeline must only LOAD, never FIT.
Pitfall 2: Window size mismatch between train and test. Always use the same W = 30 for both. Test engines with fewer than W cycles must be padded or skipped; as written, CMAPSSFullDataset silently produces no samples for them, so track the count explicitly.
Pitfall 3: Forgetting to set num_workers=0 when debugging. Multi-worker DataLoaders hide print statements and obscure stack traces. Set num_workers=0 while debugging an exception inside __getitem__.
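For Pitfall 2, short engines can be listed and, if desired, padded before windowing. The sketch below shows one possible policy, left-padding by repeating the first recorded row. The padding choice, the helper name `pad_to_window` and the cycle counts are all assumptions made for illustration; the text above leaves the policy open.

```python
import numpy as np

W = 30


def pad_to_window(sensors: np.ndarray, window: int = W) -> np.ndarray:
    """Return exactly `window` rows: the last ones, left-padded if the run is short."""
    n = sensors.shape[0]
    if n >= window:
        return sensors[-window:]               # long enough: keep the last W cycles
    # Repeat the first row (mode="edge") at the top until the run has W rows.
    return np.pad(sensors, ((window - n, 0), (0, 0)), mode="edge")


# Hypothetical cycle counts for three test engines.
lengths = {1: 31, 2: 21, 3: 150}
short = [eid for eid, n in lengths.items() if n < W]
print(short)                                   # [2]

run = np.arange(21 * 2, dtype=np.float32).reshape(21, 2)
padded = pad_to_window(run)
print(padded.shape)                            # (30, 2)
```

If padding is used, the condition sequence needs the same treatment so that each padded row is normalised with a matching regime.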
The point. One Dataset class wraps the entire data path. Train pipeline fits, persists; test pipeline loads, applies. Every model in Parts V-VII reads from this same contract.
Takeaway
Per-sample output is a 4-tuple. (X_norm, cond, y_rul, y_hs). X_norm is the model input, y_rul and y_hs are the targets for the two heads, and cond feeds the normaliser only.
Train fits the bundle once. joblib serialises k-means + per-cond means / stds + sensor_cols.
Test loads the bundle, never refits. Section 6.4 discipline.
DataLoader config is uniform across the book. Batch 256, shuffle on train, 4 workers, pin_memory, persistent workers. Same config for AMNL / GABA / GRACE.