Chapter 11
28 min read
Section 61 of 117

Data Parallelism (DP)

Distributed Training: DualPipe and the Parallelism Stack

The Real Problem: One Batch, Too Slow

Section 11.1 made the case that a single GPU cannot hold a frontier model. But before any of the more exotic parallelism schemes — tensor, pipeline, expert, sequence — there is a simpler problem that data parallelism solves, and that you will meet on the very first day you own more than one GPU.

SGD's gradient is an average. With a batch of BB examples and loss Li(θ)\mathcal{L}_i(\theta) on each example, one step of gradient descent is:

θθη1Bi=1BLi(θ).\theta \leftarrow \theta - \eta \cdot \frac{1}{B} \sum_{i=1}^{B} \nabla \mathcal{L}_i(\theta).

The sum is embarrassingly parallel — you can compute any partition of those BB terms anywhere and the result is the same as long as you add them at the end. So the moment you have a second GPU sitting next to the first, the obvious speed-up writes itself: give half the batch to each GPU, run forward and backward on the local half, then add the two gradient contributions and apply one shared update.

That idea — same model, sharded data, summed gradients — is data parallelism. It is the single most-deployed parallelism strategy in deep learning: the one you reach for first, the one every other parallelism scheme composes on top of, the one that makes 8-GPU nodes and 8 000-GPU clusters look architecturally similar from Python's point of view. And it is the reason a frontier run can process tens of millions of tokens per second instead of tens of thousands.

ScaleGoalBottleneckWhy DP is the right hammer
1 GPU → 8 GPUsFaster iteration on prototypesWall-clock per stepLinear speed-up on tiny code change
8 → 64 GPUsLarger global batch, stable LR scalingGradient AllReduce volumeStill dominant — comm < compute on NVLink + IB
64 → 1024 GPUsPre-training a 7–70 B modelBandwidth × N grows; weights barely fitDP + ZeRO-1/2 stays competitive
1024 → 8192 GPUsPre-training a 200+ B MoEAllReduce ≫ compute, weights don't fitDP alone breaks — needs FSDP/ZeRO-3 + TP/PP/EP
The deceptively simple invariant. Every rank must hold the same model parameters at every step. Break that invariant — even with a single mistimed all-reduce — and the four ranks start drifting, each computing the right gradient for a slightly different model. Training quality collapses within a few hundred steps and is almost impossible to debug from the loss curve alone. The whole engineering ritual of DP is about defending this invariant in the presence of network jitter, partial failures, and non-deterministic kernels.

Intuition: Many Workers, One Identical Model

Picture a workshop. You are training a model — call it θ\theta — and the training set is a giant bin of examples. With one worker, training is sequential: pick a handful of examples, compute the average gradient, nudge θ\theta, repeat.

Hire eight workers. Give each of them the same copy of the current model and a different handful of examples. Each worker computes their local mean gradient. They are now staring at eight different vectors pointing in eight slightly different directions — same model, different data, different gradients.

Here is the only subtle part of DP: the eight workers cannot just each apply their own gradient and walk away. If they did, they would end the step with eight different models and the workshop would split into eight uncoordinated trainings. So they pause. They announce their gradient vectors to each other. They sum them and divide by eight. Now every worker holds the same averaged direction. Every worker takes the same step. The eight copies of the model are once again identical, ready for the next iteration.

Three intuitive consequences flow from this picture:

  1. DP is mathematically equivalent to single-GPU SGD on a bigger batch. Eight workers each at batch 8 is — at the gradient level — identical to one worker at batch 64. The reason this matters: you can predict what DP will do, in terms of loss curve and learning-rate sensitivity, by treating it as a batch scale-up.
  2. The sync step is the bill. Forward and backward are perfectly parallel — N workers, N× the throughput, no coordination needed. The only thing that costs you is the gradient-averaging round, and that cost grows with the number of parameters in the model, not the number of examples in the batch. That is why a 70 B model on 1024 GPUs is communication-bound while a 70 M model on 1024 GPUs is compute-bound.
  3. You cannot use DP to fit a bigger model. Every worker holds the entire model. Eight workers means eight identical copies of θ\theta. DP buys you more throughput, not more capacity. The moment your model stops fitting on one GPU, you need ZeRO/FSDP, tensor parallelism, or pipeline parallelism — DP alone will not save you. Sections 11.3–11.7 are the answer to that.
Synchronous vs. asynchronous DP. The picture above is synchronous DP — everyone waits for the average. There is a classic alternative, async DP (Hogwild!, parameter server), where workers shove gradients into a central store without waiting. It was popular in the GPT-2 era for tolerating stragglers but has been almost entirely abandoned at frontier scale: the noise from stale gradients hurts convergence, modern collectives are fast enough that the straggler problem is small, and reproducibility (essential for debugging a 50-day run) is much worse. Every frontier run since 2020 — DeepSeek-V3, LLaMA-3, GPT-4 — is synchronous DP.

