Chapter 7

PyTorch Dataset & DataLoader

Sequences, RUL Cap & Health Labels

Assembling the Pieces

We have built every component of the data pipeline in isolation: feature selection (§5.3), condition discovery (§5.4 / §6.2), per-condition normalisation (§6.3, §6.4), sliding-window construction (§7.1), RUL capping (§7.2), health-state discretisation (§7.3). This section unifies them into one production-grade CMAPSSFullDataset that the rest of the book consumes verbatim.

Once you have this Dataset, every training script reduces to the same four-field loop: unpack (X, cond, y_rul, y_hs), run the forward pass on (X, cond, ...), and back-propagate through rul_loss + health_loss. Chapters 15, 20 and 22 all follow this skeleton.
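That skeleton can be sketched as a single training step. This is a minimal sketch, not the book's model: TwoHeadModel, its GRU encoder, the condition embedding and the equal weighting of the two losses are illustrative assumptions, and the batch is random data shaped like one batch of (X, cond, y_rul, y_hs).

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
W, N_SENSORS, N_COND, N_CLASSES = 30, 14, 6, 3


class TwoHeadModel(nn.Module):
    """Hypothetical toy model: condition embedding + GRU encoder, two output heads."""

    def __init__(self, hidden=32):
        super().__init__()
        self.cond_emb = nn.Embedding(N_COND, 4)
        self.encoder = nn.GRU(N_SENSORS + 4, hidden, batch_first=True)
        self.rul_head = nn.Linear(hidden, 1)          # regression head (RUL)
        self.hs_head = nn.Linear(hidden, N_CLASSES)   # classification head (health state)

    def forward(self, x, cond):
        z = torch.cat([x, self.cond_emb(cond)], dim=-1)  # (B, W, 14 + 4)
        _, h = self.encoder(z)                           # h: (1, B, hidden)
        h = h.squeeze(0)
        return self.rul_head(h).squeeze(-1), self.hs_head(h)


def train_step(model, optimiser, batch):
    x, cond, y_rul, y_hs = batch                       # the four fields
    rul_pred, hs_logits = model(x, cond)
    rul_loss = F.mse_loss(rul_pred, y_rul)
    health_loss = F.cross_entropy(hs_logits, y_hs)     # y_hs must be int64
    loss = rul_loss + health_loss
    optimiser.zero_grad()
    loss.backward()
    optimiser.step()
    return rul_loss.item(), health_loss.item()


# Synthetic batch with the shapes and dtypes of the contract
B = 8
batch = (torch.randn(B, W, N_SENSORS),
         torch.randint(0, N_COND, (B, W)),
         torch.rand(B) * 125,
         torch.randint(0, N_CLASSES, (B,)))
model = TwoHeadModel()
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
rul_l, hs_l = train_step(model, opt, batch)
```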

The Contract: One Sample, Four Fields

Each call to ds[i] returns a 4-tuple $(\mathbf{X}_{\text{norm}}, \mathbf{c}, y_{\text{rul}}, y_{\text{hs}})$:

Field | Shape | dtype | Source
--- | --- | --- | ---
$\mathbf{X}_{\text{norm}}$ | (W, 14) | float32 | §5.3 select + §6.3 normalise
$\mathbf{c}$ | (W,) | int64 | §5.4 / §6.2 cluster ID per cycle
$y_{\text{rul}}$ | () | float32 | §7.2 RUL capped at R_MAX
$y_{\text{hs}}$ | () | int64 | §7.3 health-state class
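A hypothetical helper such as check_sample can assert this contract on any ds[i] before training starts. It is a sketch that hard-codes the shapes and dtypes from the table; the example sample is built from placeholder tensors, not real C-MAPSS data.

```python
import torch


def check_sample(sample, window, n_sensors=14):
    """Hypothetical helper: assert that one ds[i] sample honours the 4-field contract."""
    x, c, y_rul, y_hs = sample
    assert x.shape == (window, n_sensors) and x.dtype == torch.float32
    assert c.shape == (window,) and c.dtype == torch.int64
    assert y_rul.shape == () and y_rul.dtype == torch.float32
    assert y_hs.shape == () and y_hs.dtype == torch.int64
    return True


W = 30
fake = (torch.zeros(W, 14),                   # X_norm placeholder, float32
        torch.zeros(W, dtype=torch.int64),    # cluster IDs placeholder
        torch.tensor(125.0),                  # scalar float32 RUL
        torch.tensor(0))                      # scalar int64 class
ok = check_sample(fake, W)
```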

Python: The Full Pipeline as One Function

One function reads train, fits everything, persists the bundle
🐍fit_full_pipeline.py
1import numpy as np, pandas as pd, joblib

Triple-import on one line. NumPy gives us fast N-dimensional arrays + the math kernels (np.minimum, np.stack). Pandas gives us the labelled DataFrame for engine/cycle/sensor tabular data. Joblib persists arbitrary Python objects (sklearn estimators, NumPy arrays) to a single .joblib file we can reload at test time.

EXECUTION STATE
numpy as np = Numerical computing — ndarray, broadcasting, vectorised math. Used here for np.minimum (RUL cap) and np.stack (per-condition matrices).
pandas as pd = Tabular data with row/column labels. Used here to read the C-MAPSS .txt file and groupby('engine_id') for per-engine cycle counting.
joblib = Serialises Python objects (especially NumPy + sklearn) to disk. Faster than pickle for arrays. Used to dump the bundle so test code only LOADS, never re-fits.
2from sklearn.cluster import KMeans

Imports sklearn's K-Means clusterer. We will fit it on the 3-D operational-setting space (op_set_1, op_set_2, op_set_3) to discover the n_conditions=6 flight regimes that the C-MAPSS FD002 / FD004 datasets contain.

EXECUTION STATE
📚 KMeans = Unsupervised algorithm that partitions points into k clusters by minimising within-cluster variance. fit() learns centroids; .labels_ gives cluster IDs; .predict() assigns new points.
4INFORMATIVE = [1, 2, 3, 6, 7, 8, 10, 11, 12, 13, 14, 16, 19, 20]

0-based indices of the 14 sensors that survived feature selection in §5.3. Of the 21 raw C-MAPSS sensors, 7 are essentially flat (zero or near-zero variance, at least within an operating condition), so they were dropped. The remaining 14 carry the degradation signal.

EXECUTION STATE
INFORMATIVE = [1, 2, 3, 6, 7, 8, 10, 11, 12, 13, 14, 16, 19, 20] — 14 indices
→ why 0-based = These are list indices into the 21-sensor list, not sensor numbers. Index 0 → sensor_1 dropped, index 1 → sensor_2 kept, etc. Line 19 converts these to sensor_2..sensor_21 column names.
5R_MAX = 125

Maximum useful-life cap (cycles). From §7.2: capping RUL at 125 stops the loss from over-weighting healthy engines (where a true RUL above 125 just means 'lots of life left'). In this file it is applied on line 17; CMAPSSFullDataset applies the same cap when it builds y_rul.

EXECUTION STATE
R_MAX = 125 = Cycles. Any raw RUL > 125 → clamped to 125. Empirically chosen so the regression target spans a 'meaningful' degradation regime instead of unbounded healthy values.
6THR_DEGR, THR_CRIT = 80, 30

Health-state thresholds (§7.3). Tuple-unpacking assigns 80 → THR_DEGR (degrading) and 30 → THR_CRIT (critical). The Dataset converts capped RUL into a 3-class label: normal (RUL > 80) / degrading (30 < RUL ≤ 80) / critical (RUL ≤ 30).

EXECUTION STATE
THR_DEGR = 80 = Cycles. If RUL_capped ≤ 80, the engine enters DEGRADING state (class 1).
THR_CRIT = 30 = Cycles. If RUL_capped ≤ 30, the engine enters CRITICAL state (class 2). Overrides DEGRADING because the check happens after.
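The same three-way split can also be written without relying on assignment order. This sketch uses np.select, which picks the first condition that matches, so the critical test is listed first; it is an alternative formulation, not the code the Dataset uses.

```python
import numpy as np

THR_DEGR, THR_CRIT = 80, 30   # §7.3 thresholds


def health_state(rul_capped):
    """Map capped RUL to 0 = normal, 1 = degrading, 2 = critical.

    np.select takes the FIRST matching condition, so the critical test comes first;
    the result therefore does not depend on the order of in-place assignments.
    """
    rul = np.asarray(rul_capped)
    return np.select([rul <= THR_CRIT, rul <= THR_DEGR], [2, 1], default=0).astype(np.int64)


labels = health_state([125, 81, 80, 31, 30, 0])
```

The boundary values show the convention: RUL = 80 is already degrading and RUL = 30 is already critical.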
9def fit_full_pipeline(train_path, n_conditions=6, seed=0) → dict

Single function that performs ALL train-time fitting: load CSV, compute RUL, fit k-means on op_settings, compute per-condition sensor means/stds. Returns one bundle that test code can re-load.

EXECUTION STATE
⬇ input: train_path = Path to a C-MAPSS train file, e.g. 'data/raw/train_FD002.txt'. Whitespace-separated, 26 columns, no header.
⬇ input: n_conditions = 6 = Number of operating-condition clusters. FD002 / FD004 → 6 (multi-condition). FD001 / FD003 → 1 (single-condition, k-means is skipped).
⬇ input: seed = 0 = RNG seed for KMeans's random init. Same seed → reproducible cluster assignments across runs (critical for reproducible per-condition stats).
⬆ returns = dict with 5 keys: km (fitted KMeans or None), means (n_cond × 14), stds (n_cond × 14), sensor_cols (list of 14 names), n_conditions (int).
10Docstring (first line)

First line of the docstring. Communicates the function's contract: it rolls every train-time fit into one bundle that downstream code can load and apply.

11Docstring (second line)

Closing line of the docstring. Reinforces that the OUTPUT is exactly what test inference needs — no recomputation at test time.

12cols = (["engine_id", "cycle"]

Start building the 26-element column-name list. C-MAPSS files have no header row, so we must supply names ourselves. The schema is fixed: engine_id, cycle, then 3 op_settings, then 21 sensors.

EXECUTION STATE
["engine_id", "cycle"] = First two columns: engine_id (which engine) and cycle (timestep number within that engine).
13+ [f"op_set_{i}" for i in range(1, 4)]

List comprehension generating the 3 operational-setting column names. range(1, 4) yields 1, 2, 3 → produces ['op_set_1', 'op_set_2', 'op_set_3']. The + concatenates this list with the previous one.

EXECUTION STATE
📚 range(start, stop) = Built-in: yields integers from start (inclusive) to stop (exclusive). range(1, 4) → 1, 2, 3.
f"op_set_{i}" = f-string formatting: substitutes the loop variable into the literal. i=1 → 'op_set_1'.
→ result fragment = ['op_set_1', 'op_set_2', 'op_set_3']
14+ [f"sensor_{i}" for i in range(1, 22)])

Generates the 21 sensor column names sensor_1 .. sensor_21 (range(1, 22) yields 1..21 because stop is exclusive). The closing parenthesis ends the expression opened on line 12; the parentheses only let the list concatenation span several lines, so the result is a plain list, not a tuple.

EXECUTION STATE
range(1, 22) = Yields 1, 2, 3, ..., 21 — exactly 21 integers.
→ result fragment = ['sensor_1', 'sensor_2', ..., 'sensor_21']
→ final cols length = 2 + 3 + 21 = 26 column names
15df = pd.read_csv(train_path, sep=r"\s+", header=None, names=cols)

Loads the whitespace-separated text file into a DataFrame. C-MAPSS files have NO header row and use variable-width whitespace (not tabs, not single spaces) — both quirks must be told to read_csv.

EXECUTION STATE
📚 pd.read_csv() = Pandas function: parses CSV/TSV/whitespace-separated text into a DataFrame. Handles type inference, missing values, quoting, and large files via chunking.
⬇ arg 1: train_path = File path. Pandas opens the file and streams it.
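⬇ arg 2: sep=r"\s+" = Regex 'one or more whitespace characters'. Needed because C-MAPSS separates fields with a variable number of spaces rather than a fixed delimiter. The r prefix makes it a raw string, so the backslash reaches the regex engine unchanged instead of being treated as a Python escape.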
⬇ arg 3: header=None = Tells pandas there is NO header row in the file — every row is data. Without this, pandas would treat row 0 as column names.
⬇ arg 4: names=cols = Provide column names explicitly. Pandas applies these to the 26 columns it parsed.
⬆ result: df = DataFrame, ~53,759 rows × 26 cols for FD002. Columns: engine_id, cycle, op_set_1..3, sensor_1..21.
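To see why sep=r"\s+" and header=None matter, this sketch parses a tiny made-up file in the C-MAPSS layout: 26 numbers per row, a varying number of spaces between fields, and no header row. The sensor values are random placeholders.

```python
import io

import numpy as np
import pandas as pd

cols = (["engine_id", "cycle"]
        + [f"op_set_{i}" for i in range(1, 4)]
        + [f"sensor_{i}" for i in range(1, 22)])

# Build 3 rows: engine_id, cycle, then 24 placeholder floats, joined by 1 or 3 spaces.
rng = np.random.default_rng(0)
rows = []
for eid, cycle in [(1, 1), (1, 2), (2, 1)]:
    values = [eid, cycle] + rng.normal(size=24).round(4).tolist()
    seps = [" " if k % 2 else "   " for k in range(len(values) - 1)]
    rows.append("".join(str(v) + s for v, s in zip(values, seps + [""])))
text = "\n".join(rows) + "\n"

# Same call as line 15, reading from an in-memory buffer instead of a path
df = pd.read_csv(io.StringIO(text), sep=r"\s+", header=None, names=cols)
```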
16df["RUL_raw"] = df.groupby("engine_id")["cycle"].transform("max") - df["cycle"]

Computes raw remaining-useful-life per row: max_cycle_for_this_engine − current_cycle. groupby + transform broadcasts the per-engine max back to every row of that engine, so subtraction is element-wise.

EXECUTION STATE
📚 df.groupby(col) = Splits the DataFrame into groups by unique values of col. Returns a GroupBy object — lazy until you call an aggregation. Here we group by engine_id so each engine's cycles stay together.
📚 .transform("max") = Applies aggregation per group then broadcasts back to ORIGINAL row count. Unlike .max() which collapses to one row per group, transform keeps the same length so we can subtract row-wise.
→ example: an engine with 192 cycles = df.groupby('engine_id')['cycle'].transform('max') puts 192 in all 192 rows of that engine. Then 192 - cycle gives RUL: 191 (cycle 1), 190 (cycle 2), ..., 0 (cycle 192).
⬆ result: df["RUL_raw"] = New column. Values 191, 190, ..., 0 for a 192-cycle engine; 286, 285, ..., 0 for a 287-cycle engine.
17df["RUL_capped"] = np.minimum(df["RUL_raw"], R_MAX)

Caps RUL at R_MAX = 125. Every raw RUL above 125 is clamped to 125; values ≤ 125 pass through unchanged. This is the §7.2 cap. Note that fit_full_pipeline never uses this column again and does not store it in the bundle; CMAPSSFullDataset recomputes the same cap when it loads data.

EXECUTION STATE
📚 np.minimum(a, b) = Element-wise min. For arrays a and b of equal/broadcastable shape, returns an array where out[i] = min(a[i], b[i]). Different from np.min (which reduces along an axis).
⬇ arg 1: df["RUL_raw"] = Series of raw RUL values, e.g. [191, 190, ..., 0, 286, 285, ..., 0]
⬇ arg 2: R_MAX = 125 = Scalar broadcast against the Series — every element compared against 125.
→ example = min(191, 125) = 125; min(124, 125) = 124; min(0, 125) = 0
⬆ result: df["RUL_capped"] = [125, 125, ..., 125, 124, 123, ..., 0] — flat 125 plateau for healthy region, then linear ramp to 0 in last 125 cycles.
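Lines 16–17 can be checked on a toy frame. In this sketch the cap is a hypothetical R_MAX_TOY = 3 instead of 125, so that the plateau shows up on engines only a few cycles long.

```python
import numpy as np
import pandas as pd

R_MAX_TOY = 3   # toy cap (the book uses R_MAX = 125)

df = pd.DataFrame({"engine_id": [1, 1, 1, 1, 1, 2, 2, 2],
                   "cycle":     [1, 2, 3, 4, 5, 1, 2, 3]})

# Per-engine max cycle broadcast back to every row, minus the current cycle
df["RUL_raw"] = df.groupby("engine_id")["cycle"].transform("max") - df["cycle"]
# Element-wise clamp: plateau at the cap, then a linear ramp down to 0
df["RUL_capped"] = np.minimum(df["RUL_raw"], R_MAX_TOY)
```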
19sensor_cols = [f"sensor_{i+1}" for i in INFORMATIVE]

Converts the 0-based INFORMATIVE indices into the 14 actual column names ('sensor_2', 'sensor_3', ...) that we want to keep. The +1 maps INFORMATIVE entry 1 → sensor_2.

EXECUTION STATE
for i in INFORMATIVE = Loops i over [1, 2, 3, 6, 7, 8, 10, 11, 12, 13, 14, 16, 19, 20].
f"sensor_{i+1}" = Builds the column name. i=1 → 'sensor_2', i=2 → 'sensor_3', i=20 → 'sensor_21'.
⬆ result: sensor_cols = ['sensor_2', 'sensor_3', 'sensor_4', 'sensor_7', 'sensor_8', 'sensor_9', 'sensor_11', 'sensor_12', 'sensor_13', 'sensor_14', 'sensor_15', 'sensor_17', 'sensor_20', 'sensor_21'] — 14 names.
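The index arithmetic can be turned around to list which raw sensors §5.3 dropped. A short sketch:

```python
INFORMATIVE = [1, 2, 3, 6, 7, 8, 10, 11, 12, 13, 14, 16, 19, 20]   # §5.3, 0-based

kept = [i + 1 for i in INFORMATIVE]                 # 1-based sensor numbers
dropped = sorted(set(range(1, 22)) - set(kept))     # raw sensors not kept
sensor_cols = [f"sensor_{n}" for n in kept]         # same names as line 19
```

The seven dropped sensors are 1, 5, 6, 10, 16, 18 and 19 in the dataset's 1-based numbering.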
21if n_conditions > 1:

Branch: if more than one operating regime exists, fit k-means; otherwise (FD001/FD003) skip clustering and tag every row with cond=0. Mirrors the test-time branch in CMAPSSFullDataset.__init__.

EXECUTION STATE
n_conditions = Function parameter. 6 for FD002 → enters this branch.
22km = KMeans(n_clusters=n_conditions, n_init=10, random_state=seed).fit(df[["op_set_1", "op_set_2", "op_set_3"]])

Constructs a K-Means estimator with n_conditions=6 clusters, then fits it on the 3-D operational-setting matrix. .fit() runs Lloyd's algorithm to find the 6 centroids that minimise within-cluster variance.

EXECUTION STATE
📚 KMeans(...) = sklearn estimator constructor. Returns an unfitted KMeans object. Calling .fit(X) populates .cluster_centers_, .labels_, .inertia_.
⬇ arg 1: n_clusters = 6 = How many clusters/regimes to find. Domain knowledge: C-MAPSS FD002 has 6 distinct flight conditions (combinations of altitude × Mach × throttle).
⬇ arg 2: n_init = 10 = Number of independent k-means runs with different random initial centroids. Final result = best of 10 (lowest inertia). Avoids being stuck in poor local minima.
⬇ arg 3: random_state = seed = Seeds the RNG for centroid initialisation. seed=0 → same starting state every run → identical cluster assignments → identical means/stds bundle.
📚 .fit(X) = Lloyd's algorithm: alternates assigning each point to the nearest centroid and recomputing centroids as cluster means until convergence.
⬇ fit input: df[["op_set_1", "op_set_2", "op_set_3"]] = DataFrame slice → ~53,759 × 3 matrix of (altitude, Mach, throttle) for every cycle of every engine.
⬆ result: km = Fitted KMeans. km.cluster_centers_ shape (6, 3); km.labels_ shape (53759,) with values in {0, 1, 2, 3, 4, 5}.
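Reproducibility from random_state can be checked directly. The sketch below fits two identically configured KMeans models on a synthetic stand-in for the op-settings matrix (six made-up, well-separated centres plus small noise, not real FD002 data) and compares the results.

```python
import numpy as np
from sklearn.cluster import KMeans

# Made-up (altitude, Mach, throttle)-like centres; 50 noisy points around each
rng = np.random.default_rng(0)
centres = np.array([[0, 0.0, 100], [10, 0.25, 100], [20, 0.7, 100],
                    [25, 0.62, 60], [35, 0.84, 100], [42, 0.84, 100]], dtype=float)
X = np.repeat(centres, 50, axis=0) + rng.normal(scale=0.01, size=(300, 3))

# Same hyper-parameters and seed as line 22
km_a = KMeans(n_clusters=6, n_init=10, random_state=0).fit(X)
km_b = KMeans(n_clusters=6, n_init=10, random_state=0).fit(X)
```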
23df["cond"] = km.labels_

Tags every row with its cluster ID. km.labels_ is a NumPy array of length len(df), aligned to the original row order, so direct column assignment works without re-indexing.

EXECUTION STATE
km.labels_ = 1-D NumPy array, dtype int32, shape (53759,) for FD002. Each element ∈ {0, 1, 2, 3, 4, 5}.
⬆ result: df["cond"] = New column tagging each cycle with its operating regime, e.g. [3, 3, 5, 5, 1, 1, 0, 2, ...]. Each cycle is one flight and the regime can change from one cycle to the next, so a single engine typically visits several regimes over its life.
24else:

Single-condition branch. Reached when n_conditions = 1 (FD001 / FD003, which run at a single operating condition: sea level). No clustering needed.

25km = None

Set km to None in the bundle. Test code checks `if self.km is None:` to mirror this branch.

EXECUTION STATE
km = None = Sentinel value. Marks 'no clusterer fitted' so downstream code can branch correctly.
26df["cond"] = 0

Assign the scalar 0 to a new column — pandas broadcasts it to every row. Means/stds will then have shape (1, 14): one row of stats for the single regime.

EXECUTION STATE
broadcasting scalar to column = df['cond'] = 0 creates a column of length len(df) filled with 0. Equivalent to np.zeros(len(df), dtype=int).
28means = np.stack([... for c in range(n_conditions)])

Build the per-condition mean matrix. For each condition c ∈ {0, ..., 5}, filter rows where cond == c, take the mean of the 14 sensor columns → 1-D array of length 14. Stack the 6 arrays vertically into a (6, 14) matrix.

EXECUTION STATE
📚 np.stack(arrays, axis=0) = Stacks a sequence of arrays along a NEW axis. Default axis=0 makes them rows of a new array. Different from np.concatenate which joins along an EXISTING axis.
⬇ inner: df.loc[df["cond"] == c, sensor_cols] = Boolean filter + column select. Returns the rows where cond == c, restricted to the 14 sensor columns. FD002's 53,759 rows average about 8,960 per regime (53,759 / 6); actual counts differ by regime.
📚 .mean() = Pandas DataFrame method: column-wise mean by default. Returns a Series of length 14 (one mean per sensor).
📚 .to_numpy() = Converts the Series to a 1-D NumPy array, dropping the sensor-name index. Not strictly required (np.stack accepts any array-like), but it makes the conversion explicit.
29for c in range(n_conditions)

Continuation of the means comprehension on line 28. Loops c through 0, 1, 2, 3, 4, 5 (n_conditions=6) — one iteration per regime.

LOOP TRACE · 6 iterations
c = 0
result = row 0 of means: 14-element vector of mean sensor values for regime 0
c = 1
result = row 1 of means: regime 1 means (different altitude/Mach combination)
c = 2
result = row 2 of means
c = 3
result = row 3 of means
c = 4
result = row 4 of means
c = 5
result = row 5 of means — last iteration; np.stack then assembles all 6 rows
⬆ means.shape = (6, 14) — 6 regimes × 14 sensors
30stds = np.stack([... for c in range(n_conditions)])

Same pattern as means, but with .std() (sample standard deviation). Used at test time as the σ in the per-condition Z-score: x_norm = (x − μ_cond) / (σ_cond + 1e-8).

EXECUTION STATE
📚 .std() = Pandas DataFrame method: column-wise sample std (ddof=1 by default — divides by N-1). Returns a Series of length 14.
→ why ddof=1? = Pandas defaults to ddof=1 (Bessel's correction, dividing by N−1); NumPy's np.std defaults to ddof=0 (dividing by N). With thousands of rows per regime the difference is negligible.
⬆ stds.shape = (6, 14) — same shape as means, indexed identically by cluster ID.
31for c in range(n_conditions)

Continuation of the stds comprehension. Same 0..5 loop as means, producing one std vector per regime.
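Lines 28–31 scan the DataFrame once per regime. A groupby version computes the same matrices in one pass; this sketch checks that the two agree on a small random frame with hypothetical sensors and regimes. The reindex call is a design choice worth keeping: groupby only returns regimes that actually occur, so without it a missing regime would silently shift rows out of alignment with the cluster IDs.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
sensor_cols = ["sensor_2", "sensor_3", "sensor_4"]      # toy subset of the 14
df = pd.DataFrame(rng.normal(size=(12, 3)), columns=sensor_cols)
df["cond"] = [0, 1, 2] * 4                               # every regime present
n_conditions = 3

# Pattern used in fit_full_pipeline (lines 28-31)
means = np.stack([df.loc[df["cond"] == c, sensor_cols].mean().to_numpy()
                  for c in range(n_conditions)])
stds = np.stack([df.loc[df["cond"] == c, sensor_cols].std().to_numpy()
                 for c in range(n_conditions)])

# groupby alternative: one pass; reindex keeps row c aligned with cluster ID c
# (a regime with no rows would appear as a NaN row instead of vanishing).
g = df.groupby("cond")[sensor_cols]
means_gb = g.mean().reindex(range(n_conditions)).to_numpy()
stds_gb = g.std().reindex(range(n_conditions)).to_numpy()   # ddof=1, like .std()
```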

33return {"km": km, "means": means, "stds": stds, "sensor_cols": sensor_cols, "n_conditions": n_conditions}

Pack everything the test pipeline needs into a single dict. The bundle contains the FITTED estimator (km), the per-condition statistics (means, stds), the ordered column-name list (sensor_cols) and the cluster count (n_conditions).

EXECUTION STATE
⬆ return: dict = {'km': KMeans, 'means': (6,14) ndarray, 'stds': (6,14) ndarray, 'sensor_cols': list[str] of length 14, 'n_conditions': 6}
→ why a dict? = Single object — joblib.dump can serialise it in one shot. Test code only needs to call joblib.load(bundle_path) to get everything back.
36# Save once

Comment marking the start of the script-level work. Train fits ONCE and persists; test never refits.

37bundle = fit_full_pipeline("data/raw/train_FD002.txt", n_conditions=6)

Calls the function on the FD002 training file. Returns the dict described on line 33.

EXECUTION STATE
⬇ arg 1: train_path = 'data/raw/train_FD002.txt' — multi-condition C-MAPSS train file.
⬇ arg 2: n_conditions = 6 — kwarg, takes the multi-condition branch.
⬆ result: bundle = dict with keys km, means (6×14), stds (6×14), sensor_cols (14 names), n_conditions=6.
38joblib.dump(bundle, "fd002_full_pipeline.joblib")

Serialises the bundle to disk in joblib format. One file ↔ one bundle. Test code uses joblib.load(...) to round-trip back to the same dict.

EXECUTION STATE
📚 joblib.dump(value, filename) = Pickles `value` and writes it to `filename`. Optimised for NumPy arrays: large arrays are stored efficiently and can be memory-mapped when reloading via joblib.load(filename, mmap_mode='r').
⬇ arg 1: bundle = The dict to persist.
⬇ arg 2: "fd002_full_pipeline.joblib" = Output path. Convention: include the dataset name + 'pipeline' so multiple bundles don't collide.
→ side effect = Writes a binary file ~few hundred KB. Reproducibility guarantee: anyone with this file gets the EXACT same means/stds/cluster centroids.
40print("means.shape :", bundle["means"].shape)

Prints the means array's shape. Tuple printed as (rows, cols).

EXECUTION STATE
bundle["means"].shape = (6, 14) — accessed via dict key, then ndarray .shape attribute.
⬆ stdout = means.shape : (6, 14)
41print("stds.shape :", bundle["stds"].shape)

Same shape check for stds. Should match means exactly — they were both built from the same per-condition row filters.

EXECUTION STATE
⬆ stdout = stds.shape : (6, 14)
42print("sensor_cols :", bundle["sensor_cols"][:5], "...")

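Slice [:5] takes the first 5 names; '...' is a literal string (not Python's Ellipsis) for visual brevity. Note that these prints inspect the in-memory bundle returned by fit_full_pipeline, not the file on disk, so they check shapes and names but not the joblib round trip; reloading with joblib.load is the way to check that.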

EXECUTION STATE
bundle["sensor_cols"][:5] = List slice — first 5 elements of the 14-element list.
⬆ stdout = sensor_cols : ['sensor_2', 'sensor_3', 'sensor_4', 'sensor_7', 'sensor_8'] ...
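A round-trip check reloads the file and compares it with what was saved. This sketch uses a toy single-condition bundle (km=None, placeholder statistics) written to a temporary directory, so it runs without the C-MAPSS files; with the real bundle, the fitted KMeans would need its own comparison, for example of cluster_centers_.

```python
import os
import tempfile

import joblib
import numpy as np

# Toy bundle with the same keys as fit_full_pipeline's return value
bundle = {"km": None,
          "means": np.arange(14, dtype=float).reshape(1, 14),
          "stds": np.ones((1, 14)),
          "sensor_cols": [f"sensor_{i + 1}" for i in
                          [1, 2, 3, 6, 7, 8, 10, 11, 12, 13, 14, 16, 19, 20]],
          "n_conditions": 1}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "toy_pipeline.joblib")
    joblib.dump(bundle, path)          # write
    reloaded = joblib.load(path)       # read back

round_trip_ok = (reloaded["km"] is None
                 and np.array_equal(reloaded["means"], bundle["means"])
                 and np.array_equal(reloaded["stds"], bundle["stds"])
                 and reloaded["sensor_cols"] == bundle["sensor_cols"]
                 and reloaded["n_conditions"] == bundle["n_conditions"])
```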
1import numpy as np, pandas as pd, joblib
2from sklearn.cluster import KMeans
3
4INFORMATIVE = [1, 2, 3, 6, 7, 8, 10, 11, 12, 13, 14, 16, 19, 20]    # §5.3
5R_MAX = 125                                                          # §7.2
6THR_DEGR, THR_CRIT = 80, 30                                          # §7.3
7
8
9def fit_full_pipeline(train_path: str, n_conditions: int = 6, seed: int = 0):
10    """One function. Reads train; fits k-means; computes per-cond stats;
11    selects 14 sensors; returns the bundle that test inference needs."""
12    cols = (["engine_id", "cycle"]
13            + [f"op_set_{i}" for i in range(1, 4)]
14            + [f"sensor_{i}" for i in range(1, 22)])
15    df = pd.read_csv(train_path, sep=r"\s+", header=None, names=cols)
16    df["RUL_raw"]    = df.groupby("engine_id")["cycle"].transform("max") - df["cycle"]
17    df["RUL_capped"] = np.minimum(df["RUL_raw"], R_MAX)
18
19    sensor_cols = [f"sensor_{i+1}" for i in INFORMATIVE]            # 14 sensors
20
21    if n_conditions > 1:
22        km = KMeans(n_clusters=n_conditions, n_init=10, random_state=seed).fit(df[["op_set_1", "op_set_2", "op_set_3"]])
23        df["cond"] = km.labels_
24    else:
25        km = None
26        df["cond"] = 0
27
28    means = np.stack([df.loc[df["cond"] == c, sensor_cols].mean().to_numpy()
29                      for c in range(n_conditions)])
30    stds  = np.stack([df.loc[df["cond"] == c, sensor_cols].std().to_numpy()
31                      for c in range(n_conditions)])
32
33    return {"km": km, "means": means, "stds": stds, "sensor_cols": sensor_cols, "n_conditions": n_conditions}
34
35
36# Save once
37bundle = fit_full_pipeline("data/raw/train_FD002.txt", n_conditions=6)
38joblib.dump(bundle, "fd002_full_pipeline.joblib")
39
40print("means.shape   :", bundle["means"].shape)        # (6, 14)
41print("stds.shape    :", bundle["stds"].shape)         # (6, 14)
42print("sensor_cols   :", bundle["sensor_cols"][:5], "...")  # first 5 names

PyTorch: The Production CMAPSSFullDataset

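The unifying class. __init__ does the expensive work once (loading, cluster assignment, RUL and health labels, per-engine arrays), so each __getitem__ call only slices a window and normalises it with a few tensor operations, keeping training-loop overhead minimal.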

Production Dataset - 4-tuple per sample, all transforms applied
🐍cmapss_full_dataset.py
1import joblib, numpy as np, pandas as pd, torch

Four imports on one line. joblib loads the bundle that fit_full_pipeline saved. numpy + pandas reproduce the train-time loading logic. torch provides the Tensor type and the Dataset base class.

EXECUTION STATE
joblib = Same library used to dump the bundle. joblib.load(path) deserialises it back into a dict.
numpy as np = Used here for np.minimum (RUL cap) and np.zeros (state array).
pandas as pd = Reads the CSV, exposes groupby for sliding-window iteration over engines.
torch = PyTorch core. Used for torch.from_numpy (zero-copy ndarray → Tensor) and torch.tensor (scalar wrappers).
2from torch.utils.data import Dataset

Imports PyTorch's abstract Dataset base class. Subclasses must implement __len__ (sample count) and __getitem__ (index → sample). Once that contract is satisfied, DataLoader can wrap the Dataset to provide batching, shuffling and multi-worker loading.

EXECUTION STATE
📚 Dataset = Abstract base class in torch.utils.data. The 'map-style' contract: a dataset is anything with len(ds) and ds[i]. PyTorch builds DataLoader on top of this two-method API.
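The two-method contract is all DataLoader needs. In this sketch, ToyFourFieldDataset is a hypothetical stand-in that returns the same 4-tuple shapes and dtypes as CMAPSSFullDataset; the default collate function then stacks each field along a new batch dimension.

```python
import torch
from torch.utils.data import DataLoader, Dataset


class ToyFourFieldDataset(Dataset):
    """Hypothetical stand-in returning the same 4-tuple layout as CMAPSSFullDataset."""

    def __init__(self, n_samples=10, window=30, n_sensors=14):
        self.n, self.w, self.s = n_samples, window, n_sensors

    def __len__(self):
        return self.n

    def __getitem__(self, i):
        x = torch.full((self.w, self.s), float(i))      # (W, 14) float32
        c = torch.zeros(self.w, dtype=torch.int64)     # (W,)    int64
        y_rul = torch.tensor(125.0 - i)                 # ()      float32
        y_hs = torch.tensor(i % 3)                      # ()      int64
        return x, c, y_rul, y_hs


loader = DataLoader(ToyFourFieldDataset(), batch_size=4, shuffle=False)
x, c, y_rul, y_hs = next(iter(loader))   # each field gains a leading batch dim
```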
4R_MAX, THR_DEGR, THR_CRIT = 125, 80, 30

Tuple-unpack the §7.2 cap and §7.3 thresholds. R_MAX caps the regression target; THR_DEGR/THR_CRIT discretise the capped RUL into the 3-class health-state label.

EXECUTION STATE
R_MAX = 125 = Cap for capped RUL (regression target).
THR_DEGR = 80 = RUL ≤ 80 → DEGRADING (class 1).
THR_CRIT = 30 = RUL ≤ 30 → CRITICAL (class 2). Overrides DEGRADING because the assignment happens after.
7class CMAPSSFullDataset(Dataset):

Production Dataset. Inherits from torch.utils.data.Dataset. Integrates feature selection (§5.3), condition discovery (§5.4 / §6.2), per-condition normalisation (§6.3 / §6.4), sliding-window construction (§7.1), RUL cap (§7.2), and health-state labels (§7.3). Every model in Parts V–VII consumes exactly this class.

EXECUTION STATE
(Dataset) = Subclassing torch.utils.data.Dataset marks this as a map-style dataset: it implements __getitem__ and __len__, which is the interface DataLoader expects.
8Docstring (line 1)

Opening line of the docstring — short purpose statement. Visible via help(CMAPSSFullDataset) at runtime.

9Docstring (line 2)

Closing line listing every concern this class subsumes. The promise: nothing else in the codebase has to know about RUL caps or per-condition stats — they all live here.

11def __init__(self, csv_path, bundle_path, window=30) → None

Two-file constructor. Loads the bundle saved by fit_full_pipeline AND the data CSV, applies all derived columns, and pre-computes the (engine_id, end_cycle) sample index. After __init__ returns, ds[i] is O(1) tensor indexing.

EXECUTION STATE
⬇ input: self = The instance being constructed. Will receive .means, .stds, .km, .window, .samples, .engines attributes.
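⬇ input: csv_path = Path to the data file (train_FD00x.txt or test_FD00x.txt). 26-column whitespace-separated text. Caveat: test trajectories stop before failure, so for a test file the max(cycle) − cycle formula on lines 22–25 only gives RUL relative to the last recorded cycle; the ground-truth RUL at each test engine's last cycle is listed in the matching RUL_FD00x.txt file and must be added for correct labels.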
⬇ input: bundle_path = Path to the .joblib file written by fit_full_pipeline. Provides means, stds, km, sensor_cols.
⬇ input: window = 30 = Sliding-window length in cycles (§7.1). Each sample is W cycles long. Default 30 — short enough for early-life prediction, long enough to capture trends.
⬆ returns = None (constructor). Side effect: instance now has .means (Tensor), .stds (Tensor), .km, .window, .samples (list of (eid, end)), .engines (dict eid → 4-tuple).
12bundle = joblib.load(bundle_path)

Inverse of joblib.dump. Reads the binary file and reconstructs the dict in memory. The fitted KMeans, the (n_cond, 14) means/stds, and the sensor_cols list all come back identical to what was saved.

EXECUTION STATE
📚 joblib.load(path) = Reads a joblib-pickled file. For NumPy arrays it can mmap, but here we let it materialise.
⬇ arg: bundle_path = e.g. 'fd002_full_pipeline.joblib'.
⬆ result: bundle = {'km': KMeans, 'means': (6,14) ndarray, 'stds': (6,14) ndarray, 'sensor_cols': list[14], 'n_conditions': 6}
13self.means = torch.from_numpy(bundle["means"]).float()

Convert the (6, 14) NumPy means to a float32 PyTorch tensor and stash on self. We need a tensor (not ndarray) so __getitem__ can index it with another tensor (c_seq) and yield a tensor in one step.

EXECUTION STATE
📚 torch.from_numpy(arr) = Zero-copy bridge: shares memory with the NumPy array. Mutating one mutates the other. The dtype is preserved (float64 → torch.float64).
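📚 .float() = Tensor method: cast to torch.float32. Required because models train in float32 by default and dtype mismatches raise errors. Because the bundle arrays are float64, the cast allocates a new tensor, so self.means no longer shares memory with the NumPy array.
→ why float32 not float64? = Most GPUs have far higher float32 than float64 throughput, float32 halves memory, and its precision is ample for sensor statistics.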
⬆ self.means = torch.Tensor shape (6, 14), dtype torch.float32.
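The zero-copy behaviour and the effect of .float() can be seen on a small array. The sketch mutates the NumPy array after creating both tensors: the float64 view follows the change, while the float32 cast, which owns new memory, does not.

```python
import numpy as np
import torch

arr = np.zeros((2, 3))                 # float64, like the bundle's means/stds
shared = torch.from_numpy(arr)         # zero-copy view, dtype torch.float64
cast = torch.from_numpy(arr).float()   # float32 cast allocates new memory

arr[0, 0] = 7.0                        # mutate the NumPy array in place
```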
14self.stds = torch.from_numpy(bundle["stds"]).float()

Same conversion for stds. After this both self.means and self.stds are float32 tensors of identical shape — ready for advanced indexing in __getitem__.

EXECUTION STATE
⬆ self.stds = torch.Tensor shape (6, 14), dtype torch.float32.
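Storing the statistics as tensors lets the per-condition Z-score be computed with one advanced-indexing step. This is a sketch of that step with random stand-in statistics; the epsilon of 1e-8 follows the formula quoted for the stds, and the variable x (a raw window) is an assumption about what __getitem__ works with.

```python
import torch

torch.manual_seed(0)
N_COND, N_SENSORS, W = 6, 14, 30
EPS = 1e-8   # epsilon from the per-condition Z-score formula

means = torch.randn(N_COND, N_SENSORS)          # stand-in for self.means
stds = torch.rand(N_COND, N_SENSORS) + 0.5      # stand-in for self.stds

x = torch.randn(W, N_SENSORS)                   # raw window, (W, 14)
c_seq = torch.randint(0, N_COND, (W,))          # per-cycle cluster IDs, (W,)

# means[c_seq] gathers one row of statistics per cycle -> (W, 14), aligned with x
x_norm = (x - means[c_seq]) / (stds[c_seq] + EPS)
```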
15self.km = bundle["km"]

Stash the fitted KMeans estimator (or None) on self. We DO NOT convert it — sklearn estimators stay as sklearn objects. Used below to predict cluster IDs for the data CSV.

EXECUTION STATE
self.km = Either a fitted KMeans (FD002/FD004) or None (FD001/FD003). Used in the conditional on line 28.
16sensor_cols = bundle["sensor_cols"]

Local-scope list of 14 sensor column names. Not stashed on self because we only need it during __init__ to slice the DataFrame; after that, sensor matrices are stored per engine in self.engines.

EXECUTION STATE
sensor_cols = ['sensor_2', 'sensor_3', 'sensor_4', 'sensor_7', 'sensor_8', 'sensor_9', 'sensor_11', 'sensor_12', 'sensor_13', 'sensor_14', 'sensor_15', 'sensor_17', 'sensor_20', 'sensor_21'] — 14 names, ordering must match means/stds columns.
18cols = (["engine_id", "cycle"]

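Start of the 26-column-name list, built exactly as in fit_full_pipeline. Importing it from fit_full_pipeline.py is not an option here, because that file runs its fitting and joblib.dump code at module level (there is no if __name__ == "__main__": guard); so the three lines are duplicated locally.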

19+ [f"op_set_{i}" for i in range(1, 4)]

Generates op_set_1 / op_set_2 / op_set_3 via list comprehension. Concatenation continues the list expression.

20+ [f"sensor_{i}" for i in range(1, 22)])

Adds sensor_1..sensor_21 (range(1, 22) yields 21 ints). Closing parenthesis ends the multi-line list. Result has length 2 + 3 + 21 = 26.

21df = pd.read_csv(csv_path, sep=r"\s+", header=None, names=cols)

Same CSV-loading recipe as fit_full_pipeline: regex whitespace separator, no header row, explicit names. Yields a DataFrame with 26 named columns.

EXECUTION STATE
📚 pd.read_csv() = Pandas CSV/whitespace parser. See the train code annotation for arg-by-arg breakdown.
⬆ result: df = DataFrame, ~53,759 × 26 for the FD002 train file.
22df["RUL_capped"] = np.minimum(

Start of a multi-line np.minimum call. Computes per-engine raw RUL and caps it at R_MAX in a single expression — no intermediate RUL_raw column.

EXECUTION STATE
📚 np.minimum(a, b) = Element-wise min. Here a is the raw RUL Series (broadcast against scalar b=R_MAX). See lines 23–25 for the args.
23df.groupby("engine_id")["cycle"].transform("max") - df["cycle"]

First arg of np.minimum: the raw RUL Series. groupby + transform('max') computes each engine's max cycle and broadcasts it back to original row count, so we can subtract row-by-row.

EXECUTION STATE
📚 .groupby(col).transform(fn) = Per-group aggregation that BROADCASTS back to original length. Unlike .agg() which collapses to one row per group.
→ example = An engine with 192 cycles has max cycle 192. transform fills 192 into all 192 of its rows. Subtracting cycle [1, 2, ..., 192] gives raw RUL [191, 190, ..., 0].
⬇ arg 1 to np.minimum = Series of raw RUL: [191, 190, ..., 0, 286, ..., 0, ...] across all engines.
24R_MAX,

Second arg: scalar 125. Broadcast against the raw RUL Series, every element compared against 125.

EXECUTION STATE
⬇ arg 2 to np.minimum = R_MAX = 125 (scalar). NumPy broadcasts to match the Series shape.
25)

Closing paren of np.minimum. The result is the capped RUL Series, assigned to df['RUL_capped']. Values: 125 plateau for healthy region, then linear decline to 0 in the last 125 cycles.

EXECUTION STATE
⬆ df["RUL_capped"] = Series, length len(df). e.g. for a 192-cycle engine: [125, 125, ..., 125, 124, 123, ..., 0].
27# Condition assignment using the FITTED km from train

Comment marking the start of the condition-tagging block. Critical discipline: we LOAD the fitted clusterer; we never re-fit on test data (would leak test info into the cluster centroids).

28if self.km is None:

Branch on whether a clusterer was saved. None → single-condition (FD001/FD003); not None → multi-condition (FD002/FD004).

29df["cond"] = 0

Single-condition path. Assign scalar 0; pandas broadcasts to a column of zeros.

30else:

Multi-condition path. We have a fitted clusterer.

31df["cond"] = self.km.predict(df[["op_set_1", "op_set_2", "op_set_3"]])

Use the FITTED clusterer to assign cluster IDs to the new data. NEVER call .fit() here — that would compute new centroids and the means/stds in the bundle would be misaligned.

EXECUTION STATE
📚 KMeans.predict(X) = Assigns each row of X to the nearest centroid. Returns an int array of cluster IDs in {0, ..., k-1}. O(n × k × d) time. PURE prediction — does NOT modify the estimator.
⬇ arg: df[["op_set_1", "op_set_2", "op_set_3"]] = DataFrame slice → ~53,759 × 3 array. Each row is one cycle's (altitude, Mach, throttle).
⬆ result: df["cond"] = ndarray length len(df), dtype int32, values ∈ {0, 1, 2, 3, 4, 5}.
→ why predict not fit_predict? = fit_predict refits AND assigns. Would overwrite the train-fit centroids and silently leak test info. Use predict EVERY time after the first fit.
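The difference is easy to verify: predict assigns labels without touching the fitted centroids. This sketch uses two synthetic blobs as stand-in training data and two made-up test points.

```python
import numpy as np
from sklearn.cluster import KMeans

rng = np.random.default_rng(1)
train_X = np.concatenate([rng.normal(0, 0.1, (50, 3)),    # blob near 0
                          rng.normal(5, 0.1, (50, 3))])   # blob near 5
km = KMeans(n_clusters=2, n_init=10, random_state=0).fit(train_X)
centres_before = km.cluster_centers_.copy()

# "Test" points: one near each training blob
test_X = np.array([[0.05, -0.02, 0.0], [4.9, 5.1, 5.0]])
test_ids = km.predict(test_X)   # assignment only; the centroids are not updated
```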
33# Health state from capped RUL

Comment for the §7.3 discretisation block: convert the continuous capped-RUL signal into a 3-class health-state label.

34state = np.zeros(len(df), dtype=np.int64)

Allocate the health-state array, initialised to zeros (class 0 = NORMAL). We will overwrite the rows where RUL drops below thresholds.

EXECUTION STATE
📚 np.zeros(shape, dtype) = Allocate a NumPy array of given shape filled with 0. Memory-allocates a contiguous buffer.
⬇ arg 1: shape = len(df) = Length of the DataFrame, e.g. 53,759. 1-D output.
⬇ arg 2: dtype = np.int64 = 64-bit signed integer. PyTorch's CrossEntropyLoss requires int64 (long) class indices, so we match that here to avoid a later cast.
⬆ state = ndarray, shape (53759,), dtype int64, all zeros.
35state[df["RUL_capped"] <= THR_DEGR] = 1

Boolean-mask assignment. The expression on the left builds a length-N boolean mask; assigning 1 sets every True position to 1 (DEGRADING).

EXECUTION STATE
df["RUL_capped"] <= THR_DEGR = Element-wise comparison Series → boolean Series. True where RUL ≤ 80.
→ example mask = [False, False, ..., True, True, True, True]: True for the last 81 cycles of a run-to-failure engine (RUL 80 down to 0).
state[mask] = 1 = Boolean fancy indexing. Every True position set to 1. False positions stay 0.
36state[df["RUL_capped"] <= THR_CRIT] = 2

Same pattern, threshold 30, value 2 (CRITICAL). Order matters: this runs AFTER the THR_DEGR assignment, so the last 31 cycles (RUL 30 down to 0) are OVERWRITTEN from 1 → 2. Final encoding: 0=normal, 1=degrading, 2=critical.

EXECUTION STATE
→ why ≤ 30 overrides ≤ 80? = Every row where RUL ≤ 30 also has RUL ≤ 80 (so it was set to 1 first). The second assignment overwrites those rows to 2. Net effect: 0/1/2 partition.
⬆ state distribution = For a run-to-failure engine: most rows = 0 (engines spend most of their life NORMAL); the 50 cycles with 30 < RUL ≤ 80 = 1 (DEGRADING); the final 31 cycles (RUL ≤ 30) = 2 (CRITICAL).
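The per-class counts can be checked for a single run-to-failure engine. In this sketch the engine length of 192 cycles is an arbitrary illustrative value; the masking follows lines 34–36.

```python
import numpy as np

R_MAX, THR_DEGR, THR_CRIT = 125, 80, 30

n_cycles = 192                                                 # illustrative engine length
rul_capped = np.minimum(np.arange(n_cycles - 1, -1, -1), R_MAX)  # 191 ... 0, capped at 125

state = np.zeros(n_cycles, dtype=np.int64)
state[rul_capped <= THR_DEGR] = 1    # RUL 80 ... 0
state[rul_capped <= THR_CRIT] = 2    # RUL 30 ... 0, overwritten

counts = np.bincount(state, minlength=3)   # rows per class 0 / 1 / 2
```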
38self.window = window

Stash the window length on self so __getitem__ can use it.

EXECUTION STATE
self.window = 30 — sliding-window length in cycles.
39self.samples, self.engines = [], {}

Tuple-unpack assignment. self.samples will collect (engine_id, end_cycle) tuples — one per valid sliding window. self.engines maps engine_id → 4-tuple of pre-extracted arrays.

EXECUTION STATE
self.samples = list[tuple[int, int]]. After the loop: one entry per valid window, Σ(N_e - W + 1) = 53,759 - 260 × (30 - 1) = 46,219 entries for FD002 train.
self.engines = dict[int, tuple]. Keys 1..260 (FD002 has 260 train engines). Values: (sensors, cond, rul, hs).
40for eid, sub in df.groupby("engine_id"):

Iterate engine-by-engine. groupby returns (group_key, sub_DataFrame) pairs. sub is a DataFrame of all rows belonging to engine eid, in cycle order.

LOOP TRACE · 4 iterations
eid=1
sub = DataFrame, ~192 rows × 26 cols, all cycles for engine 1
eid=2
sub = DataFrame, ~287 rows for engine 2 (engines have different lifetimes)
eid=3
sub = DataFrame for engine 3
...
... = Continues for engines 4 through 260 (FD002 has 260 train engines). Each engine's sub gets its own 4-tuple in self.engines and contributes (len(sub) - W + 1) windows.
41sensors = sub[sensor_cols].to_numpy(dtype=np.float32)

Extract the 14 informative sensor columns as a float32 NumPy matrix. float32 chosen because models train in float32 and we want zero-copy via torch.from_numpy.

EXECUTION STATE
sub[sensor_cols] = DataFrame slice → 14 columns, ~N_e rows. Order matches the means/stds columns.
📚 .to_numpy(dtype=...) = Pandas method: returns the underlying ndarray, optionally casting dtype. Cheap for already-numeric columns.
⬆ sensors = ndarray (N_e, 14), dtype float32. e.g. (192, 14) for engine 1.
42cond = sub["cond"].to_numpy(dtype=np.int64)

Per-cycle cluster IDs as int64. dtype matters because we will use these as INDICES into self.means / self.stds in __getitem__, and PyTorch's advanced indexing requires int64 (long).

EXECUTION STATE
⬆ cond = ndarray (N_e,), dtype int64. e.g. [3, 3, 5, 5, 1, 0, 0, 2, ...] — engine cycles through different operating regimes.
43rul = sub["RUL_capped"].to_numpy(dtype=np.float32)

Capped RUL as float32 (regression target). MSE/Huber losses operate in float; matches the model's output dtype.

EXECUTION STATE
⬆ rul = ndarray (N_e,), dtype float32. Plateau at 125 then linear decline. e.g. [125, 125, ..., 125, 124, ..., 0] for engine 1.
44hs = state[sub.index]

Pull this engine's health-state values out of the global `state` array. sub.index gives the original row positions of these rows in df, so state[sub.index] returns them in the same cycle order.

EXECUTION STATE
sub.index = Integer pandas Index of row labels. They equal positional indices only because read_csv created a default RangeIndex that was never filtered or reordered. e.g. for engine 2: [192, 193, ..., 478].
state[sub.index] = Fancy indexing: returns elements at those positions, length N_e.
⬆ hs = ndarray (N_e,), dtype int64. e.g. [0, 0, ..., 0, 1, 1, ..., 1, 2, 2, ..., 2] — normal/degrading/critical pattern.
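The state[sub.index] lookup relies on labels equalling positions. This sketch (toy data, hypothetical helper per_engine_states) shows how that breaks after a row filter, and one way to avoid it: attach the state as a column, so each engine's slice carries its own values regardless of the index.

```python
import numpy as np
import pandas as pd

def per_engine_states(df: pd.DataFrame, state: np.ndarray) -> dict:
    """Split a row-aligned `state` array by engine without relying on index labels."""
    tagged = df.assign(hs=state)   # positional assignment: row i gets state[i]
    return {int(eid): sub["hs"].to_numpy() for eid, sub in tagged.groupby("engine_id")}

df = pd.DataFrame({"engine_id": [1, 1, 1, 2, 2],
                   "RUL_capped": [40, 20, 0, 10, 0]})
df = df[df["RUL_capped"] < 40]      # filter: labels are now 1..4, positions 0..3
state = np.where(df["RUL_capped"] <= 5, 2, 1).astype(np.int64)   # [1, 2, 1, 2]

print(per_engine_states(df, state))
# state[sub.index] would misbehave here: engine 1 (labels [1, 2]) silently gets [2, 1],
# and engine 2 (labels [3, 4]) raises IndexError because label 4 is past the end.
```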
45self.engines[eid] = (sensors, cond, rul, hs)

Cache the 4-tuple per engine. __getitem__ does NOT re-slice the DataFrame — it just dict-looks-up self.engines[eid] and slices the cached arrays. This makes per-sample retrieval O(W) instead of O(N_e).

EXECUTION STATE
tuple ordering convention = (sensors, cond, rul, hs) — same order across train/test/dataset code. Important: __getitem__ unpacks in this exact order.
46for end in range(window, len(sub) + 1):

Generate every valid sliding-window endpoint. The window covers cycles [end - W, end), so end ranges from W (first complete window) up to len(sub) inclusive (last window ends at the failure cycle).

EXECUTION STATE
📚 range(start, stop) = Yields integers start, start+1, ..., stop-1.
→ example for engine 1 (192 cycles, W=30) = range(30, 193) yields 30, 31, ..., 192 — 163 valid windows for this engine.
→ why len(sub) + 1? = We want end to take the value len(sub) so the slice [end-W : end] = [len(sub)-W : len(sub)] reaches the last cycle. range stop is exclusive, hence +1.
LOOP TRACE · 5 iterations
end = 30
window covers cycles = [0, 30) — first 30 cycles, target = RUL at cycle 29
end = 31
window covers cycles = [1, 31) — slid forward by one
end = 32
window covers cycles = [2, 32)
...
... = Each step advances end by 1 until end = len(sub). For engine 1 with 192 cycles → 163 iterations.
end = len(sub)
window covers cycles = [len(sub)-30, len(sub)) — final window, ends at failure. RUL = 0, health-state = CRITICAL.
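The window bookkeeping reduces to two small formulas: each engine with N_e ≥ W cycles contributes N_e - W + 1 endpoints, and the window ending at `end` covers rows [end - W, end). The helpers and engine lengths below are hypothetical, chosen only to exercise both formulas.

```python
def count_windows(lifetimes, window=30):
    """Samples produced by `for end in range(window, n + 1)` over all engines.

    range(window, n + 1) yields n - window + 1 endpoints, or none when n < window.
    """
    return sum(max(n - window + 1, 0) for n in lifetimes)

def window_rows(end, window=30):
    """0-based half-open row range [start, stop) covered by the window ending at `end`."""
    return end - window, end

print(count_windows([192]))               # 163
print(count_windows([192, 287, 25]))      # 421 (163 + 258 + 0: the 25-cycle engine yields nothing)
print(window_rows(30), window_rows(192))  # (0, 30) (162, 192)
```

For FD002 train, with 53,759 rows spread over 260 engines and every engine far longer than 30 cycles, the same formula gives 53,759 - 260 × 29 = 46,219 samples.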
47self.samples.append((eid, end))

Record this (engine_id, end_cycle) as one DataLoader sample. __getitem__(idx) will look up samples[idx] to recover (eid, end) and slice the cached arrays accordingly.

EXECUTION STATE
📚 list.append(x) = In-place: appends x to the end of the list in O(1) amortised.
⬆ self.samples grows = After the inner loop finishes for engine 1 (163 windows), self.samples has 163 entries. After all 260 engines: 46,219 entries.
49def __len__(self): return len(self.samples)

PyTorch DataLoader contract method. Defines how many samples the dataset has. Single-line because nothing else is needed — len() just delegates to the samples list.

EXECUTION STATE
⬆ returns = int, the total number of (engine, window) pairs across all engines: 46,219 for FD002 train with W = 30.
51def __getitem__(self, idx) → tuple of 4 tensors

PyTorch DataLoader contract method. Given an integer index, return ONE training sample. Called by DataLoader's worker processes — should be lightweight (no heavy I/O).

EXECUTION STATE
⬇ input: self = The Dataset instance. Provides .samples, .engines, .window, .means, .stds.
⬇ input: idx = Integer in [0, len(self)). DataLoader chooses these (shuffled or sequential).
⬆ returns = tuple of 4 tensors: (x_norm, c_seq, y_rul, y_hs) — described in the return-line annotation.
52eid, end = self.samples[idx]

Look up which (engine, window endpoint) this idx maps to. O(1) list indexing + tuple unpacking.

EXECUTION STATE
→ example: idx = 0 = self.samples[0] = (1, 30) for engine 1's first window. eid=1, end=30.
53sensors, cond, rul, hs = self.engines[eid]

Dict-lookup that engine's 4-tuple of cached arrays. Tuple-unpack into the 4 names. No copy — these are still ndarray references.

EXECUTION STATE
self.engines[eid] = (sensors_ndarray, cond_ndarray, rul_ndarray, hs_ndarray) — pre-computed in __init__.
54s, e = end - self.window, end

Compute the window start and end. self.window=30, so for end=30 → s=0, e=30 → covers cycles [0, 30).

EXECUTION STATE
→ example = end = 30, window = 30 → s = 0, e = 30. Window slice covers cycles 0 through 29 (30 cycles).
56# Apply per-condition Z-score

Comment marking the §6.3 / §6.4 normalisation block. The five lines below produce the Z-scored sensor window using each cycle's condition-specific (μ, σ).

57x_raw = torch.from_numpy(sensors[s:e])

Slice the engine's full sensor matrix to just this window of W cycles, then bridge to a Tensor (zero-copy view of the same memory).

EXECUTION STATE
sensors[s:e] = ndarray slice → shape (30, 14), dtype float32. View, not copy.
📚 torch.from_numpy(arr) = Bridge: shares memory with arr. Same dtype, same shape. Becomes a Tensor we can do PyTorch ops on.
⬆ x_raw = torch.Tensor shape (30, 14), dtype float32 — raw sensor readings for the window.
58c_seq = torch.from_numpy(cond[s:e])

Same pattern for the condition sequence. Shape (30,), int64 — one cluster ID per cycle in the window.

EXECUTION STATE
⬆ c_seq = torch.Tensor shape (30,), dtype int64. e.g. tensor([3, 3, 3, 5, 5, 1, 0, 0, 2, 2, ...]) — engine drifted through regimes 3→5→1→0→2 in this window.
59mu = self.means[c_seq]

PyTorch ADVANCED INDEXING. self.means is (6, 14); c_seq is (30,) of int64. The result picks rows: for each of the 30 cluster IDs, fetch the corresponding row of self.means → shape (30, 14).

EXECUTION STATE
📚 tensor[index_tensor] = Advanced indexing: when index is a LongTensor, it gathers rows. self.means[c_seq] = stack of self.means[c_seq[i]] for each i.
→ example = If c_seq[0] = 3 → mu[0] = self.means[3] (the 14-element row of regime-3 means). Repeated for 30 cycles → (30, 14).
⬆ mu = torch.Tensor shape (30, 14), dtype float32. Per-cycle mean — different cycle, different row of stats depending on its regime.
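NumPy's integer-array indexing on the first axis follows the same row-gather rule as the LongTensor case, so this NumPy sketch (random stand-in statistics, not the fitted bundle) checks that means[c_seq] is just a per-cycle row lookup. In PyTorch the explicit spelling of the same gather would be torch.index_select(self.means, 0, c_seq).

```python
import numpy as np

rng = np.random.default_rng(0)
means = rng.normal(size=(6, 14)).astype(np.float32)   # hypothetical stats: 6 regimes x 14 sensors
c_seq = np.array([3, 3, 3, 5, 5, 1, 0, 0, 2, 2], dtype=np.int64)

mu_fancy = means[c_seq]                          # integer-array indexing gathers rows: (10, 14)
mu_loop = np.stack([means[k] for k in c_seq])    # the same lookup, one cycle at a time
mu_take = np.take(means, c_seq, axis=0)          # the same gather via np.take

print(mu_fancy.shape)                                                        # (10, 14)
print(np.array_equal(mu_fancy, mu_loop), np.array_equal(mu_fancy, mu_take))  # True True
```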
60sigma = self.stds [c_seq]

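Same advanced indexing on self.stds. Per-cycle σ matched to the same regime sequence. The space before [c_seq] is only visual alignment: Python ignores whitespace between an expression and its subscript, so self.stds [c_seq] is identical to self.stds[c_seq].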

EXECUTION STATE
⬆ sigma = torch.Tensor shape (30, 14), dtype float32 — per-cycle stds.
61x_norm = (x_raw - mu) / (sigma + 1e-8)

Per-condition Z-score, computed cycle-by-cycle. Each cycle of x_raw is normalised by its OWN regime's μ and σ. Adding 1e-8 to sigma is the standard numerical-stability trick (prevents divide-by-zero if some sensor is constant within a regime).

EXECUTION STATE
(x_raw - mu) = Element-wise subtraction. Both (30, 14). Centres the data on each cycle's regime mean.
(sigma + 1e-8) = Element-wise add of a tiny constant. Numerical safety: never divide by exactly 0.
→ why per-condition Z-score? = A 'high' temperature reading at cruise altitude is normal but at takeoff it's anomalous. Per-regime normalisation removes the regime offset/scale so the model can focus on degradation, not flight conditions.
⬆ x_norm = torch.Tensor shape (30, 14), dtype float32 — Z-scored window. ~Mean 0, std ~1 PER-REGIME-PER-SENSOR.
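The claim that per-regime normalisation removes the regime offset can be checked on synthetic data. This NumPy sketch assumes three hypothetical regimes whose sensor levels differ far more than the noise; fit_cond_stats and normalise are illustrative helpers, not the book's pipeline code. After normalisation every regime has zero mean and unit std, whereas a single pooled z-score leaves the regime offsets in the signal.

```python
import numpy as np

def fit_cond_stats(x: np.ndarray, cond: np.ndarray, n_cond: int):
    """Per-condition mean and std, each (n_cond, F). Fit on TRAIN rows only."""
    means = np.stack([x[cond == k].mean(axis=0) for k in range(n_cond)])
    stds = np.stack([x[cond == k].std(axis=0) for k in range(n_cond)])
    return means, stds

def normalise(x, cond, means, stds, eps=1e-8):
    """Z-score every row with the statistics of its own regime."""
    return (x - means[cond]) / (stds[cond] + eps)

# Synthetic data: 3 regimes whose sensor levels differ far more than the noise
rng = np.random.default_rng(42)
cond = rng.integers(0, 3, size=3000)
levels = np.array([[0.0, 500.0], [50.0, 900.0], [100.0, 1400.0]])
x = levels[cond] + rng.normal(scale=[1.0, 10.0], size=(3000, 2))

means, stds = fit_cond_stats(x, cond, n_cond=3)
z = normalise(x, cond, means, stds)
for k in range(3):
    print(k, z[cond == k].mean(axis=0).round(3), z[cond == k].std(axis=0).round(3))

# A single pooled z-score leaves the regime offsets in place
z_pooled = (x - x.mean(axis=0)) / x.std(axis=0)
print([round(float(z_pooled[cond == k, 0].mean()), 2) for k in range(3)])
```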
63return (

Begin the 4-element return tuple. The next four lines are its components. Returning a tuple lets DataLoader's default collate stack each element into a batch tensor independently.

64x_norm,

Element 1: the normalised input window. Shape (30, 14), float32. After collate by DataLoader: (B, 30, 14).

EXECUTION STATE
⬆ x_norm = torch.Size([30, 14]), float32 — the model's INPUT.
65c_seq,

Element 2: the condition sequence. Used by the per-condition normaliser HEAD (not main loss). Shape (30,), int64.

EXECUTION STATE
⬆ c_seq = torch.Size([30]), int64 — auxiliary input.
66torch.tensor(float(rul[end - 1])),

Element 3: the regression target — capped RUL at the LAST cycle of the window. We wrap it as a 0-dim tensor (scalar) so DataLoader can stack across the batch into a (B,) tensor.

EXECUTION STATE
rul[end - 1] = ndarray scalar — capped RUL at cycle (end - 1). For end=30, that's cycle 29 (the last in this window).
📚 float(x) = Built-in: Python scalar conversion. Strips NumPy dtype for explicit clarity.
📚 torch.tensor(value) = Constructs a Tensor from a Python scalar/list. Default dtype follows the value: float → float32 (with the float() above making it explicit).
→ why scalar at end-1? = Sequence-to-one regression: read 30 cycles, predict ONE value (RUL at the most recent cycle). Future chapters explore sequence-to-sequence variants.
⬆ y_rul = torch.Size([]) (0-dim), float32 — scalar capped RUL.
67torch.tensor(int(hs[end - 1])),

Element 4: the classification target — health state at the last window cycle. int() forces Python int → torch.tensor produces dtype int64 (long), which CrossEntropyLoss requires.

EXECUTION STATE
hs[end - 1] = Class index ∈ {0, 1, 2} for the last cycle in the window.
📚 int(x) = Built-in: cast to Python int. torch.tensor always maps a Python int to int64, so the target dtype no longer depends on the NumPy array's integer type.
→ CrossEntropyLoss requirement = PyTorch's nn.CrossEntropyLoss expects target shape (B,) of dtype int64. Float or int32 raises a runtime error.
⬆ y_hs = torch.Size([]) (0-dim), int64 — scalar health-state class.
68)

Closing paren of the return tuple. Final 4-tuple shape summary: (x_norm: (30, 14) float32, c_seq: (30,) int64, y_rul: () float32, y_hs: () int64).
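To check what a training step actually receives, this sketch feeds four fake samples with the same shapes and dtypes as __getitem__ through torch.utils.data.default_collate (public in recent PyTorch releases), then passes the stacked health-state targets to cross_entropy. fake_sample is a hypothetical stand-in for ds[i].

```python
import torch
import torch.nn.functional as F
from torch.utils.data import default_collate

W, N_SENSORS = 30, 14

def fake_sample(i: int):
    """Hypothetical stand-in for ds[i], with the same shapes and dtypes as __getitem__."""
    return (
        torch.zeros(W, N_SENSORS),                  # x_norm: (W, 14) float32
        torch.zeros(W, dtype=torch.int64),          # c_seq:  (W,)    int64
        torch.tensor(float(125 - i)),               # y_rul:  ()      float32
        torch.tensor(int(i % 3)),                   # y_hs:   ()      int64
    )

x, c, y_rul, y_hs = default_collate([fake_sample(i) for i in range(4)])
print(tuple(x.shape), tuple(c.shape), tuple(y_rul.shape), tuple(y_hs.shape))
# (4, 30, 14) (4, 30) (4,) (4,)

logits = torch.randn(4, 3)                  # a fake 3-class health-state head output
loss = F.cross_entropy(logits, y_hs)        # class-index targets: shape (B,), dtype int64
print(loss.ndim)                            # 0
```

The 0-dim targets are what make the batch targets come out as flat (B,) vectors, the shape both the regression loss and cross_entropy expect.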

71# Use it

Comment marking the demo block — instantiate the Dataset and pull a sample to verify shapes/dtypes.

72ds = CMAPSSFullDataset(

Open the constructor call. Multi-line because we want kwargs visible. Triggers __init__: load bundle → load CSV → derive RUL/cond/state → build samples + engines.

73csv_path="data/raw/train_FD002.txt",

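First kwarg: the data file. Here it is the run-to-failure training file. For evaluation you would point it at test_FD002.txt. Note, however, that the constructor derives RUL as (last observed cycle - cycle), which is only correct when every trajectory ends at failure. Test labels also need the per-engine offsets in RUL_FD002.txt, which this constructor does not read.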

74bundle_path="fd002_full_pipeline.joblib",

Second kwarg: the bundle path. The .joblib file produced by fit_full_pipeline above. Mismatched bundles silently misalign means/stds — keep the naming convention strict.

75window=30,

Third kwarg: the sliding-window length. 30 cycles is the window used throughout this book and a common choice in the C-MAPSS literature.

76)

Closing paren of the constructor call. After this, ds is a fully-initialised CMAPSSFullDataset with 46,219 samples (FD002 train, W = 30) ready for DataLoader.

EXECUTION STATE
⬆ ds = CMAPSSFullDataset instance. len(ds) = 46,219. ds[0] returns the 4-tuple.
78X, c, y_rul, y_hs = ds[0]

Tuple-unpack the first sample. ds[0] calls __getitem__(0), which looks up samples[0] = (eid=1, end=30) and slices/normalises that window.

EXECUTION STATE
X = torch.Tensor (30, 14), float32 — engine 1's first window, Z-scored.
c = torch.Tensor (30,), int64 — cluster IDs for those 30 cycles.
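y_rul = torch.Tensor (), float32. Capped RUL at engine 1's last window cycle (position 29, cycle number 30), i.e. min(N_1 - 30, 125): 125 (still on the plateau) whenever engine 1 has at least 155 cycles.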
y_hs = torch.Tensor (), int64 — health state at cycle 29 = 0 (NORMAL).
79print("X.shape :", tuple(X.shape))

Print the input shape. tuple(X.shape) coerces torch.Size into a plain Python tuple for cleaner printing.

EXECUTION STATE
📚 tuple(t.shape) = torch.Size is a subclass of tuple but prints as 'torch.Size([30, 14])'. tuple() unwraps it to (30, 14).
⬆ stdout = X.shape : (30, 14)
80print("c.shape :", tuple(c.shape))

Verify the condition sequence is 1-D of length W=30.

EXECUTION STATE
⬆ stdout = c.shape : (30,)
81print("y_rul.dtype:", y_rul.dtype)

Confirm the RUL target is float32 — the dtype the regression loss (MSE / Huber) needs.

EXECUTION STATE
⬆ stdout = y_rul.dtype: torch.float32
82print("y_hs.dtype :", y_hs.dtype)

Confirm the health-state target is int64 — the dtype CrossEntropyLoss requires for class indices.

EXECUTION STATE
⬆ stdout = y_hs.dtype : torch.int64
```python
import joblib, numpy as np, pandas as pd, torch
from torch.utils.data import Dataset

R_MAX, THR_DEGR, THR_CRIT = 125, 80, 30


class CMAPSSFullDataset(Dataset):
    """The production Dataset that integrates feature selection,
    condition discovery, RUL cap, health labels, and sliding windows."""

    def __init__(self, csv_path: str, bundle_path: str, window: int = 30):
        bundle = joblib.load(bundle_path)       # fitted on TRAIN only: load, never refit
        self.means       = torch.from_numpy(bundle["means"]).float()
        self.stds        = torch.from_numpy(bundle["stds"]).float()
        self.km          = bundle["km"]
        sensor_cols      = bundle["sensor_cols"]

        cols = (["engine_id", "cycle"]
                + [f"op_set_{i}" for i in range(1, 4)]
                + [f"sensor_{i}" for i in range(1, 22)])
        df = pd.read_csv(csv_path, sep=r"\s+", header=None, names=cols)
        # RUL = last observed cycle - cycle: valid for run-to-failure (train) files only
        df["RUL_capped"] = np.minimum(
            df.groupby("engine_id")["cycle"].transform("max") - df["cycle"],
            R_MAX,
        )

        # Condition assignment using the FITTED km from train
        if self.km is None:
            df["cond"] = 0
        else:
            df["cond"] = self.km.predict(df[["op_set_1", "op_set_2", "op_set_3"]])

        # Health state from capped RUL
        state = np.zeros(len(df), dtype=np.int64)
        state[df["RUL_capped"] <= THR_DEGR] = 1
        state[df["RUL_capped"] <= THR_CRIT] = 2

        self.window = window
        self.samples, self.engines = [], {}
        for eid, sub in df.groupby("engine_id"):
            sensors = sub[sensor_cols].to_numpy(dtype=np.float32)
            cond    = sub["cond"].to_numpy(dtype=np.int64)
            rul     = sub["RUL_capped"].to_numpy(dtype=np.float32)
            hs      = state[sub.index]          # labels == positions (default RangeIndex)
            self.engines[eid] = (sensors, cond, rul, hs)
            # Engines shorter than `window` contribute no samples (the range is empty)
            for end in range(window, len(sub) + 1):
                self.samples.append((eid, end))

    def __len__(self): return len(self.samples)

    def __getitem__(self, idx):
        eid, end = self.samples[idx]
        sensors, cond, rul, hs = self.engines[eid]
        s, e = end - self.window, end

        # Apply per-condition Z-score
        x_raw  = torch.from_numpy(sensors[s:e])           # (W, 14)
        c_seq  = torch.from_numpy(cond[s:e])              # (W,)
        mu     = self.means[c_seq]                         # (W, 14)
        sigma  = self.stds [c_seq]                         # (W, 14)
        x_norm = (x_raw - mu) / (sigma + 1e-8)

        return (
            x_norm,                                 # (W, 14) normalised input
            c_seq,                                   # (W,) condition labels
            torch.tensor(float(rul[end - 1])),       # () scalar capped RUL
            torch.tensor(int(hs[end - 1])),          # () scalar health state (0/1/2)
        )


# Use it
ds = CMAPSSFullDataset(
    csv_path="data/raw/train_FD002.txt",
    bundle_path="fd002_full_pipeline.joblib",
    window=30,
)

X, c, y_rul, y_hs = ds[0]
print("X.shape    :", tuple(X.shape))     # (30, 14)
print("c.shape    :", tuple(c.shape))      # (30,)
print("y_rul.dtype:", y_rul.dtype)         # torch.float32
print("y_hs.dtype :", y_hs.dtype)          # torch.int64
```

The DataLoader on Top

DataLoader configuration is two lines once the Dataset is in place. These are the arguments this book sets:

| Argument | Value (this book) | Why |
|---|---|---|
| batch_size | 256 | Standard for 3.5M-param models on a single 16 GB GPU |
| shuffle | True (train), False (test/eval) | SGD needs random order in train; eval needs deterministic order |
| num_workers | 4 | Parallel data-loading worker processes; 4 keeps the GPU fed without overwhelming the CPU |
| pin_memory | True | Page-locked (pinned) host memory speeds up .to('cuda', non_blocking=True) copies and lets them run asynchronously |
| drop_last | False (train and eval) | Keep every sample; a partial final batch is harmless here (on the full FD002 train set at W = 30, the last batch holds 46,219 - 180 × 256 = 139 samples) |
| persistent_workers | True | Keeps workers alive between epochs instead of re-spawning them, saving start-up time each epoch; requires num_workers > 0 |

Every model in Parts V-VII trains with the SAME DataLoader config. Only the loss function (AMNL / GABA / GRACE) varies. Chapter 15 walks through the unified training loop end to end.
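The table translates into a small factory function. This is a sketch under two assumptions of ours, not the book's helper: pin_memory is switched on only when CUDA is available (pinning only helps host-to-GPU copies), and persistent_workers is set only when num_workers > 0, because DataLoader rejects it otherwise. A TensorDataset stands in for CMAPSSFullDataset.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

def make_loader(ds, train: bool, batch_size: int = 256, num_workers: int = 4) -> DataLoader:
    """Build a DataLoader with the settings from the table (hypothetical helper)."""
    kwargs = dict(
        batch_size=batch_size,
        shuffle=train,                              # random order for training, fixed order for eval
        num_workers=num_workers,
        pin_memory=torch.cuda.is_available(),       # pinned memory only pays off for GPU transfers
        drop_last=False,                            # keep the final partial batch
    )
    if num_workers > 0:
        kwargs["persistent_workers"] = True         # DataLoader rejects this when num_workers == 0
    return DataLoader(ds, **kwargs)

# Toy stand-in for CMAPSSFullDataset: 1,000 windows of shape (30, 14)
ds = TensorDataset(torch.randn(1000, 30, 14), torch.arange(1000, dtype=torch.float32))
loader = make_loader(ds, train=False, num_workers=0)   # num_workers=0 while debugging
print(len(loader), [yb.shape[0] for _, yb in loader])  # 4 [256, 256, 256, 232]
```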

Pipeline Patterns Across ML

| Domain | Equivalent of CMAPSSFullDataset | Bundle persistence |
|---|---|---|
| RUL (this book) | CMAPSSFullDataset | joblib (sklearn + means/stds) |
| NLP | HuggingFace tokenizer + DataCollator | tokenizer.json |
| Vision | torchvision Dataset + transforms | Pillow + PyTorch state_dict |
| Speech | Kaldi feature extractor | .ark / .scp pairs |
| Recommender | Per-user / per-item embedding lookups | Faiss index + embedding tensors |

Three Pipeline Pitfalls

Pitfall 1: Using TEST means/stds. Section 6.4 warned about this. The bundle is fit on TRAIN only. Re-fitting on test silently leaks; the test pipeline must only LOAD, never FIT.
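Pitfall 2: Window size mismatch between train and test. Always use the same W = 30 for both. Test engines with fewer than W cycles need padding or must be skipped, and you should track which: the CMAPSSFullDataset above skips them silently, because range(window, len(sub) + 1) is empty when len(sub) < W.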
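Pitfall 3: Forgetting to set num_workers=0 when debugging. Multi-worker DataLoaders run __getitem__ in separate worker processes, where print output can go missing (especially in notebooks) and exceptions come back wrapped in a worker traceback. Set num_workers=0 while debugging an exception inside __getitem__.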
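Pitfall 2 leaves the choice between padding and skipping open. One common option, sketched here under the assumption that repeating the first observed row is acceptable for your model, is to left-pad an engine's last window and return a flag so padded engines can be counted. The helper last_window is hypothetical, and the same padding would have to be applied to the condition sequence so cond stays aligned with the sensors.

```python
import numpy as np

def last_window(sensors: np.ndarray, window: int = 30):
    """Final `window` rows of one engine, left-padded by repeating the first row if short.

    Returns (window_array, was_padded) so padded engines can be counted and reported.
    """
    n = len(sensors)
    if n >= window:
        return sensors[n - window:], False
    pad = np.repeat(sensors[:1], window - n, axis=0)   # (window - n, F) copies of row 0
    return np.concatenate([pad, sensors], axis=0), True

short = np.arange(21 * 2, dtype=np.float32).reshape(21, 2)   # hypothetical 21-cycle engine
win, padded = last_window(short)
print(win.shape, padded)   # (30, 2) True
```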
The point. One Dataset class wraps the entire data path. Train pipeline fits, persists; test pipeline loads, applies. Every model in Parts V-VII reads from this same contract.

Takeaway

  • Per-sample output is a 4-tuple. (X_norm, cond, y_rul, y_hs). X_norm is the model input, y_rul and y_hs are the targets for the RUL and health-state heads, and cond feeds the normaliser only.
  • Train fits the bundle once. joblib serialises k-means + per-cond means / stds + sensor_cols.
  • Test loads the bundle, never refits. Section 6.4 discipline.
  • DataLoader config is uniform across the book. Batch 256, shuffle on train, 4 workers, pin_memory, persistent workers. Same config for AMNL / GABA / GRACE.