Chapter 6

Preventing Test-Set Leakage

Per-Condition Normalization

The Doping Stadium

An athlete who reads the test list before competing has an unfair advantage. Even if his actual race was honest, the knowledge that a particular drug is being screened changes how he prepares. That is leakage in its purest form: information from the evaluation set polluting the training process.

In ML pipelines the equivalent is computing statistics (means, standard deviations, scaling factors, k-means centroids) from data the model will later be evaluated on. The model trains and tests well. Then you deploy and watch the metrics collapse, because production data never had the chance to shape your preprocessing statistics the way the test set did.

The discipline. Every statistic, every fit, every threshold computed during training MUST come from training data only. Persist the artefacts; reload them at test time without re-fitting.

What Counts as Test-Set Leakage Here

| Pipeline step | Leakage version | Correct version |
|---|---|---|
| k-means cluster discovery | fit_predict on train+test combined | fit on train, predict on test |
| Per-condition mean/std | Compute on full dataset | Compute on train slice only |
| Feature selection | Select using test correlations with RUL | Select using train data; apply to test |
| Hyperparameter tuning | Pick lambda from test loss | Pick from a held-out validation slice |
| RUL cap | Set R_max from test population | Set R_max from train (R_max=125 is conventional) |

Even "harmless" statistics like the population mean leak. If you compute $\mu = \text{mean of train + test}$ and Z-score with that, the model implicitly knows what test data looks like. The fix is mechanical: always fit on train only.
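To make the global-mean leak concrete, here is a minimal sketch on made-up numbers (the readings below are hypothetical, not C-MAPSS values). It Z-scores the same test slice twice: once with statistics fitted on train + test, once with statistics fitted on train only.

```python
from statistics import mean, pstdev

# Hypothetical sensor readings; the test slice sits noticeably higher than train.
train = [10.0, 11.0, 9.0, 10.0]
test = [14.0, 15.0, 13.0, 14.0]


def zscore(values, mu, sigma):
    """Standardise values with externally supplied statistics."""
    return [(v - mu) / sigma for v in values]


# Leaky: statistics fitted on train + test combined.
leaky_mu = mean(train + test)
leaky_sigma = pstdev(train + test)

# Clean: statistics fitted on train only.
clean_mu = mean(train)
clean_sigma = pstdev(train)

leaky_test = zscore(test, leaky_mu, leaky_sigma)
clean_test = zscore(test, clean_mu, clean_sigma)

print("leaky mu:", leaky_mu, "| clean mu:", clean_mu)
print("test mean after leaky normalisation:", round(mean(leaky_test), 3))
print("test mean after clean normalisation:", round(mean(clean_test), 3))
```

With the leaky statistics the test slice lands much closer to zero, so the shift between train and test is partly hidden. With train-only statistics the shift stays visible, which is what deployment would actually look like.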

The Train-Then-Apply Discipline

A clean pipeline has two distinct phases. Phase 1 reads only train data; computes all statistics; persists them. Phase 2 reads test data and the persisted statistics; never goes back to train. Encoding this as code-level separation (different functions, different files, different processes) is the only way to make it survive code review and team rotation.
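The two phases can be sketched with nothing but the standard library. The function names (`fit_stats`, `save_stats`, `apply_stats`), the JSON file, and the toy rows are all assumptions made for illustration; the point is only that phase 2 receives a path to persisted statistics and has no access to training rows.

```python
import json
import os
import tempfile
from statistics import mean, pstdev


def fit_stats(train_rows):
    """Phase 1: per-condition mean/std from TRAIN rows only.

    Each row is a (condition_id, value) pair.
    """
    by_cond = {}
    for cond, value in train_rows:
        by_cond.setdefault(cond, []).append(value)
    # JSON object keys must be strings, so condition ids are stringified.
    return {
        str(cond): {"mean": mean(vals), "std": pstdev(vals)}
        for cond, vals in by_cond.items()
    }


def save_stats(stats, path):
    """Persist the fitted statistics so phase 2 can reload them."""
    with open(path, "w") as fh:
        json.dump(stats, fh)


def apply_stats(rows, path, eps=1e-8):
    """Phase 2: normalise rows with persisted statistics; never re-fits."""
    with open(path) as fh:
        stats = json.load(fh)
    return [
        (value - stats[str(cond)]["mean"]) / (stats[str(cond)]["std"] + eps)
        for cond, value in rows
    ]


# Hypothetical toy data: (condition_id, sensor value).
train_rows = [(0, 1.0), (0, 3.0), (1, 10.0), (1, 14.0)]
test_rows = [(0, 2.0), (1, 16.0)]

stats_path = os.path.join(tempfile.mkdtemp(), "train_stats.json")
save_stats(fit_stats(train_rows), stats_path)   # phase 1: train only
test_norm = apply_stats(test_rows, stats_path)  # phase 2: test + saved stats
print(test_norm)
```

Because `apply_stats` takes a file path rather than a DataFrame of training rows, there is no way for test-time code to re-fit by accident. The same separation is what putting the two phases in different files or processes achieves.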

Python: One Bundle, Two Apply Calls

Train fits the bundle; test loads it; never the reverse
leakage_safe_pipeline.py, line by line

import joblib
Persistence for the fitted artefacts.

import numpy as np
Array maths (ndarray).

import pandas as pd
Data loading.

from sklearn.cluster import KMeans
The clusterer.

OP_COLS = [f"op_set_{i}" for i in range(1, 4)]
The three operating-setting columns.

def load_cmapss(path):
Reusable loader that returns a DataFrame with the standard column layout.

cols = (["engine_id", "cycle"] + OP_COLS + [f"sensor_{i}" for i in range(1, 22)])
The 26-column layout: 2 identifiers, 3 operating settings, 21 sensors.

df = pd.read_csv(path, sep=r"\s+", header=None, names=cols)
Whitespace-separated, no header row.

return df
The loaded DataFrame.

df_train = load_cmapss("data/raw/train_FD002.txt")
Load TRAIN only. Test data does NOT enter this scope.

km = KMeans(n_clusters=6, n_init=10, random_state=0).fit(df_train[OP_COLS])
Fit on train op-settings only. The seed is fixed for reproducibility.

df_train = df_train.assign(cond=km.labels_)
.assign returns a new DataFrame with the extra column rather than mutating the old one.

sensor_cols = [f"sensor_{i}" for i in range(1, 22)]
The 21 sensor column names.

means = np.stack([df_train.loc[df_train["cond"] == c, sensor_cols].mean().to_numpy() for c in range(6)])
Per-condition means computed ONLY from training data. means.shape is (6, 21): one row per condition, one column per sensor.

stds = ... .std() ...
Per-condition stds, built the same way with .std() in place of .mean(). Also (6, 21). Note that pandas .std() is the sample standard deviation (ddof=1).

joblib.dump({"km": km, "means": means, "stds": stds}, "fd002_train_stats.joblib")
Persist EVERYTHING the test pipeline needs. From this moment, the test code never re-touches df_train. The discipline: if your test code accesses df_train at any point, you have leaked.

bundle = joblib.load("fd002_train_stats.joblib")
Load the bundle. This is the ONLY file the test path reads from training.

df_test = load_cmapss("data/raw/test_FD002.txt")
Test data loaded fresh.

test_labels = bundle["km"].predict(df_test[OP_COLS])
.predict (NOT .fit_predict). Uses the train-fitted centroids to assign each test row to one of the 6 clusters, so cluster IDs are consistent with training. If you re-fit instead, cluster IDs would shuffle: means[3] would no longer correspond to "condition 3" as known by the train-time pipeline.

test_sensors = df_test[sensor_cols].to_numpy(dtype=np.float32)
Materialise the (N_test, 21) sensor matrix.

test_norm = (test_sensors - bundle["means"][test_labels]) / (bundle["stds"][test_labels] + 1e-8)
Apply the per-condition Z-score using TRAIN means/stds. Same advanced-indexing trick from §6.3: bundle["means"][test_labels] has shape (N_test, 21), one mean vector per test row, looked up by its assigned cluster.

print("test rows normalised:", test_norm.shape)
Sanity check. Output: test rows normalised: (33991, 21)

print("test mean ~ 0? :", round(test_norm.mean(), 4))
Should be near zero if train and test come from similar distributions. A small non-zero mean (~0.01) is expected because the test distribution can drift slightly. Output: test mean ~ 0? : 0.01

print("test std ~ 1? :", round(test_norm.std(), 4))
Should be near 1.0; small drift is expected. Output: test std ~ 1? : 1.04. If std is wildly off (say, 5.0), check whether you accidentally normalised with the WRONG bundle (e.g., FD001 stats applied to FD002 test).

The full listing:

```python
import joblib
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans

OP_COLS = [f"op_set_{i}" for i in range(1, 4)]


def load_cmapss(path: str) -> pd.DataFrame:
    cols = (["engine_id", "cycle"] + OP_COLS + [f"sensor_{i}" for i in range(1, 22)])
    df = pd.read_csv(path, sep=r"\s+", header=None, names=cols)
    return df


# ----- Step 1: Fit on TRAIN only -----
df_train = load_cmapss("data/raw/train_FD002.txt")
km = KMeans(n_clusters=6, n_init=10, random_state=0).fit(df_train[OP_COLS])

df_train = df_train.assign(cond=km.labels_)
sensor_cols = [f"sensor_{i}" for i in range(1, 22)]
means = np.stack([df_train.loc[df_train["cond"] == c, sensor_cols].mean().to_numpy()
                  for c in range(6)])
stds = np.stack([df_train.loc[df_train["cond"] == c, sensor_cols].std().to_numpy()
                 for c in range(6)])

# Save bundle - the ONLY object the test pipeline reads
joblib.dump({"km": km, "means": means, "stds": stds}, "fd002_train_stats.joblib")


# ----- Step 2: Apply at test time -----
bundle = joblib.load("fd002_train_stats.joblib")
df_test = load_cmapss("data/raw/test_FD002.txt")

# Use the FITTED km - never re-fit
test_labels = bundle["km"].predict(df_test[OP_COLS])

# Apply train-time means/stds via advanced indexing
test_sensors = df_test[sensor_cols].to_numpy(dtype=np.float32)
test_norm = (test_sensors - bundle["means"][test_labels]) / (bundle["stds"][test_labels] + 1e-8)

print("test rows normalised:", test_norm.shape)
print("test mean ~ 0?       :", round(test_norm.mean(), 4))
print("test std  ~ 1?       :", round(test_norm.std(), 4))

# test rows normalised: (33991, 21)
# test mean ~ 0?       : ~0.01      (small offset because test distribution drifts slightly from train)
# test std  ~ 1?       : ~1.04
```

The diagnostic. Compute test mean and std after normalisation. They should be near zero and near one if train and test come from the same distribution. Big deviations (|mean| > 0.2, std > 1.5) signal either bundle mismatch or genuine distribution shift.
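The diagnostic is easy to wrap in a small helper. The sketch below is an illustration only: `check_normalised` is a hypothetical name, the thresholds are the rule-of-thumb values quoted above, and the two input matrices are synthetic stand-ins rather than real normalised C-MAPSS data.

```python
import numpy as np


def check_normalised(x_norm, max_abs_mean=0.2, max_std=1.5):
    """Return (ok, report) for post-normalisation test statistics.

    The default thresholds are rules of thumb, not hard limits.
    """
    m = float(np.mean(x_norm))
    s = float(np.std(x_norm))
    ok = abs(m) <= max_abs_mean and s <= max_std
    return ok, {"mean": m, "std": s}


rng = np.random.default_rng(0)

# Synthetic stand-in for a test matrix normalised with the matching bundle.
well_matched = rng.normal(loc=0.0, scale=1.0, size=(1000, 21))

# Synthetic stand-in for a test matrix normalised with the wrong bundle.
wrong_bundle = rng.normal(loc=0.0, scale=5.0, size=(1000, 21))

print(check_normalised(well_matched))
print(check_normalised(wrong_bundle))
```

A global mean and std can hide a single badly scaled sensor, so in practice it may be worth running the same check per column (`axis=0`) as well.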

PyTorch: state_dict Carries the Statistics

For PyTorch deployment, a single Module wraps the entire pipeline, including the per-condition normaliser's buffers and the downstream model's parameters. state_dict captures BOTH, and load_state_dict restores both. The one piece state_dict does not cover is the sklearn clusterer, which travels in the joblib bundle.

ConditionGate + PerConditionNormaliser + model in one save/load
full_pipeline.py, line by line

import torch
Top-level PyTorch.

import torch.nn as nn
Module building blocks.

import joblib
Persistence.

class FullPipeline(nn.Module):
End-to-end pipeline that wraps the clusterer + normaliser + downstream model. A single .forward call takes raw inputs and returns predictions. The clean train/test contract.

def __init__(self, joblib_path, model):
Constructor takes the saved bundle path and the downstream model. joblib_path is the path to the train-time bundle (km + means + stds); model is the downstream nn.Module, the CNN-BiLSTM-Attention from Chapter 11.

super().__init__()
Initialise nn.Module.

bundle = joblib.load(joblib_path)
Load all three artefacts at once.

self.register_buffer("means", torch.from_numpy(bundle["means"]).float())
Means buffer. Buffers are saved in state_dict and move with .to(device), but are not trained.

self.register_buffer("stds", torch.from_numpy(bundle["stds"]).float())
Stds buffer.

self.km = bundle["km"]
Sklearn estimator. Stored as a plain attribute, so PyTorch leaves it alone: it does not appear in state_dict and is not moved by .to(device).

self.model = model
The downstream model. Because it is an nn.Module attribute, ITS parameters appear in state_dict and move with .to(device). PyTorch automatically discovers nested nn.Modules and merges their state_dicts, so buffers + nested module form a single saveable artefact.

def forward(self, x, op_settings):
Single forward pass. Inputs: (B, T, F) sensor batch + (B, T, 3) op-settings.

ops_flat = op_settings.reshape(-1, 3).cpu().numpy()
sklearn .predict needs CPU NumPy. Flatten (B, T, 3) -> (B*T, 3); move to CPU; convert. Because km was fitted on a DataFrame, sklearn may warn here that the input lacks feature names.

cond_flat = self.km.predict(ops_flat)
Cluster assignment per cycle.

cond = torch.from_numpy(cond_flat).to(x.device).long().reshape(x.shape[:2])
Bring back to the same device as x; cast to int64; reshape to (B, T).

mu = self.means[cond]
Advanced-indexed gather, the same trick as §6.3. Result shape: (B, T, F).

sigma = self.stds[cond]
Same gather for the stds.

x_norm = (x - mu) / (sigma + 1e-8)
Per-condition Z-score.

return self.model(x_norm)
Downstream model consumes the normalised input.

# torch.save(pipeline.state_dict(), 'pipeline_state.pt')
Production discipline: save state_dict (carries means + stds + downstream model weights) AND keep the joblib bundle (carries the sklearn km). Inference loads BOTH. Why two files? state_dict holds only parameters and buffers, so km, a plain Python attribute, is simply not in it. Two artefacts is the cleanest solution: torch.save for tensors, joblib for the rest.

The full listing:

```python
import torch
import torch.nn as nn
import joblib


class FullPipeline(nn.Module):
    """ConditionGate + PerConditionNormaliser + downstream model in one."""

    def __init__(self, joblib_path: str, model: nn.Module):
        super().__init__()
        bundle = joblib.load(joblib_path)
        # Normaliser buffers
        self.register_buffer("means", torch.from_numpy(bundle["means"]).float())
        self.register_buffer("stds", torch.from_numpy(bundle["stds"]).float())
        # Clusterer (sklearn estimator stored as plain attribute)
        self.km = bundle["km"]
        self.model = model  # nn.Module - participates in state_dict

    def forward(self, x: torch.Tensor, op_settings: torch.Tensor) -> torch.Tensor:
        # x: (B, T, F). op_settings: (B, T, 3).
        ops_flat = op_settings.reshape(-1, 3).cpu().numpy()
        cond_flat = self.km.predict(ops_flat)
        cond = torch.from_numpy(cond_flat).to(x.device).long().reshape(x.shape[:2])
        mu = self.means[cond]
        sigma = self.stds[cond]
        x_norm = (x - mu) / (sigma + 1e-8)
        return self.model(x_norm)


# ----- Save + load discipline -----
# state_dict carries means / stds AND the downstream model weights together.
# joblib.dump captures the sklearn km. Both are needed at inference time.

# pipeline = FullPipeline("fd002_train_stats.joblib", trained_model)
# torch.save(pipeline.state_dict(), "pipeline_state.pt")
# Later:
# pipeline = FullPipeline("fd002_train_stats.joblib", trained_model_skeleton)
# pipeline.load_state_dict(torch.load("pipeline_state.pt"))
```

Two artefacts. state_dict contains only parameters and buffers, so the sklearn k-means, which is stored as a plain Python attribute, is not part of it. Production pipelines therefore persist TWO files: a .pt for tensor state and a .joblib for the sklearn estimator. Both must be loaded at inference time.
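With two files on disk, one practical risk is pairing a .pt from one run with a .joblib from another. One way to guard against that, sketched here as an assumption rather than anything the pipeline above does, is a small manifest of file digests written at train time and verified before inference. The helper names and the manifest file are hypothetical, and the artefact files below are byte-string stand-ins so the example runs without PyTorch or sklearn.

```python
import hashlib
import json
import os
import tempfile


def sha256_of(path):
    """Hex SHA-256 digest of a file's bytes."""
    h = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()


def write_manifest(paths, manifest_path):
    """Record the digest of every artefact saved at train time."""
    manifest = {os.path.basename(p): sha256_of(p) for p in paths}
    with open(manifest_path, "w") as fh:
        json.dump(manifest, fh, indent=2)


def verify_manifest(paths, manifest_path):
    """Return True only if every artefact matches its recorded digest."""
    with open(manifest_path) as fh:
        manifest = json.load(fh)
    return all(manifest.get(os.path.basename(p)) == sha256_of(p) for p in paths)


# Stand-in files; in practice these would be the real .pt and .joblib artefacts.
workdir = tempfile.mkdtemp()
pt_path = os.path.join(workdir, "pipeline_state.pt")
jl_path = os.path.join(workdir, "fd002_train_stats.joblib")
for path, payload in [(pt_path, b"tensor state"), (jl_path, b"sklearn bundle")]:
    with open(path, "wb") as fh:
        fh.write(payload)

manifest_path = os.path.join(workdir, "manifest.json")
write_manifest([pt_path, jl_path], manifest_path)
ok_before = verify_manifest([pt_path, jl_path], manifest_path)

# Simulate swapping in a bundle produced by a different training run.
with open(jl_path, "wb") as fh:
    fh.write(b"bundle from another run")
ok_after = verify_manifest([pt_path, jl_path], manifest_path)

print(ok_before, ok_after)
```

A failed check at load time is cheaper to debug than a model that silently normalises with the wrong statistics.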

Common Leakage Patterns Across ML

| Mistake | Why it leaks | Fix |
|---|---|---|
| Tokeniser fit on full corpus | Vocabulary contains test-only tokens | Fit on train only |
| Image normalisation on full dataset | Mean/std encode test colour distribution | Fit on train only |
| StandardScaler fit on the full data before cross-validation | Held-out folds shape the scaling statistics | Put the scaler inside a Pipeline and pass that to cross_val_score |
| Stratified split on label distribution | Test labels affect train sampling | Split BEFORE looking at labels |
| Time-series random split | Future leaks into past | Time-aware split |
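The last row of the table is the easiest to show in code. This is a minimal sketch of a time-aware split on a toy series; the record layout and the `"t"` field name are assumptions for illustration.

```python
def time_aware_split(records, train_fraction=0.8):
    """Split records chronologically: earliest rows train, latest rows test.

    Each record is a dict with a sortable "t" key (hypothetical field name).
    """
    ordered = sorted(records, key=lambda r: r["t"])
    cut = int(len(ordered) * train_fraction)
    return ordered[:cut], ordered[cut:]


# Hypothetical toy series, deliberately out of order.
records = [{"t": t, "y": t * 0.5} for t in [7, 2, 9, 0, 4, 1, 8, 3, 6, 5]]
train, test = time_aware_split(records)

print("train timestamps:", [r["t"] for r in train])
print("test timestamps: ", [r["t"] for r in test])
```

Every training timestamp precedes every test timestamp, so nothing from the future can inform the fit. A random shuffle before the cut would break exactly that guarantee.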

Three Subtle Leakage Failures

Pitfall 1: sklearn Pipelines. Pipeline([('scaler', StandardScaler()), ('km', KMeans()), ('lr', LogisticRegression())]) followed by .fit(X_full, y_full) leaks because all three steps see the entire dataset. Use .fit(X_train, y_train), then .predict(X_test).
Pitfall 2: Cross-validation with stateful preprocessing. Re-fitting the per-condition normaliser inside each CV fold, on that fold's training part only, is the right move. Re-using a globally-fitted normaliser is leakage in disguise, because every validation fold has already contributed to its statistics.
Pitfall 3: The test set drifted. Sometimes test statistics genuinely differ from train. After applying the train-fit normaliser, test mean might be 0.3 and std might be 1.2: that is a real signal, not a bug. Investigate as distribution shift, not as leakage.

The point. Per-condition normalisation can either be the cleanest preprocessing step in your pipeline or the sneakiest source of false confidence. The difference is one sentence: fit on train, persist, never re-fit on test.
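Pitfall 2 is the one most often got wrong, so here is a minimal sketch of re-fitting a normaliser inside each fold. It uses a plain mean/std normaliser on synthetic data and contiguous folds; the helper names are hypothetical, and a real C-MAPSS setup would presumably fold by engine and fit per condition.

```python
import numpy as np


def fit_normaliser(x):
    """Fit mean/std on the rows given (the training part of one fold)."""
    return x.mean(axis=0), x.std(axis=0)


def apply_normaliser(x, mu, sigma, eps=1e-8):
    """Apply previously fitted statistics; never re-fits."""
    return (x - mu) / (sigma + eps)


def kfold_indices(n_rows, n_folds):
    """Yield (train_idx, val_idx) pairs over contiguous folds."""
    folds = np.array_split(np.arange(n_rows), n_folds)
    for k in range(n_folds):
        val_idx = folds[k]
        train_idx = np.concatenate([f for i, f in enumerate(folds) if i != k])
        yield train_idx, val_idx


rng = np.random.default_rng(0)
x = rng.normal(loc=5.0, scale=2.0, size=(100, 3))  # synthetic features

fold_means = []
for train_idx, val_idx in kfold_indices(len(x), n_folds=5):
    # Re-fit inside the fold, using this fold's training rows only.
    mu, sigma = fit_normaliser(x[train_idx])
    train_norm = apply_normaliser(x[train_idx], mu, sigma)
    # Validation rows are only ever transformed, never fitted on.
    val_norm = apply_normaliser(x[val_idx], mu, sigma)
    fold_means.append(mu)
    # A model would be trained on train_norm and scored on val_norm here.

print("normalisers fitted:", len(fold_means))
```

Each fold ends up with slightly different statistics, which is the expected behaviour: the validation rows of a fold never influence the normaliser that is applied to them.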

Takeaway

  • Statistics computed on test data leak. Even something as innocent as the global mean.
  • Persist the bundle. joblib for sklearn artefacts; torch.save for PyTorch state_dict. Test loads, never fits.
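  • Diagnose with post-normalisation test stats. Mean ~ 0, std ~ 1 means the right bundle was applied and train and test are similarly distributed. It does not by itself prove the pipeline is leak-free, since statistics fitted on test data would pass the same check. Big deviations mean either bundle mismatch or distribution shift.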
  • Two artefacts in production. One torch.save for tensors, one joblib for the sklearn estimator. Load both.