The Math: Mini-Batch SGD Across N Workers

Let the global mini-batch be B={(xi,yi)}i=1B\mathcal{B} = \{(x_i, y_i)\}_{i=1}^{B} and partition it into NN disjoint shards Br\mathcal{B}_r of size B/NB/N each, one per rank. The single-GPU gradient at parameter θ\theta is:

g(θ)=1Bi=1BθL(xi,yi;θ).g(\theta) = \frac{1}{B} \sum_{i=1}^{B} \nabla_\theta \mathcal{L}(x_i, y_i; \theta).

Rewrite the sum as a sum-over-ranks:

g(θ)=1Br=1N(xi,yi)BrθL(xi,yi;θ)=1Nr=1Ngr(θ),g(\theta) = \frac{1}{B} \sum_{r=1}^{N} \sum_{(x_i, y_i) \in \mathcal{B}_r} \nabla_\theta \mathcal{L}(x_i, y_i; \theta) = \frac{1}{N} \sum_{r=1}^{N} g_r(\theta),

where gr(θ)=NB(xi,yi)BrθL(xi,yi;θ)g_r(\theta) = \frac{N}{B} \sum_{(x_i, y_i) \in \mathcal{B}_r} \nabla_\theta \mathcal{L}(x_i, y_i; \theta) is the local mean gradient computed on rank rr. This identity is the entire mathematical content of data parallelism. The global gradient is the average of the local gradients provided each rank divides by its own shard size (so that grg_r is already a mean, not a sum).

The SGD update on every rank is then:

θ(t+1)=θ(t)η1Nr=1Ngr(θ(t)).\theta^{(t+1)} = \theta^{(t)} - \eta \cdot \frac{1}{N} \sum_{r=1}^{N} g_r(\theta^{(t)}).

Three engineering consequences fall out of these two lines:

  1. Synchrony is required. Every grg_r is evaluated at the same θ(t)\theta^{(t)}. If rank 3 is one step behind, its g3g_3 is L(;θ(t1))\nabla \mathcal{L}(\cdot; \theta^{(t-1)}) — a gradient for the wrong model. Async DP accepts this bias for throughput; sync DP refuses.
  2. The communication primitive is sum, not max or min. We need rgr\sum_r g_r. Then we divide locally by NN. NCCL and other collective libraries expose this as AllReduce(SUM) — every rank ends up holding the same sum, no central server, no leader election.
  3. The learning rate must scale. Replacing batch B/NB/N with batch BB reduces gradient variance by N\sqrt{N}. The empirical rule (Goyal et al. 2017) is linear LR scaling: multiply η\eta by NN, then warm it up linearly for the first ~5 % of steps to keep early training stable. Frontier runs (LLaMA, DeepSeek-V3) tune this empirically — the linear rule is the prior, not the answer.
The bias trap. If your shards have different sizes (last rank gets fewer examples because of BmodN0B \bmod N \neq 0), the simple (1/N)gr(1/N) \sum g_r formula gives the wrong answer. Either use drop_last=True on the DistributedSampler so every shard is identical, or weight each gradient by its shard size before averaging. The former is what every production codebase does — uniform shards keep all-reduce sizes uniform too, which keeps NCCL happy.

AllReduce: The Sync Primitive That Makes DP Work

The math says we need rgr\sum_r g_r, identical on every rank, in as few bytes per rank as possible. There are three natural ways to implement it; only one of them scales.

Naive parameter server: O(N)O(N) bytes on the server

Pick one rank as the server. Every other rank sends its gradient to the server, the server adds them up, then broadcasts the sum back. Total bytes through the server: about 2(N1)g2 (N-1) |g|. The server is a hot spot — every byte of every gradient flows through it, both in and out. Network and CPU at the server saturate the moment NN is more than a handful. This is what DistBelief used in 2012; nobody uses it for synchronous frontier training today.

Tree all-reduce: O(logN)O(\log N) latency

Arrange the ranks in a binary tree. Sum up the tree, broadcast down. Total bytes per rank: 2glog2N2 |g| \log_2 N for small messages, but each link is sequential — the cost is dominated by latency, not bandwidth. Good for small payloads (latencies of microseconds), bad for whole-model gradient buffers.

Ring all-reduce: 2(N1)/Ng2(N-1)/N \cdot |g| bytes per rank

This is the algorithm every modern DP framework uses, because it has a remarkable property: the bytes each rank must send and receive approach a constant (about 2g2 |g|) as NN grows. Doubling the cluster does not double the traffic per GPU; it barely moves it. The algorithm has two phases, each running for N1N - 1 rounds:

  1. Scatter-reduce. Split the gradient buffer on every rank into NN equal chunks. In round ss, rank rr sends chunk index (rs)modN(r - s) \bmod N to rank r+1r+1, which adds it onto its own copy of that chunk. After N1N - 1 rounds, rank rr owns the full sum of exactly one chunk. Bytes per rank: (N1)/Ng(N-1)/N \cdot |g|.
  2. All-gather. Same ring, no addition — each fully-summed chunk walks N1N - 1 more hops, overwriting each rank's stale copy. After another N1N - 1 rounds, every rank holds every fully-summed chunk. Bytes per rank: another (N1)/Ng(N-1)/N \cdot |g|.

