Chapter 8

The Data Pipeline Architecture

Data: The Invisible Foundation

Every public benchmark, every architecture diagram, every loss curve in a modern LLM paper sits on top of one piece of infrastructure that is almost never drawn: the data pipeline. DeepSeek-V3 was pretrained on 14.8 trillion tokens. Those tokens did not exist on disk anywhere in the world before the team built them. They were manufactured — assembled, filtered, deduplicated, scored, and repacked — from several petabytes of raw web crawl, code repositories, and scientific text. This section is the architectural blueprint of that manufacture. What goes in is web sludge; what comes out is the most information-dense corpus a 671B model has ever been fed. Everything that follows in this chapter is about doing each step well; this section is about why the steps exist, what shape they form, and how the shape governs every other engineering choice in pretraining.

The pipeline in one sentence. Take several petabytes of dirty text, run it through eight successive stages, most of which throw away half or more of what they see, tokenize what survives, shuffle it across thousands of GPUs, and feed it at a constant 3-million-tokens-per-second line rate for 54 days. The architecture is whatever lets that sentence finish without dropping a packet.

The Problem: From Web Sludge to 14.8T Clean Tokens

Start from the naïve question: where do the tokens come from? The public answer, "a snapshot of the internet", hides the actual engineering problem. A single CommonCrawl snapshot holds roughly 400 TB of uncompressed page content, and a frontier corpus draws on many snapshots, so the raw input runs to petabytes. Of that, the overwhelming majority is unusable for pretraining: it is the wrong language, it is templated SEO spam, it is bit-identical copies of articles seen a hundred times in earlier shards, it is product listings, it is autogenerated forum boilerplate, it is parked-domain placeholder text, it is malware-injected mirrors. A pretraining corpus is what is left after you remove all of that. The job of the pipeline is to remove it without removing anything useful and without starving the training run.

Three pressures push back on each other and define every design choice:

  1. Quality. A 671B model is unforgiving. Junk tokens do not just produce no signal — they actively burn gradient capacity that could have gone to high-information text. The Chinchilla / DeepSeek scaling laws (Chapter 9) assume tokens are roughly equal in worth, so letting a quarter of your corpus be ad copy effectively shrinks the model.
  2. Volume. Even after aggressive filtering you still need \(\sim 15 \times 10^{12}\) tokens of survivors. That means you must start with \(\sim 10^{15}\) tokens of raw input, which is petabyte scale. Per-document Python code that is too slow by 10× turns a 3-day filter pass into a 30-day filter pass.
  3. Throughput. Training never pauses. The pipeline must deliver tokens to \(\sim 2{,}048\) H800-class GPUs at line rate, about 3 million tokens per second aggregate, with zero stalls. A GPU starved for input is a GPU costing $10/hour to do nothing.

These three pressures form a triangle that pretraining engineers have learned to navigate by separating concerns: an offline stage that prioritizes quality and volume but can take days, and an online stage that prioritizes throughput and runs in lockstep with training. The whole pipeline is the choreography between the two.

The unit of failure. When you read "DeepSeek-V3 was trained on 14.8T tokens", the implicit guarantee is that every one of those tokens passed eight filters and was fed to the model exactly once in a globally-shuffled order, without the training loop ever waiting on the loader for more than a few milliseconds. The pipeline either upholds that guarantee or it does not. Most failures in large-scale training start as a pipeline failure that the model team takes weeks to diagnose as a pipeline failure.

Intuition: A Refinery, Not a Hard Drive

The instinct of someone seeing "14.8T tokens" for the first time is to picture a giant hard drive. That mental model is wrong in a useful way. The right mental model is an oil refinery.

Crude oil arrives at a refinery in vast, dirty volumes. It is fractionated in successive towers, each of which removes one class of impurity and passes the rest forward. Lighter, more valuable products come out the top; sludge comes out the bottom. The refinery's value is not the crude (which is cheap and abundant) and it is not the storage tanks (any company can buy storage). The value is the process: the sequence of towers, the temperatures, the catalysts, the throughput rate. A refinery that produces 1 million barrels a day of clean gasoline is worth more than a refinery that produces 1 million barrels a year of the same gasoline. Throughput is part of the product.

A pretraining data pipeline is the same shape. Raw crawl is the crude. Each stage — language ID, exact dedup, near-dedup, quality classifier — is a tower. The clean tokens at the bottom are the gasoline. And the rate at which the bottom of the funnel delivers tokens to the model is the refinery's output: a number that has to match the model's consumption rate every second of every training day.

The physical picture. Picture a wide tube at the top narrowing to a thin spout at the bottom. The top is fed by a hose at PB scale. Each constriction in the tube is a filter that throws away most of what passes through. Only a few percent of what entered the top emerges at the spout, and the spout's diameter is sized so that it empties into a thousand parallel cups (GPUs) at exactly the rate the cups can drink. Too narrow a spout and the cups go dry; too wide and the filters never finish their job in time.

Why not skip stages and just feed more crude? Two reasons. First, the downstream stages cost more per token. A quality classifier that runs at 1k docs/sec/CPU is fine when fed pre-deduplicated text; it is bankrupting when fed raw crawl, because roughly 90% of its work then goes to documents that the cheaper upstream filters would have removed at negligible cost. The filters are ordered by cost per kept token: cheap, high-throw-away filters first; expensive, surgical filters last. Second, the model's loss responds more to the worst 5% of its training data than to the best 95%. Each filtering stage upgrades that worst 5%, not the average, so skipping one leaves the tail in. The economics push you toward thorough filtering, not toward more crude.
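The ordering argument can be made concrete with a small cost model. The sketch below is illustrative only: the stage names, per-document costs, and retention rates are assumed values chosen to echo the text, not measurements, and it assumes each filter's retention does not depend on which filters ran before it.

```python
from itertools import permutations

# Hypothetical stages: (name, CPU microseconds per input doc, retention rate).
STAGES = [
    ("exact_dedup", 20.0, 0.50),
    ("language_id", 30.0, 0.40),
    ("quality_classifier", 1000.0, 0.40),
]

def cost_per_raw_doc(order):
    """Expected CPU microseconds spent per raw document for a given stage order."""
    total, surviving = 0.0, 1.0
    for _name, cost_us, retention in order:
        total += surviving * cost_us   # a stage only pays for docs that reach it
        surviving *= retention         # fraction of raw docs still alive afterwards
    return total

cheap_first = cost_per_raw_doc(STAGES)
expensive_first = cost_per_raw_doc(STAGES[::-1])
best_order = min(permutations(STAGES), key=cost_per_raw_doc)

print(f"cheap filters first:     {cheap_first:7.1f} us per raw doc")
print(f"expensive filter first:  {expensive_first:7.1f} us per raw doc")
print("cheapest order:", [name for name, _, _ in best_order])
```

Under these assumed numbers the final retention is identical in every ordering; only the compute bill changes, and running the classifier first costs several times more per raw document.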

The Math: Funnel Retention and Train-Loader Throughput

Two equations capture the entire architectural problem. The first is the funnel itself. Let the pipeline have \(S\) stages, with per-stage retention rate \(r_i \in (0, 1]\): the fraction of input bytes (or documents, or tokens; we will use bytes for now and convert at the end) that survive stage \(i\). Let \(B_0\) be the raw input in bytes. After stage \(i\) the surviving bytes are:

\[ B_i = B_0 \cdot \prod_{j=1}^{i} r_j \]

And the final survivors in tokens are:

\[ N_{\text{tokens}} = \frac{B_S}{b_t} = \frac{B_0}{b_t} \cdot \prod_{j=1}^{S} r_j \]

where \(b_t\) is the average number of bytes per token (about 4 for BPE on English text, lower for compressed languages, higher for code). The product \(\prod_j r_j\) is the overall retention rate. For DeepSeek-V3-class pipelines it lands at roughly \(0.5\%\) to \(2\%\): you need 50–200× more raw bytes than the bytes you keep. The training token budget is the spec; the raw-input budget in bytes is the spec multiplied by \(b_t / \prod_j r_j\).

The second equation is the throughput contract. If the model consumes \(N_{\text{tokens}}\) tokens in \(T_{\text{days}}\) days of training across \(G\) GPUs, then the loader must deliver:

\[ \tau = \frac{N_{\text{tokens}}}{T_{\text{days}} \cdot 86400} \]

tokens per second of aggregate throughput, and per-GPU:

\[ \tau_{\text{GPU}} = \frac{\tau}{G} = \frac{N_{\text{tokens}}}{G \cdot T_{\text{days}} \cdot 86400} \]

For the DeepSeek-V3 numbers (\(N = 14.8 \times 10^{12}\), \(T = 54\), \(G = 2048\)), \(\tau \approx 3.17 \times 10^6\) tokens/sec aggregate and \(\tau_{\text{GPU}} \approx 1{,}550\) tokens/sec per GPU. At 4 bytes/token that is roughly \(12.7~\text{MB/s}\) of aggregate token data the loader has to deliver, every second, for two months, without ever leaving a GPU waiting. The raw bandwidth is modest; the hard part is that no individual GPU may ever stall on it.

The two numbers that govern everything. Overall retention \(\prod_j r_j\) sizes your offline cluster and your storage bill. Throughput \(\tau\) sizes your online loader, your shard format, and your data-locality strategy. Every other engineering choice in this chapter follows from one of these two equations.
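Both equations translate directly into a few lines of code. The sketch below is a minimal transcription under stated assumptions: the function names and the three-stage toy retention list are hypothetical, while the throughput call uses the DeepSeek-V3 figures quoted above.

```python
def surviving_bytes(raw_bytes, retention):
    """Return [B_1, ..., B_S]: bytes left after each stage of the funnel."""
    survivors, current = [], float(raw_bytes)
    for r in retention:
        if not 0.0 < r <= 1.0:
            raise ValueError(f"retention must be in (0, 1], got {r}")
        current *= r                  # B_i = B_{i-1} * r_i
        survivors.append(current)
    return survivors

def final_tokens(raw_bytes, retention, bytes_per_token):
    """N_tokens = B_S / b_t."""
    return surviving_bytes(raw_bytes, retention)[-1] / bytes_per_token

def loader_throughput(n_tokens, days, gpus):
    """Return (aggregate tokens/sec, per-GPU tokens/sec)."""
    tau = n_tokens / (days * 86_400)
    return tau, tau / gpus

# Toy funnel: 1 PB of raw bytes through three hypothetical stages.
toy_stages = surviving_bytes(1e15, [0.5, 0.4, 0.5])
toy_tokens = final_tokens(1e15, [0.5, 0.4, 0.5], bytes_per_token=4)

# Throughput contract for the numbers quoted in the text.
tau, tau_gpu = loader_throughput(14.8e12, days=54, gpus=2048)

print([f"{b:.2e}" for b in toy_stages], f"{toy_tokens:.2e} tokens")
print(f"tau = {tau:.3e} tokens/s, per GPU = {tau_gpu:.0f} tokens/s")
```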

Manual Numerical Walkthrough

Plug the DeepSeek-V3 numbers into both equations and follow them with pen and paper.

Funnel and throughput arithmetic for the 14.8T-token budget

Step 1 — Choose retention rates per stage. Realistic numbers from the public literature (RefinedWeb, Dolma, DeepSeek tech report):

  • Text extraction (HTML → text): r₁ = 0.50
  • Language ID (English + Chinese + code): r₂ = 0.40
  • URL / heuristic filter: r₃ = 0.50
  • Exact dedup: r₄ = 0.50
  • Near-dedup (MinHash LSH): r₅ = 0.50
  • Quality classifier: r₆ = 0.40
  • PII + safety: r₇ = 0.95
  • Tokenize + pack: r₈ = 1.00 (no further filtering, just encoding)

Step 2 — Product of retention.

∏ rⱼ = 0.50 · 0.40 · 0.50 · 0.50 · 0.50 · 0.40 · 0.95 · 1.00
= 0.0095 ≈ 0.95%

Roughly 1 in 105 raw bytes survives. That sets the raw-input budget.

Step 3 — Required raw input. Target tokens \(N = 14.8 \times 10^{12}\); bytes/token \(b_t = 4\); survivor bytes \(B_S = N \cdot b_t = 5.92 \times 10^{13}\) ≈ 59.2 TB.

B₀ = B_S / ∏ rⱼ = 5.92 × 10¹³ / 0.0095 ≈ 6.23 × 10¹⁵ bytes ≈ 6.23 PB

You need about 6 PB of raw crawl to manufacture 14.8T training-grade tokens (about 59 TB on disk). At roughly 400 TB of uncompressed content per CommonCrawl snapshot, that is on the order of fifteen snapshots, plus code and book corpora. Real teams provision 2–3× that as a safety margin because retention estimates always slip downward in production.

Step 4 — Offline cluster sizing. Assume the slowest stage runs at \(1{,}000\) documents/sec/CPU core (the quality classifier on a CPU-bound fastText model). With documents averaging ~10 KB, that is 10 MB/s/core. To process 6 PB in 14 calendar days:

cores_needed = 6 × 10¹⁵ B / (10 × 10⁶ B/s/core · 14 · 86400 s)
≈ 496 cores ≈ 8 high-core hosts of 64 cores each

Roughly 8 CPU hosts running for two weeks, under the simplifying assumption that the whole 6 PB flows through the slowest stage at that rate. That is the size of the offline pre-pass, and the bill that frontier labs hide behind phrases like "custom data pipeline".

Step 5 — Online throughput contract. Plug into the second equation:

τ = 14.8 × 10¹² / (54 · 86400) ≈ 3.17 × 10⁶ tokens/sec
τ_GPU = 3.17 × 10⁶ / 2048 ≈ 1,550 tokens/sec/GPU
aggregate read = τ · 4 B ≈ 12.7 MB/s

12.7 MB/s sustained over 54 days is roughly 59 TB of total reads: exactly one pass over the tokenized corpus (14.8T tokens × 4 B/token). A single NVMe SSD sustains ~3 GB/s, so raw bandwidth is not the constraint. What pushes the shards onto a striped NVMe pool or a distributed store (S3, Ceph, Lustre) is capacity and concurrency: roughly 59 TB read by thousands of workers at once.

Step 6 — Per-GPU loader budget. 1,550 tokens/sec = 6.2 KB/sec at 4 B/token. Per GPU the bandwidth is trivial. What is not trivial is the latency: every GPU step must have a batch in pinned host memory before the previous step finishes DMA-ing the last one. A 4096-context batch of micro-batch 2 is 65 KB; the DataLoader must produce it in less than the GPU's single training-step time, which on a 671B MoE is roughly 200 ms. So each CPU worker has roughly 200 ms of slack: long compared to memory copies, but easily consumed by a single cold read from remote object storage.

Step 7 — Sanity check against a tighter pipeline. Tighten near-dedup to r₅ = 0.35 and relax the quality classifier to r₆ = 0.50. ∏ rⱼ becomes 0.50 · 0.40 · 0.50 · 0.50 · 0.35 · 0.50 · 0.95 ≈ 0.0083, slightly lower than before. The required raw input grows to 7.1 PB, and the kept tokens contain fewer near-duplicates at the price of a more permissive quality cut. The pipeline designer chooses where on this quality-vs-volume frontier the project lands; the math gives them the price of every choice.
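The comparison in Step 7 is easy to reproduce. The following sketch simply re-runs the walkthrough's arithmetic for the two retention profiles listed above; the helper name `raw_bytes_needed` and the scenario labels are illustrative choices, not part of any real toolchain.

```python
from math import prod

TARGET_TOKENS = 14.8e12
BYTES_PER_TOKEN = 4

def raw_bytes_needed(target_tokens, bytes_per_token, retention):
    """B_0 = N * b_t / prod(r_j): raw input required to hit the token target."""
    return target_tokens * bytes_per_token / prod(retention)

# Per-stage retention rates r1..r8 from Step 1, and the Step 7 variant.
SCENARIOS = {
    "baseline": [0.50, 0.40, 0.50, 0.50, 0.50, 0.40, 0.95, 1.00],
    "step7":    [0.50, 0.40, 0.50, 0.50, 0.35, 0.50, 0.95, 1.00],
}

results = {}
for name, rates in SCENARIOS.items():
    petabytes = raw_bytes_needed(TARGET_TOKENS, BYTES_PER_TOKEN, rates) / 1e15
    results[name] = (prod(rates), petabytes)
    print(f"{name:8s} retention = {prod(rates):.4%}   raw input = {petabytes:.2f} PB")
```

Changing a single entry in either list shows how sensitive the raw-input budget is to one stage's retention.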

Visualizing the Pipeline

Each slice of the funnel below is one stage. The slice's left edge is the bytes entering the stage; its right edge is the bytes surviving. Drag the retention sliders on the right and watch the funnel collapse in real time. The output meter at the top right turns green when the surviving tokens cross the 14.8T target and red when they fall short.


Three things to try. First, click the "Lazy pipeline" preset. Retention is much higher per stage, the funnel barely narrows, and you exceed 14.8T tokens easily, but the "tokens" you keep are saturated with templated SEO and near-duplicates. This is the trap of skipping stages: the volume is fine, the quality is not. Second, switch to the DeepSeek-V3 default and drag the near-dedup slider to 0.20. Watch the output meter dip below target: with the walkthrough's other rates unchanged, overall retention falls to about 0.38% and you would need roughly 15.6 PB of raw input to recover. Aggressive filtering is cheap in compute and expensive in raw-input requirements. Third, increase the GPU count from 2048 to 4096 and watch per-GPU throughput halve. The loader has more slack per GPU but the aggregate bandwidth requirement still has to be met by your storage layer.

The geometry to remember. A pipeline is two narrowings stacked on top of each other. Vertically: each stage narrows by its retention rate. Horizontally: the entire funnel narrows from PB at the top to TB at the bottom over 14.8T tokens. Make the vertical narrowing too gentle and quality collapses. Make it too sharp and you cannot finish the offline pass before training starts. The art is in matching the two slopes.

Plain Python: A Composable Generator Pipeline

Before pulling in PyTorch, it is worth writing the offline pipeline as plain Python. The whole structure is generator composition: each stage is a function that consumes an iterator and yields an iterator. The same code runs on a single laptop with a 1 MB shard and, modulo distribution, on a 6 PB CommonCrawl dump.

offline_pipeline.py

Pipeline as a chain of iterators

Each stage is a Python generator. Generators are lazy: they pull one document at a time from upstream and push one document at a time downstream. The whole pipeline streams, so at no point does the full corpus live in memory. That is the only way the same code runs unchanged from a 1 MB toy shard to a 6 PB CommonCrawl dump.

Execution state: Doc = tuple[str, str]

Ingest: the source stage

Reads raw shards from disk. In production this is replaced by an S3 / GCS / HDFS reader, but the shape is identical: yield one (doc_id, text) per document. errors='ignore' is non-negotiable for web text, because CommonCrawl shards routinely contain malformed UTF-8 and a crashing reader would lose entire shards.

Language ID: a 60% cut on Day 1

Crude proxy: the fraction of ASCII characters. A real pipeline uses fastText's lid.176 (Joulin et al. 2017), a ~125 MB classifier that labels 176 languages at ~30k docs/sec per core. Even a target like 'English + Chinese + code' typically keeps only 30–40% of raw CommonCrawl, because the remainder is spread across the many other languages in the crawl (Russian, Spanish, Indonesian, Arabic, and so on).

Execution state: ascii_frac threshold = 0.9

Heuristic filter: kill the spam

Two cheap rules catch most low-effort web text: minimum length (200 chars) and word-uniqueness (≥30%). A real pipeline adds dozens more: Gopher rules (Rae et al. 2022) check mean line length, fraction of bullet-prefixed lines, ratio of digits, fraction of ellipses. The pattern is always the same: cheap per-document predicates applied in parallel.

Execution state: min_chars = 200, min_uniq_frac = 0.30

Exact dedup: SHA-1 the document

Hash each doc and drop hashes we have already seen. SHA-1 is enough, since collisions are astronomically unlikely at corpus scale. This single stage removes 30–50% of web text, because shards from different months copy the same articles. The set 'seen' is the only stateful piece in the whole pipeline so far; at PB scale it spills to a Bloom filter or a sharded RocksDB.

Execution state: hash_bits = 160 (SHA-1)

Tokenize: from characters to model units

Real pipelines run a trained BPE / SentencePiece tokenizer here (Chapter 3 builds one from scratch). We use a whitespace stand-in so the demo runs without dependencies. The crucial output is an integer count: every downstream calculation (token budget, FLOP estimate, shard size) is denominated in tokens, not characters.

Composition is the whole architecture

The 'DAG' is just function composition. tokenize_count(exact_dedup(...(ingest(...)))) reads like the math: each function consumes one stage's output as its input. Production pipelines (Dolma, Dataverse, NeMo Curator) add scheduling, retries, and distributed shuffles, but the dataflow is exactly this chain.

Measuring retention is built-in

Counting docs_kept and tokens_kept at the end of the chain gives the empirical retention rate without any new code. Run the same loop after each stage and you get a per-stage retention curve, which is the funnel the visualizer above renders. At PB scale this counter is what tells you whether you have enough raw input to reach 14.8T tokens.

Execution state: docs_kept = running count, tokens_kept = running count
```python
# offline_pipeline.py
from __future__ import annotations

import hashlib
import re
from collections.abc import Iterator

# A tiny composable pretraining pipeline. Every stage is a generator that
# consumes (doc_id, text) tuples and yields the survivors. The whole
# pipeline is functional composition: no framework, no DAG engine.

Doc = tuple[str, str]


def ingest(paths: list[str]) -> Iterator[Doc]:
    """Stage 1: read raw shards from disk, one document per line."""
    for p in paths:
        # errors="ignore" drops malformed UTF-8 bytes instead of crashing.
        with open(p, encoding="utf-8", errors="ignore") as f:
            for i, line in enumerate(f):
                yield f"{p}:{i}", line.strip()


def language_filter(docs: Iterator[Doc]) -> Iterator[Doc]:
    """Stage 2: crude language guess, keep ASCII-heavy English."""
    for did, text in docs:
        if not text:
            continue
        ascii_frac = sum(c.isascii() for c in text) / max(1, len(text))
        if ascii_frac > 0.9:
            yield did, text


def heuristic_filter(docs: Iterator[Doc]) -> Iterator[Doc]:
    """Stage 3: drop too-short or highly repetitive docs (SEO spam)."""
    for did, text in docs:
        if len(text) < 200:
            continue
        words = text.split()
        uniq = len(set(words)) / max(1, len(words))
        if uniq < 0.30:
            continue
        yield did, text


def exact_dedup(docs: Iterator[Doc]) -> Iterator[Doc]:
    """Stage 4: drop bit-identical documents seen earlier in the stream."""
    seen: set[str] = set()   # the only state that grows with corpus size
    for did, text in docs:
        h = hashlib.sha1(text.encode()).hexdigest()
        if h in seen:
            continue
        seen.add(h)
        yield did, text


def tokenize_count(docs: Iterator[Doc]) -> Iterator[tuple[str, int]]:
    """Stage 5: stand-in for a real BPE tokenizer."""
    for did, text in docs:
        n = len(re.findall(r"\S+", text))   # whitespace token count
        yield did, n


def pipeline(paths: list[str]) -> Iterator[tuple[str, int]]:
    """Compose the stages; nothing runs until the result is iterated."""
    return tokenize_count(
        exact_dedup(
            heuristic_filter(
                language_filter(
                    ingest(paths)))))


if __name__ == "__main__":
    paths = ["data/crawl_shard_001.txt"]
    docs_kept, tokens_kept = 0, 0
    for did, n in pipeline(paths):
        docs_kept += 1
        tokens_kept += n
    print(f"docs kept   = {docs_kept}")
    print(f"tokens kept = {tokens_kept}")
```

Three architectural points are worth marking. First, nothing in this pipeline materializes a list. Every stage is lazy. That is what makes it scale: apart from the dedup hash set, peak memory is bounded by the per-document size, not the corpus size. Second, the order of stages matters economically. Language ID and exact dedup both throw away half their input but cost only tens of microseconds per doc; a perplexity-based quality model costs ~1 ms per doc. Putting the cheap filters first amortizes the expensive ones, which is why every production pipeline from Dolma to NeMo Curator runs the cheap stages early. Third, the "DAG" is just function composition. Production frameworks add scheduling, retries, sharded shuffles, and provenance tracking, but every one of them ultimately compiles down to a chain of operators identical in shape to the chain above.
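Per-stage retention can be measured without touching the stages themselves by threading a counting wrapper between them. The sketch below is one possible way to do it; `counted`, `drop_short`, and `drop_duplicates` are hypothetical helpers on a toy in-memory input, not functions from the pipeline above.

```python
from collections import Counter
from collections.abc import Iterable, Iterator

def counted(name: str, items: Iterable, counts: Counter) -> Iterator:
    """Pass items through unchanged while counting how many flow past."""
    for item in items:
        counts[name] += 1
        yield item

# Toy stand-ins for real filter stages.
def drop_short(docs: Iterable[str], min_chars: int = 5) -> Iterator[str]:
    return (d for d in docs if len(d) >= min_chars)

def drop_duplicates(docs: Iterable[str]) -> Iterator[str]:
    seen = set()
    for d in docs:
        if d not in seen:
            seen.add(d)
            yield d

counts = Counter()
raw = ["hello world", "hi", "hello world", "another document", ""]

stream = counted("raw", raw, counts)
stream = counted("after_length", drop_short(stream), counts)
stream = counted("after_dedup", drop_duplicates(stream), counts)
survivors = list(stream)   # draining is fine here because the toy input is tiny

# Retention of each stage = docs leaving it / docs entering it.
retention = {
    "length": counts["after_length"] / counts["raw"],
    "dedup": counts["after_dedup"] / counts["after_length"],
}
print(dict(counts), retention)
```

Because the wrapper is itself a generator, the pipeline stays lazy; the counters are only complete once the stream has been fully consumed.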

Where this code grows up. The same five-function skeleton (ingest, filter, dedup, score, tokenize) appears in every published pretraining pipeline. RedPajama-Data, Dolma, FineWeb, NeMo Curator, and presumably DeepSeek-V3's internal toolchain are all variations on this theme with industrial scheduling on top.

PyTorch: Sharded IterableDataset and the Train Loader

Once the offline pipeline has produced tokenized shards on disk, the online half takes over. Its only job is to stream those shards into the GPU at line rate, without ever replaying tokens and without ever stalling. PyTorch's IterableDataset + DataLoader is the canonical implementation. The complexity is not in the iterator body — it is in the sharding logic that ensures each token reaches exactly one GPU per epoch.

train_loader.py

Why IterableDataset, not Dataset

Map-style Dataset requires random access: every shard must be open and seekable, and the sampler must build an index over the entire corpus. At 14.8T tokens and 4096-token sequences that is about 3.6 billion entries, tens of gigabytes of index in every process before a single token is read. IterableDataset is the practical shape: each worker holds a streaming cursor over a small set of shards, never the whole corpus.

Constructor: a list of shard paths and a seed

shard_paths is a list of pre-tokenized binary files, typically 100 MB–1 GB each, encoded as uint32 (room for a 4-billion-entry vocabulary). seq_len is the model's context window (4096 for DeepSeek-V3). seed is per-run, not per-worker, so the global shuffle is reproducible across restarts.

Execution state: seq_len = 4096, shard_paths.len = 8192 (~1 TB tokenized)

Sharding across ranks AND workers

Two partitions stacked. First by DDP rank so each GPU sees a different slice of the corpus. Second by DataLoader worker id, so each rank's 4 CPU workers each see 1/4 of that slice. This is the canonical pattern: outer-mod-world picks the rank's portion, inner-mod-num_workers picks the worker's. Get it wrong and you replay tokens, the silent killer of pretraining quality.

Execution state: rank = 0..2047, world_size = 2048, num_workers = 4

Per-worker epoch shuffle

Each (rank, worker) reshuffles ITS OWN shard list with a deterministic seed. We never shuffle individual tokens, since that would defeat any sequential structure in long documents, but we shuffle which shard each worker grabs next. Globally, the combination of rank-mod + worker-mod + per-worker shuffle gives every shard to exactly one worker, in a reproducible shuffled order, with no duplicates and no gaps.

Execution state: seed expression = seed + rank*1009 + wid

memmap: zero-copy mmap of the shard

np.memmap maps the file into virtual memory without loading it. The OS pages in tokens on demand, which means a 1 GB shard does NOT cost 1 GB of RAM, as long as the loader slices windows out of the mapping rather than converting the whole array to a Python list. At 8192 shards × 1 GB this is the only realistic way to read the corpus on commodity hosts.

Execution state: dtype = uint32 (4 bytes/token)

Pack-and-yield: fixed-length sequences

We pull seq_len+1 tokens (need +1 because the label is the shifted version of the input) and emit a single (x, y) pair. Then we advance by exactly seq_len, leaving zero tokens unused. This is 'packed' training: no PAD tokens, no wasted positions. At 4096-context training this is a ~6% throughput win over padded batches.

Execution state: x.shape = (4096,), y.shape = (4096,)

DataLoader: the glue between Python and GPU

num_workers=4 forks 4 CPU processes per GPU, each running its own iterator. pin_memory=True allocates the output tensors in pinned host memory so the H2D copy becomes a single DMA call. persistent_workers=True keeps the worker processes alive across iterations; without it, every epoch boundary spawns new processes and re-mmaps every shard.

Execution state: prefetch_factor = 4 (4 batches queued per worker)

The training loop is trivial, by design

All the complexity is upstream. By the time the model sees a batch, every token has already gone through ingest, language ID, heuristic filter, two dedup passes, quality classifier, PII removal, BPE encoding, packing, sharding, and prefetch. The loop body is the same five lines you'd write for MNIST. That separation is exactly the point of a pipeline.
```python
# train_loader.py
import random

import numpy as np
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader, IterableDataset, get_worker_info


class PackedShardDataset(IterableDataset):
    """
    Stream pre-tokenized .bin shards (uint32 arrays of token IDs) and pack
    them into fixed-length sequences. Each (rank, worker) sees a disjoint
    slice of shards, so the model sees every token exactly once per epoch.
    """

    def __init__(self, shard_paths: list[str], seq_len: int, seed: int):
        super().__init__()
        self.shard_paths = sorted(shard_paths)   # identical order on every rank
        self.seq_len = seq_len
        self.seed = seed

    def _my_shards(self) -> list[str]:
        rank = dist.get_rank() if dist.is_initialized() else 0
        world = dist.get_world_size() if dist.is_initialized() else 1
        info = get_worker_info()                 # None in the main process
        wid = info.id if info is not None else 0
        nw = info.num_workers if info is not None else 1
        # Two-level partition: across DDP ranks, then across DataLoader workers.
        mine = [
            p for i, p in enumerate(self.shard_paths)
            if i % world == rank and (i // world) % nw == wid
        ]
        # Deterministic per-(rank, worker) shuffle so restarts reproduce the order.
        random.Random(self.seed + rank * 1009 + wid).shuffle(mine)
        return mine

    @staticmethod
    def _to_pair(window: np.ndarray) -> tuple[torch.Tensor, torch.Tensor]:
        # Copy only this window out of the mmap and widen it to int64.
        t = torch.from_numpy(window.astype(np.int64))
        return t[:-1], t[1:]                     # inputs, labels shifted by one

    def __iter__(self):
        T = self.seq_len
        carry = np.empty(0, dtype=np.uint32)     # tail of the previous shard (<= T tokens)
        for shard in self._my_shards():
            # Map the shard; pages are read on demand, never the whole file at once.
            arr = np.memmap(shard, dtype=np.uint32, mode="r")
            pos = 0
            if carry.size:
                # Finish the window that straddles the shard boundary.
                n = min(arr.size, T + 1 - carry.size)
                window = np.concatenate([carry, arr[:n]])
                if window.size < T + 1:          # shard too short to finish it
                    carry = window
                    continue
                yield self._to_pair(window)
                pos = n - 1                      # next window starts on this one's last token
            while pos + T + 1 <= arr.size:
                yield self._to_pair(arr[pos : pos + T + 1])
                pos += T                         # advance by seq_len
            carry = np.array(arr[pos:])          # small copy of the unread tail


if __name__ == "__main__":
    ds = PackedShardDataset(
        shard_paths=[f"/data/shards/shard_{i:05d}.bin" for i in range(8192)],
        seq_len=4096,
        seed=20250517,
    )
    loader = DataLoader(
        ds,
        batch_size=2,             # per-GPU micro-batch
        num_workers=4,            # CPU workers per GPU
        pin_memory=True,
        persistent_workers=True,
        prefetch_factor=4,
    )
    for step, (x, y) in enumerate(loader):
        # x.shape == y.shape == (B, T): packed, sharded, shuffled. Train as usual.
        pass
```

Two subtleties recur in every real loader and are worth surfacing:

  1. Token-level shuffling is wrong. If you naïvely shuffle across token boundaries, you destroy long-range structure within documents, which is the very signal a 4096-context transformer is supposed to model. The shuffle granularity is the shard, sometimes the document, never the token. The packed-sequence loader above preserves the original token order within each shard and only shuffles the order in which shards are streamed.
  2. Resumability is part of the loader, not the trainer. When training crashes (and at 54 days of wall-clock, it will), you must resume from the last token boundary, not the last batch. Production loaders persist a (shard_index, byte_offset) pair per (rank, worker) into the checkpoint and seek to exactly that position on restart. Get this wrong and you replay or skip millions of tokens silently, which surfaces as mysterious loss spikes weeks later.
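The resumability point can be illustrated with a cursor that knows how to save and restore its own position. This is a minimal sketch under simplifying assumptions: `ResumableStream` is a hypothetical class, shards are in-memory NumPy arrays rather than files, the offset is counted in tokens rather than bytes, and leftover tokens at the end of a shard are dropped instead of carried over.

```python
import numpy as np

class ResumableStream:
    """Yield fixed-length windows from a list of token arrays, with a saveable cursor."""

    def __init__(self, shards, seq_len):
        self.shards = shards
        self.seq_len = seq_len
        self.shard_index = 0
        self.offset = 0   # token offset inside the current shard

    def state_dict(self):
        return {"shard_index": self.shard_index, "offset": self.offset}

    def load_state_dict(self, state):
        self.shard_index = state["shard_index"]
        self.offset = state["offset"]

    def __iter__(self):
        while self.shard_index < len(self.shards):
            arr = self.shards[self.shard_index]
            while self.offset + self.seq_len <= len(arr):
                window = arr[self.offset : self.offset + self.seq_len]
                # Advance before yielding so a saved state points at the NEXT window.
                self.offset += self.seq_len
                yield window
            self.shard_index += 1
            self.offset = 0

shards = [np.arange(10), np.arange(100, 107)]

# Uninterrupted run.
full = [w.tolist() for w in ResumableStream(shards, seq_len=4)]

# Interrupted run: consume one window, checkpoint, then resume in a fresh object.
first = ResumableStream(shards, seq_len=4)
head = next(iter(first)).tolist()
checkpoint = first.state_dict()

resumed = ResumableStream(shards, seq_len=4)
resumed.load_state_dict(checkpoint)
tail = [w.tolist() for w in resumed]
print(full == [head] + tail)
```

A real loader has one more wrinkle this sketch ignores: with prefetching workers, the cursor runs ahead of what the trainer has actually consumed, so the checkpointed position has to be the one belonging to the last batch the model saw.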
What the next sections build on top of this. 8.2 covers deduplication, the single highest-leverage stage in the funnel — it is also the first stage where the offline cluster turns from CPU-bound to I/O-bound and the algorithmic choices (MinHash vs SimHash vs exact) start to dominate the budget. 8.3 covers quality filtering and the classifier you slot in between dedup and tokenization. 8.4 covers domain mixing, which determines how the surviving tokens are weighted on the way to the model. The funnel structure stays the same; each section replaces a slice of it with the engineering DeepSeek-V3 actually shipped.

What Changes at Massive Scale

At toy scale the pipeline is a Python script that runs in twenty minutes. At 14.8T-token scale it is its own distributed system, with operational constraints that look more like a database than a script. Four things change as the corpus grows.

The offline stage becomes a job scheduler

Past about 100 TB, no single host can store the working set. Stages become Spark / Ray / Beam jobs with explicit input and output shard manifests; intermediate results are written to object storage and consumed by the next stage. Provenance tracking — for each surviving token, which raw URL did it come from? — becomes a hard requirement for contamination detection (Section 8.7). The pipeline stops being a program and starts being a workflow.

Dedup becomes the bottleneck

Exact dedup over 6 PB needs roughly \(6 \times 10^{11}\) document hashes in a single set. That does not fit in memory on any host. The fix is sorted external merge or a sharded RocksDB / Bloom-filter approach. Near-dedup is worse: MinHash LSH over a billion documents requires a permutation-based signature for each doc and a multi-pass band-collision search. Section 8.2 spends its entire length on this stage because, at frontier scale, dedup typically costs more CPU than every other stage combined.
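To give a feel for what a "permutation-based signature" and a "band" are, here is a small pure-Python MinHash sketch. It is an illustration rather than a production design: salted BLAKE2b hashes stand in for random permutations, and the shingle size, signature length, and band count are arbitrary assumed values.

```python
import hashlib

def shingles(text: str, k: int = 3) -> set[str]:
    """Overlapping k-word shingles; a very short text yields a single shingle."""
    words = text.split()
    return {" ".join(words[i:i + k]) for i in range(max(1, len(words) - k + 1))}

def _hash64(item: str, seed: int) -> int:
    # A salted hash plays the role of one random permutation of the shingle universe.
    digest = hashlib.blake2b(item.encode("utf-8"), digest_size=8,
                             salt=seed.to_bytes(8, "little")).digest()
    return int.from_bytes(digest, "big")

def minhash_signature(items: set[str], num_perm: int = 64) -> list[int]:
    """For each 'permutation', keep the smallest hash over the document's shingles."""
    return [min(_hash64(s, seed) for s in items) for seed in range(num_perm)]

def estimate_jaccard(sig_a: list[int], sig_b: list[int]) -> float:
    """Fraction of matching signature slots estimates the Jaccard similarity."""
    return sum(a == b for a, b in zip(sig_a, sig_b)) / len(sig_a)

def band_keys(sig: list[int], bands: int = 16) -> list[tuple]:
    """Split the signature into bands; docs sharing any band key become candidates."""
    rows = len(sig) // bands
    return [(b, tuple(sig[b * rows:(b + 1) * rows])) for b in range(bands)]

doc_a = "the quick brown fox jumps over the lazy dog near the river bank today"
doc_b = "the quick brown fox jumps over the lazy dog near the river bank yesterday"
doc_c = "completely unrelated text about training large language models on web data"

sig_a, sig_b, sig_c = (minhash_signature(shingles(d)) for d in (doc_a, doc_b, doc_c))
est_same = estimate_jaccard(sig_a, sig_a)
est_near = estimate_jaccard(sig_a, sig_b)
est_far = estimate_jaccard(sig_a, sig_c)
print(est_same, est_near, est_far)
```

In an LSH pass, each band key would be used as a bucket key, so only documents that collide in at least one bucket are ever compared pairwise.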

The online loader becomes I/O-bound, not CPU-bound

At 2048 GPUs the aggregate token stream is only ~12.7 MB/s at 4 bytes/token, far below the ~3 GB/s a single NVMe SSD can sustain. The pressure is therefore on latency and concurrency rather than raw bandwidth: thousands of workers opening shards at once, against an object store whose first-byte latency can exceed the step time. The loader's architecture splits: hot shards live on per-host NVMe (read locally by the GPU's own workers), cold shards live in object storage and are prefetched ~10 minutes ahead of consumption. Some teams add a CDN-style cache tier in between. The DataLoader code looks unchanged; the storage layer underneath becomes a multi-tier system whose design decides whether the GPUs ever wait on data.

Failure isolation becomes a design goal

At 2048 GPUs, you will lose roughly one GPU per day to ECC errors, cable flaps, or NCCL deadlocks. The loader must not lose its place when its host dies. The fix is deterministic per-shard ordering plus a step-counter checkpoint: any restarted rank can reconstruct exactly which token it was about to read by replaying its sharding function with the same seed. The Python code in the previous section already bakes in the deterministic ordering (seed + rank*1009 + wid); the step counter has to come from the checkpoint. Without both, every node failure costs hours of re-shuffle and risks replaying tokens.
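Both properties, determinism and exactly-once coverage, are cheap to check in isolation. The sketch below lifts the partition rule used by PackedShardDataset earlier in this section into a pure function and tests it; `shards_for` and `check_partition` are hypothetical names, and the shard names and cluster sizes are toy values.

```python
import random

def shards_for(shard_paths, rank, world, wid, num_workers, seed):
    """Pure-function version of the two-level rank/worker partition plus shuffle."""
    mine = [
        p for i, p in enumerate(sorted(shard_paths))
        if i % world == rank and (i // world) % num_workers == wid
    ]
    random.Random(seed + rank * 1009 + wid).shuffle(mine)
    return mine

def check_partition(shard_paths, world, num_workers, seed):
    """Raise AssertionError if any shard is replayed, skipped, or reordered on replay."""
    assigned = []
    for rank in range(world):
        for wid in range(num_workers):
            first = shards_for(shard_paths, rank, world, wid, num_workers, seed)
            replay = shards_for(shard_paths, rank, world, wid, num_workers, seed)
            assert first == replay, "restart would see a different shard order"
            assigned.extend(first)
    assert len(assigned) == len(set(assigned)), "a shard was assigned twice"
    assert sorted(assigned) == sorted(shard_paths), "a shard was never assigned"
    return len(assigned)

paths = [f"shard_{i:05d}.bin" for i in range(64)]
covered_even = check_partition(paths, world=4, num_workers=4, seed=20250517)
# Also exercise a shard count that does not divide evenly.
covered_ragged = check_partition(paths[:10], world=4, num_workers=2, seed=7)
print(covered_even, covered_ragged)
```

A check like this belongs in the unit tests of any loader, since a replayed shard produces no error at training time.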

| Scale | Pipeline character | Dominant cost |
| --- | --- | --- |
| 1 GB toy | single Python script | developer time |
| 1 TB academic | single host with multiprocessing | tokenizer CPU |
| 100 TB lab | Spark / Ray cluster | exact dedup memory |
| 1 PB+ frontier | multi-stage DAG + object store + provenance DB | near-dedup + classifier |
| 6 PB DeepSeek-V3 | offline cluster + online tiered loader + checkpoint-aware sharding | throughput SLA |

The headline. What scales from a script to a system is not the code; it is the operational guarantees the code is asked to uphold: exactly-once delivery, deterministic restart, provenance traceability, throughput SLA. The pipeline becomes whatever combination of compute, storage, and scheduling is required to uphold those guarantees over a 54-day window.

Engineering Reality: Where Pipelines Actually Break

Three failure modes account for the majority of real-world pipeline incidents at frontier scale. Each of them is invisible in the math and only becomes obvious in the engineering.

  1. Silent token replay. A bug in the rank-/worker-mod sharding sends the same shard to two workers. The model still trains, the loss still drops, but it drops on a corpus that is effectively 1.x epochs of the same data — equivalent to training on 30–50% less unique text than the spec promised. The only diagnostic is a per-shard usage counter compared against an expectation; the fix is unit tests on the sharding function and reproducibility checks across restarts.
  2. Quality drift across the run. If the offline pipeline produces shards in the order they were processed (cheap first, expensive later), then shard ii has systematically different quality than shard i+ki + k. Without a global shuffle, the model sees easy text early and hard text late, which interacts badly with the learning-rate schedule (Chapter 11). The fix is a corpus-wide shuffle done once at the end of the offline pipeline, before shards are written to their final paths.
  3. Loader stalls disguised as training stalls. When the loader cannot keep up — usually because cold shards have not been prefetched in time — GPUs idle for a few hundred milliseconds per step. NCCL allreduces still complete, the train loop still ticks, but MFU drops from 40% to 30%. The diagnosis is to add per-step loader latency telemetry; the fix is to widen the prefetch window or warm the storage cache. This is the most common reason a model team mistakes a data problem for an optimizer problem.
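Per-step loader latency telemetry, the diagnostic named in the third item, can be as small as a wrapper around the batch iterator. The sketch below is one assumed way to do it: `timed_batches` is a hypothetical helper, and the slow generator merely simulates a cold-shard stall.

```python
from __future__ import annotations

import time
from collections.abc import Iterable, Iterator

def timed_batches(loader: Iterable, wait_log: list[float]) -> Iterator:
    """Yield batches while recording how long the consumer waited for each one."""
    it = iter(loader)
    while True:
        start = time.perf_counter()
        try:
            batch = next(it)              # time spent blocked on the loader
        except StopIteration:
            return
        wait_log.append(time.perf_counter() - start)
        yield batch

def slow_source():
    """Toy loader: the third batch arrives late, like an un-prefetched shard."""
    for i in range(4):
        if i == 2:
            time.sleep(0.05)
        yield i

waits: list[float] = []
batches = list(timed_batches(slow_source(), waits))

worst_step = waits.index(max(waits))
print(f"worst wait: step {worst_step}, {max(waits) * 1000:.1f} ms")
```

Logging these waits next to step time makes it visible when a drop in utilization comes from the data path rather than from the model or the optimizer.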

One sentence to carry forward into the rest of the chapter: the pipeline is a contract between the offline retention math and the online throughput math, and every subsequent design choice — dedup algorithm, quality classifier, mixing weights, curriculum, contamination check — is a renegotiation of one side of that contract. The remaining sections of Chapter 8 walk through each renegotiation in order.

Where we go from here. Section 8.2 takes apart deduplication (exact, MinHash, SimHash) and shows why it is both the most expensive stage and the one with the largest quality impact. Section 8.3 builds the quality classifier that sits between dedup and tokenization. Section 8.4 turns surviving tokens into a weighted domain mixture. By Section 8.7 we have a complete, contamination-aware pipeline sized to produce a 14.8T-token corpus like the one DeepSeek-V3 was trained on. The funnel never goes away; we are filling in its slices.