Total bytes per rank: Bring=2(N1)NgN2g.B_{\text{ring}} = \frac{2(N-1)}{N} \cdot |g| \xrightarrow{N \to \infty} 2 |g|. Compare with the naive PS at 2(N1)g2(N-1) |g| on a single link. For N=1024N = 1024 ranks and a 70 B BF16 model (g140GB|g| \approx 140 \, \text{GB}), the ring sends about 280GB280 \, \text{GB} per rank; the parameter server's central node would have to absorb 1401023143TB140 \cdot 1023 \approx 143 \, \text{TB}. At 400 GB/s of network bandwidth (typical IB on a frontier cluster), that single link would need an hour per step.

Why ring all-reduce is bandwidth-optimal. Every gradient byte must enter and leave every rank exactly once for the result to be correct (each rank must contribute, each rank must receive the sum). The ring achieves this lower bound up to a tiny 1/N1/N overhead. No general AllReduce algorithm can do less wire traffic per rank. NCCL on a single 8-GPU node uses NVLink rings; across nodes it uses a 2-D ring (one ring within each node, one ring across nodes). The hardware shape is different; the cost formula is the same.

Manual Numerical Walkthrough

Two by-hand calculations: one round of ring all-reduce on 4 GPUs with toy 4-element gradient vectors, and a wall-clock estimate of DP scaling on a 70 B model across 256 H100s.

Click to expand: Ring all-reduce on 4 GPUs, by hand

Setup. 4 GPUs, each with a 4-element gradient buffer. We give the buffers easy-to-track integer values: GPU rr chunk cc starts as (r+1)10+c(r+1) \cdot 10 + c.

GPUabcd
GPU 010111213
GPU 120212223
GPU 230313233
GPU 340414243

Targets. The per-chunk sums every rank should end up with: a=100a = 100, b=104b = 104, c=108c = 108, d=112d = 112.

Phase 1, round 1 (scatter-reduce). On round 1 rank rr sends chunk rr to rank r+1r+1, which adds it onto its own copy of chunk rr. So rank 0 sends chunk a to rank 1; rank 1 sends chunk b to rank 2; rank 2 sends chunk c to rank 3; rank 3 sends chunk d to rank 0. After round 1:

GPUabcd
GPU 010111213 + 43 = 56
GPU 120 + 10 = 30212223
GPU 23031 + 21 = 523233
GPU 3404142 + 32 = 7443

Phase 1, rounds 2–3. Same pattern, shifted one chunk back each round. The interactive viewer above unrolls all three rounds step by step (the bookkeeping is fiddly enough by hand that it is worth watching it animate). After round 3, each GPU owns the full sum of exactly one chunk:

GPUFully-summed chunkValue
GPU 0d10+20+30+40+... wait, no — d sums to 13+23+33+43 = 112
GPU 1a10+20+30+40 = 100
GPU 2b11+21+31+41 = 104
GPU 3c12+22+32+42 = 108

Phase 2, all-gather (rounds 4–6). Each fully-summed chunk now walks the ring without further adding. After 3 more rounds every GPU holds the same vector: [100,104,108,112][100, 104, 108, 112]. Total rounds: 2(N1)=62(N-1) = 6. Total bytes per GPU: 23/4g=1.5g2 \cdot 3 / 4 \cdot |g| = 1.5 \, |g|. For a vector of 4 floats that is 24 bytes per GPU — but the same formula applies to a 140 GB gradient: 210 GB sent per rank, independent of NN beyond a few.

Click to expand: Sizing DP for a 70 B model on 256 H100s

Model. Nparams=71010N_{\text{params}} = 7 \cdot 10^{10}, BF16 gradients, so g=2Nparams=140GB|g| = 2 N_{\text{params}} = 140 \, \text{GB}.

Cluster. 256 H100s, 32 nodes × 8 GPUs. Per-GPU peak: 989 BF16 TFLOPS. Inter-node: 400 GB/s NDR Infiniband; intra-node: 900 GB/s NVLink. NCCL uses a hierarchical ring that effectively operates at the inter-node 400 GB/s bound for whole-cluster collectives.

AllReduce time per step. Bytes per GPU: 2255/256140279GB2 \cdot 255/256 \cdot 140 \approx 279 \, \text{GB}. Time: 279/4000.70s279 / 400 \approx 0.70 \, \text{s}.

Compute time per step. Take micro-batch 1 × sequence 4096 per GPU. FLOPs per step across the cluster: 67101025640964.410176 \cdot 7 \cdot 10^{10} \cdot 256 \cdot 4096 \approx 4.4 \cdot 10^{17}. Aggregate cluster compute at 50 % MFU: 25698910120.51.271017256 \cdot 989 \cdot 10^{12} \cdot 0.5 \approx 1.27 \cdot 10^{17} FLOP/s. Time: 4.4/1.273.5s4.4 / 1.27 \approx 3.5 \, \text{s}.

Overlap headroom. Compute > all-reduce, so DDP can hide the entire all-reduce inside the backward pass with bucket-level overlap. Effective step time stays near 3.5 s, throughput is 25614096/3.5300k tokens/s256 \cdot 1 \cdot 4096 / 3.5 \approx 300 \, \text{k tokens/s}.

What happens at 2048 GPUs. Compute per step drops to 0.44s\approx 0.44 \, \text{s}, but all-reduce per GPU is essentially unchanged at 0.70s\approx 0.70 \, \text{s}. Communication now dominates — the cluster spends 60 % of every step waiting for the network. This is the wall pure DP hits, and why every > 1024-GPU frontier run uses FSDP or sharded-optimiser DP (ZeRO-1/2) layered on top.

Visualizing the Ring and the Scaling Wall

Two widgets. The first animates ring all-reduce step by step — watch the cyan chunks accumulate into the green fully-reduced chunks, then propagate around the ring in the all-gather phase. The second is a scaling calculator: pick a model, GPU count, network bandwidth, and precision, and see when DP's communication cost overtakes compute.

Loading ring all-reduce visualizer…
What to look for. Click Step through the first phase: each round, exactly one chunk on each GPU becomes a little more accurate (cyan highlight). By the end of scatter-reduce, each GPU owns the full sum of one chunk and partial sums of the rest. The all-gather phase is pure copying — no further math, just propagation. The 2(N1)2(N-1) total rounds and the bandwidth-optimal property fall out of this picture directly.
Loading DP scaling calculator…
Try this. Load the "LLaMA-70B / 256 H100" preset and watch the throughput. Now push GPUs to 1024 while holding everything else constant. Compute time shrinks linearly; all-reduce time barely moves. Around 800–1024 GPUs the all-reduce card turns red — that is the moment pure DP becomes bandwidth-bound and frameworks switch to FSDP or hybrid parallelism. Switch precision from BF16 to FP8 to halve all-reduce volume and push that wall out by a factor of two.

Plain Python: DP and Ring-AllReduce From Scratch

Before reaching for PyTorch DDP, let us build the whole thing in plain NumPy — including the ring all-reduce, by hand, no MPI. The toy model is a single linear layer; the toy "cluster" is a list of arrays in one process. The mechanics are byte-identical to what runs across 2048 H800s.

data_parallel_from_scratch.py
🐍python
7Why we set the seed and the per-GPU batch size

Data parallelism only works if every rank starts from the same random initialisation. In a real run that is enforced by broadcasting rank 0's weights at start-up; here we simulate it with a fixed numpy seed. BATCH_PER_GPU is the micro-batch the GPU actually sees in forward+backward — multiply it by N_GPUS to get the global batch size that gradient descent statistically operates on.

EXECUTION STATE
N_GPUS = 4 simulated workers
BATCH_PER_GPU = 8 examples / rank
LR = 0.1
9Each rank reads a disjoint shard of the dataset

make_shard returns 8 (x, y) pairs that depend on gpu_id, mimicking how a DistributedSampler hands each rank its own non-overlapping slice. The +gpu_id shift makes the four shards genuinely different distributions so the local gradients disagree — that disagreement is exactly what AllReduce has to reconcile.

EXECUTION STATE
x = shape (8,) — features
y = shape (8,) — targets
18The DP invariant: identical weights on every rank

Before the first forward, every rank holds the same w and b. This is non-negotiable. If two ranks diverge by even a tiny amount, the averaged gradient becomes the wrong direction and training quality collapses. PyTorch's DistributedDataParallel asserts this with a checksum-style broadcast at construction time.

EXECUTION STATE
w = 0.0 (identical on all ranks)
b = 0.0 (identical on all ranks)
22Local forward + backward — no communication yet

Each rank runs forward and backward against its own shard. The gradient (g_w, g_b) is the mean gradient over the local 8 examples. At this point the four gradients are not equal — every rank has a different answer. The whole point of the next phase is to collapse those four answers into one shared answer that every rank then uses for its update.

EXECUTION STATE
y_hat = shape (8,) — predictions
err = shape (8,) — (y_hat − y)
g_w = scalar — local weight grad
g_b = scalar — local bias grad
32Ring all-reduce — the two-phase algorithm

Every buffer here is the local gradient as a 1-D array. The ring topology says: rank r sends to rank (r+1) mod N and receives from rank (r−1) mod N. Phase 1 (scatter-reduce) walks N−1 rounds; at the end, rank r owns the fully-summed copy of exactly one chunk of the gradient. Phase 2 (all-gather) walks another N−1 rounds; at the end every rank owns every fully-summed chunk. The math is identical for 2 parameters or 70 billion — only the chunk size changes.

EXECUTION STATE
N = 4 ring participants
chunks = list of (N × chunk_size) views
38Phase 1: scatter-reduce inner loop

On round s, rank r sends chunk index (r − s) mod N to rank r+1, and the receiver adds it onto its existing chunk at index (r − s − 1) mod N. The pattern is: every chunk index gets visited by exactly one rank per round, and after N−1 rounds the chunk has accumulated all N partial values. Bytes per rank per round: |grad|/N. Total bytes per rank in Phase 1: (N−1)/N · |grad|.

EXECUTION STATE
send_idx = chunk this rank ships
recv_idx = chunk this rank receives
dst = right neighbour on the ring
47Phase 2: all-gather propagates the reduced chunks

Now no addition — just copy. The fully-summed chunk that lives on rank r walks the ring, overwriting each rank's stale copy. After N−1 rounds, every rank holds an identical full gradient vector. Bytes per rank in Phase 2: (N−1)/N · |grad|. Combined cost: 2(N−1)/N · |grad| ≈ 2|grad| for large N. This is the famous bandwidth-optimal property: ring all-reduce is asymptotically independent of N.

56Average → apply update

After all-reduce, every rank holds the SUM of the four local gradients. Divide by N_GPUS to get the mean (this is equivalent to having computed the gradient over the full 32-example global batch). The SGD update w ← w − lr · mean_grad is then applied identically on every rank, preserving the DP invariant for the next step.

EXECUTION STATE
synced[0] = (sum_g_w, sum_g_b)
mean_grad = synced[0] / 4
62Sanity check — DP must match the big-batch gradient

We concatenate all four shards into one 32-example mega-batch and compute the gradient directly. The two numbers should match the DP-averaged gradient to floating-point precision. This identity — that DP with N ranks and per-rank batch B is mathematically equivalent to single-GPU SGD with batch N·B — is the entire reason data parallelism exists.

60 lines without explanation
1import numpy as np
2
3# Tiny toy model:  y = w * x + b   with a squared-error loss.
4# We will simulate N=4 "GPUs" with disjoint mini-batches and run
5# one step of synchronous data-parallel SGD by hand.
6
7np.random.seed(0)
8N_GPUS, BATCH_PER_GPU, LR = 4, 8, 0.1
9
10def make_shard(gpu_id):
11    """Each rank gets a different slice of the global batch."""
12    x = np.random.randn(BATCH_PER_GPU).astype(np.float32) + gpu_id
13    y = 3.0 * x + 0.5 + 0.1 * np.random.randn(BATCH_PER_GPU)
14    return x, y
15
16# (A) Every rank starts with the SAME weights -- this is the DP invariant.
17w, b = np.float32(0.0), np.float32(0.0)
18
19# (B) Forward + backward on the LOCAL shard only. No comm yet.
20local_grads = []
21for g in range(N_GPUS):
22    x, y = make_shard(g)
23    y_hat = w * x + b
24    err   = y_hat - y                              # shape (B,)
25    g_w = (2.0 / BATCH_PER_GPU) * (err * x).sum()  # local mean gradient
26    g_b = (2.0 / BATCH_PER_GPU) * err.sum()
27    local_grads.append(np.array([g_w, g_b], dtype=np.float32))
28
29# (C) Ring all-reduce, implemented by hand in two phases.
30def ring_all_reduce(buffers):
31    """In-place ring all-reduce: every buffer ends up holding the sum."""
32    N = len(buffers)
33    bufs = [b.copy() for b in buffers]
34    chunk_size = bufs[0].size // N
35    chunks = [b.reshape(N, chunk_size) for b in bufs]
36
37    # Phase 1: scatter-reduce. After N-1 rounds, rank r owns the full
38    # sum of chunk (r - (N-1)) mod N.
39    for s in range(N - 1):
40        for r in range(N):
41            send_idx = (r - s)     % N
42            recv_idx = (r - s - 1) % N
43            dst = (r + 1) % N
44            chunks[dst][recv_idx] += chunks[r][send_idx]
45
46    # Phase 2: all-gather. Each fully-reduced chunk walks the ring.
47    for s in range(N - 1):
48        for r in range(N):
49            send_idx = (r - s + 1) % N
50            dst = (r + 1) % N
51            chunks[dst][send_idx] = chunks[r][send_idx]
52
53    return [c.reshape(buffers[0].shape) for c in chunks]
54
55# (D) Sync the gradients (sum), then average and apply the update.
56synced = ring_all_reduce(local_grads)
57mean_grad = synced[0] / N_GPUS                     # all ranks identical
58w -= LR * mean_grad[0]
59b -= LR * mean_grad[1]
60
61# (E) Sanity check vs the "ground truth" big-batch gradient.
62xs, ys = zip(*[make_shard(g) for g in range(N_GPUS)])
63xs, ys = np.concatenate(xs), np.concatenate(ys)
64yh = 0.0 * xs + 0.0
65big_grad_w = (2.0 / xs.size) * ((yh - ys) * xs).sum()
66big_grad_b = (2.0 / xs.size) * (yh - ys).sum()
67print(f"DP averaged grad : ({mean_grad[0]:+.4f}, {mean_grad[1]:+.4f})")
68print(f"Big-batch grad   : ({big_grad_w:+.4f}, {big_grad_b:+.4f})")
69print(f"Updated weights  : w = {w:+.4f}   b = {b:+.4f}")

The output of that script ends with two lines: DP averaged grad ≈ big-batch grad — the mathematical identity at the heart of DP, verified to floating point. If those two numbers ever disagree, DP is broken; finding why they disagree is one of the most common distributed-training debugging exercises (mis-sized shards, missing set_epoch, non-deterministic kernels, dropout-RNG drift).

PyTorch: Real DDP on a GPU Cluster

The plain-Python version was the mechanism. The PyTorch version is the same mechanism, except: the ring all-reduce lives in NCCL kernel code, the bucketing is automatic, the overlap with backward is hooked into autograd. Two extra lines around your existing training loop and you go from one GPU to one thousand.

ddp_training_loop.py
🐍python
9init_process_group — the moment N processes become a cluster

Every rank calls this once. NCCL (NVIDIA Collective Communications Library) is the backend that turns N CUDA processes into a coordinated group: it discovers the topology (NVLink within a node, IB / RoCE between nodes), opens persistent rings, and exposes blocking + non-blocking collectives. After this call, every rank knows its rank id (0..G−1) and the world_size (G). For DeepSeek-V3-style runs the same call would set up 2048 ranks across 256 nodes.

EXECUTION STATE
backend = "nccl" — required on NVIDIA GPUs
LOCAL_RANK = 0..7 within a node
14Forward — looks like a single-GPU forward, intentionally

DDP's wrapper is mostly a no-op on the forward pass. The model executes on each rank exactly as if it were single-GPU, against the rank's local shard of the global batch. No gradient communication has happened yet. This is what makes DDP a drop-in: any single-GPU training loop becomes a multi-GPU one with two lines of setup.

EXECUTION STATE
x = local mini-batch, e.g. (4, 4096) tokens
logits = (4, 4096, vocab) — local output
17loss.backward() — where the AllReduce actually runs

This is the magic line. As autograd computes each gradient, DDP's pre-registered hooks bundle nearby parameter gradients into ≈25 MB buckets, and as soon as a bucket is full it fires an asynchronous all-reduce on it. Communication overlaps with the rest of the backward pass — by the time the backward returns, the all-reduce is already mostly done. The synchronous-looking call hides several seconds of network traffic per step on a 70B model.

18optimizer.step() — identical update on every rank

Because every rank now holds the IDENTICAL averaged gradient, Adam's deterministic update produces identical new weights on every rank. The DP invariant (same weights everywhere) is preserved without any extra communication. Adam's m and v moments are also kept in lock-step because they depend only on the gradient + previous moments, both of which are bit-identical across ranks.

23DDP construction — broadcast + register hooks

Two things happen here that you do not see. (1) rank 0's parameters are broadcast to every other rank, so the run starts identically replicated. (2) DDP scans the parameter list and assigns each tensor to a bucket; backward-pass hooks are registered on every gradient leaf to fire bucket-level all-reduces as gradients are produced. gradient_as_bucket_view=True makes the gradients zero-copy views into the bucket — saving 1× model size in temporary memory.

EXECUTION STATE
device_ids = [local_rank] — pins to one GPU
static_graph = True — same ops every step
30DistributedSampler — disjoint shards per rank

The sampler splits the dataset indices into G non-overlapping subsets so that across all ranks every example is seen exactly once per epoch. set_epoch(epoch) reshuffles deterministically — if you forget that line, every epoch sees the same order, a notorious silent bug. drop_last=True keeps the per-rank shard size identical, which keeps all-reduce sizes uniform (mis-sized buffers cause NCCL hangs in production).

32DataLoader — pin_memory + persistent_workers for throughput

pin_memory=True allocates CPU memory in a page-locked region so the GPU DMA engine can pull batches without a copy. persistent_workers=True keeps the loader's worker processes alive across epochs — startup of 8 worker procs takes seconds, you do not want it inside the inner loop. num_workers=2 is a tradeoff: enough to overlap CPU pre-processing with GPU compute, not so many that the host falls over.

42torchrun — the launch story

torchrun spawns nproc_per_node processes on each of nnodes machines and gives each one the env vars (RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT) that init_process_group needs. For an 8-GPU box this is 8 processes. For a 2048-GPU cluster (256 nodes × 8 GPUs) it is 2048 processes, with torchrun on each node spawning 8 of them. The Python code above is the SAME for both — only the launch command grows.

41 lines without explanation
1import os
2import torch
3import torch.distributed as dist
4import torch.nn as nn
5from torch.nn.parallel import DistributedDataParallel as DDP
6from torch.utils.data import DataLoader, DistributedSampler
7
8def setup():
9    # torchrun sets these env vars for every process.
10    dist.init_process_group(backend="nccl")          # H100/H800: nccl
11    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
12
13def train_one_step(model, batch, optimizer, criterion):
14    x, y = batch[0].cuda(non_blocking=True), batch[1].cuda(non_blocking=True)
15    optimizer.zero_grad(set_to_none=True)
16    logits = model(x)                                  # forward through DDP
17    loss   = criterion(logits, y)                       # local loss
18    loss.backward()                                     # AllReduce runs HERE,
19                                                        # bucketed & overlapped
20                                                        # with the backward pass.
21    optimizer.step()                                    # identical update on every rank
22
23def main():
24    setup()
25    rank = dist.get_rank()
26    world = dist.get_world_size()
27
28    model = MyTransformer().cuda()                      # SAME init via broadcast
29    model = DDP(model, device_ids=[int(os.environ["LOCAL_RANK"])],
30                gradient_as_bucket_view=True,           # zero-copy bucket views
31                static_graph=True)                      # enables bucket reuse
32    optim = torch.optim.AdamW(model.parameters(), lr=3e-4, fused=True)
33    crit  = nn.CrossEntropyLoss()
34
35    sampler = DistributedSampler(dataset, num_replicas=world, rank=rank,
36                                  shuffle=True, drop_last=True)
37    loader  = DataLoader(dataset, batch_size=4, sampler=sampler,
38                          num_workers=2, pin_memory=True, persistent_workers=True)
39
40    for epoch in range(num_epochs):
41        sampler.set_epoch(epoch)                        # reshuffle per epoch
42        for step, batch in enumerate(loader):
43            train_one_step(model, batch, optim, crit)
44            if rank == 0 and step % 100 == 0:
45                print(f"step {step}  loss {loss.item():.4f}")
46
47if __name__ == "__main__":
48    main()
49# Launch:  torchrun --nproc_per_node=8 --nnodes=4 train.py

Three numbers worth memorising for any DDP run:

KnobDefaultWhy it matters at scale
bucket_cap_mb25 MBSmaller → more buckets → finer overlap, more latency overhead. Tune up for huge models.
find_unused_parametersFalseTrue adds a graph traversal per step. Leave False for transformer pre-training.
static_graphFalseTrue lets DDP reuse bucket assignment across steps — strict win when graph is fixed.
gradient_as_bucket_viewFalseTrue makes .grad a view into the bucket — saves 1× model size, no downside for normal training.
The mental model that pays back forever. DDP's forward pass is "the model on this rank, run on this shard". DDP's backward pass is "autograd, plus a side-channel that all-reduces gradients into uniformity as they are produced". DDP's optimiser step is "the same update on every rank, nothing fancy". Anything that breaks one of those three lines — a non-deterministic dropout seed, a layer that runs on only some ranks, a parameter that does not receive gradients — is the source of essentially every DDP bug you will ever debug.

At Massive Scale: Where Pure DP Falls Off

DP's simplicity makes it the universal first choice; the same simplicity dictates its ceiling. Three forces push frontier runs past pure DP and into the hybrid parallelism stack that the rest of Chapter 11 builds.

  1. Memory: weights, gradients, master, Adam. For a 70 B model in BF16, every rank holds 140GB140 \, \text{GB} of weights, the same again in gradients, 280GB280 \, \text{GB} in FP32 master weights, and 560GB560 \, \text{GB} in Adam moments — a total of 1.12TB\approx 1.12 \, \text{TB} per rank, far more than any single GPU's 80–192 GB of HBM. Pure DP cannot run this model at all. ZeRO-1 shards the optimiser state, ZeRO-2 also shards the gradient, ZeRO-3 (and PyTorch's FSDP) shards the parameters themselves — at the cost of extra all-gather operations every forward and backward. Section 11.4 builds this in full.
  2. Communication: 2g2|g| per step, every step. Even when memory fits, the all-reduce volume is fixed by model size. Doubling GPUs doubles compute throughput, but per-GPU all-reduce time barely moves — so the gap between compute and comm narrows until comm dominates. At that point you must either compress the gradients (PowerSGD, 1-bit Adam, FP8 collectives) or stop having every rank hold every parameter. Tensor parallelism (Section 11.5) attacks the latter — each layer's matrix is split across a few ranks, turning a big AllReduce into smaller in-shard reductions.
  3. Pipeline bubbles for very deep models. Once a model is too big for one node, you can also slice it by layer (pipeline parallelism, Section 11.6). DP composes with that: across a 256-stage pipeline, each pipeline replica is its own data-parallel group running synchronous DP. Frontier runs are almost always "DP × TP × PP × EP" — a Cartesian product of parallelism dimensions with DP as the outermost loop.

What DeepSeek-V3 actually does

DeepSeek-V3 uses DP at the outermost level across roughly 64-rank data-parallel groups, with each group running a complex inner configuration of expert parallelism, sequence parallelism, and pipeline parallelism that the rest of the chapter unpacks. The DP all-reduce on the optimiser-state side is sharded ZeRO-style so that no rank holds the whole optimiser. The point is not that DP is obsolete at frontier scale — it is that it is one axis of a 4-D parallelism layout, and its specific contribution is what we characterised in this section: synchronous, replicated-model, ring-all-reduce gradient averaging.

Engineering Reality: Bucketing, Overlap, and the Tricks That Matter

Four engineering wins separate a textbook DP implementation from what production frameworks actually do. Each is worth roughly a 2× throughput improvement on a real cluster; combined they are the difference between "multi-GPU works" and "multi-GPU is the platform."

  1. Gradient bucketing. Issuing one all-reduce per parameter is a disaster: a 7 B model has hundreds of thousands of parameters, each all-reduce has microseconds of kernel-launch overhead, and the total per-step overhead dwarfs the actual transfer. DDP bundles consecutive parameters into ~25 MB buckets and issues one all-reduce per bucket. The bucket size is tuned so the per-bucket overhead is small but the bucket itself is large enough to be bandwidth-bound. 25 MB on a 400 GB/s link is 60 µs of actual transfer; the launch overhead is roughly 10 µs — a healthy 6:1 ratio.
  2. Backward-pass overlap. Buckets fire all-reduce as soon as they fill, while the backward pass is still computing earlier layers' gradients. On a transformer, this means the all-reduce on layer 80's gradients is in flight while CUDA is still computing the gradient for layer 60. By the time backward returns, almost all communication is done — DDP's step time is max(tcompute,tcomm)\max(t_{\text{compute}}, t_{\text{comm}}), not their sum. This is the single biggest reason DP scales further than its naive analysis suggests.
  3. Mixed-precision collectives. Sending gradients in BF16 instead of FP32 halves the wire volume; sending them in FP8 (after appropriate scaling, see Section 10.3) halves it again. Modern NCCL exposes typed all-reduce, and frameworks (DeepSpeed, Megatron-LM, the DeepSeek stack) cast on the wire and accumulate in higher precision on receive. For a 70 B model, BF16 → FP8 collectives save 70 GB per step per rank. On 1024 ranks that is 70 TB of network traffic per step removed.
  4. Sharded optimiser state (ZeRO-1). Even before touching the full ZeRO-3 / FSDP redesign, ZeRO-1 splits Adam's m,vm, v moments across DP ranks: rank rr owns the master copy of parameter shard rr. The forward pass still uses replicated BF16 weights, but optimiser memory drops by NDPN_{\text{DP}}. For a 70 B model on 64 DP ranks, that frees 550GB\approx 550 \, \text{GB} of HBM per rank — enough to enable longer sequences or larger batches. Section 11.4 explains the full sharding hierarchy.

The failure modes you will actually see

SymptomLikely causeFix
Loss diverges from single-GPU baselineForgot to scale LR by N_DPLinear LR scaling + warmup
NCCL hang on step 2 with no errorMis-sized gradient buffer on one rankdrop_last=True + uniform shard size
Loss curve identical to single-GPU but 2× slower than expectedBackward not overlapping with commEnable static_graph, set gradient_as_bucket_view
Step time fluctuates ±50% step to stepStragglers / contended IB linksTopology-aware pinning; investigate one slow node
Different log losses on rank 0 vs rank 5Dropout seeded the same on every rankSeed = base_seed + rank for dropout RNG only
MoE training: severe load imbalance + comm spikesExpert all-to-all collides with DP all-reduceStagger collectives or use separate process groups
The deepest lesson of this section. Data parallelism is the parallelism strategy with the strongest mathematical guarantees — it is provably equivalent to single-GPU SGD on a bigger batch, end of story. Every other parallelism technique we will meet — tensor, pipeline, expert, sequence — has a numerical caveat: a slightly different forward pass, a slightly different gradient, a slightly different floating-point accumulation order. DP has none. The price you pay for that purity is the per-step gradient all-reduce — bandwidth proportional to model size, paid every single step of training. The rest of Chapter 11 is the engineering of how to pay that price as cheaply as possible (FSDP, ZeRO) or how to avoid paying it for the entire model at once (TP, PP, EP). But DP is the baseline every other technique is measured against, and the one every other technique composes on top of.
Loading comments...