Single-Machine Multi-GPU (Data Parallel, DP)
Introduction¶
DataParallel lets us (with a single controlling process) load the model and data onto multiple GPUs, manage how data flows between the GPUs, and coordinate the model replicas on the different GPUs for parallel training (the fine-grained primitives involved include scatter, gather, and so on).
DP targets a single machine with multiple GPUs. Every device takes part in computation and training; on top of that, device[0] (not the physical GPU index, but the first entry of the device_ids argument) is also responsible for aggregating gradients and updating parameters. Figure 1 shows an example where GPU 0 acts as device[0]. From the figure we can see three main stages:
- Stage 1 (red in the figure): each GPU computes its own loss and gradients
- Stage 2 (blue in the figure): all gradients are aggregated onto device[0]
- Stage 3 (green in the figure): device[0] updates the parameters, and the other GPUs pull the updated parameters from it
In short: all GPUs compute in parallel (red), the gradients are collected onto device[0] (light blue), and device[0] shares the updated model parameters with the other GPUs (green).

Although DP only supports single-machine training and is not distributed training in the strict sense (multiple nodes), its principle is very close to the Parameter Server (PS) architecture used in distributed training algorithms, so we borrow the PS pseudocode to explain it; a toy sketch of that loop follows the list below.

We can see that the parallel gradient descent workflow of PS splits into four parts:
Task Scheduler: responsible for loading the data, distributing it to each worker node, and driving the iterations.
In each iteration, a worker is responsible for:
- Initialization: load its data and pull the complete set of model parameters from the server node (green in Figure 1)
- Gradient computation: compute gradients on its local data (red in Figure 1) and push them to the server node (blue in Figure 1)
The server is responsible for:
- aggregating the gradients
- updating the parameters
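To make the workflow concrete, here is a minimal, hedged toy emulation of this PS loop in a single process. The Worker/Server classes, compute_gradient, and the squared-error objective are illustrative stand-ins, not real Parameter Server code.

import torch

def compute_gradient(params, shard):
    # toy objective: mean squared error between the parameters and this worker's data shard
    p = params.clone().requires_grad_(True)
    ((p - shard) ** 2).mean().backward()
    return p.grad

class Server:
    def __init__(self, params, lr=0.1):
        self.params, self.lr, self.grads = params, lr, []
    def pull(self):
        return self.params                        # workers pull the parameters (green)
    def push(self, grad):
        self.grads.append(grad)                   # workers push their gradients (blue)
    def update(self):
        g = torch.stack(self.grads).mean(0)       # aggregate gradients
        self.params = self.params - self.lr * g   # take one SGD step
        self.grads = []

class Worker:
    def __init__(self, shard):
        self.shard = shard
    def run_iteration(self, server):
        params = server.pull()
        server.push(compute_gradient(params, self.shard))   # local gradient (red)

# the "task scheduler": drive the iterations over all workers, then let the server update
server = Server(torch.zeros(4))
workers = [Worker(torch.randn(4)) for _ in range(3)]
for _ in range(10):
    for w in workers:
        w.run_iteration(server)
    server.update()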
Usage¶
DataParallel is very easy to use: we simply wrap the model with DataParallel and set a few arguments. The main entry point is nn.DataParallel, and the arguments to define are which GPUs participate in training (device_ids=gpus) and which GPU aggregates the outputs and gradients (output_device=gpus[0]). DataParallel automatically splits the data and loads it onto the corresponding GPUs, replicates the model onto each GPU, runs the forward pass, and computes and aggregates the gradients:
model = nn.DataParallel(model.cuda(), device_ids=gpus, output_device=gpus[0])
There are generally two ways to specify which GPUs take part in parallel training:
- Pass the device_ids argument to nn.DataParallel to select the GPU indices to use:
model = nn.DataParallel(model, device_ids=[0, 1])  # parallel training on GPU 0 and GPU 1
- Manually restrict which GPU devices are visible to the program:
os.environ["CUDA_VISIBLE_DEVICES"] = "1,2"
Note that both the model and the data must first be loaded onto the GPU before the DataParallel module can process them; otherwise an error is raised:
# model.cuda() is required here
model = nn.DataParallel(model.cuda(), device_ids=gpus, output_device=gpus[0])

for epoch in range(100):
    for batch_idx, (images, target) in enumerate(train_loader):
        # images.cuda() / target.cuda() are required here
        images = images.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        ...
        output = model(images)
        loss = criterion(output, target)
        ...
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
To summarize, the DataParallel part of the training script mainly involves the following code:
# main.py
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist

gpus = [0, 1, 2, 3]
torch.cuda.set_device('cuda:{}'.format(gpus[0]))

train_dataset = ...
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=...)

model = ...
model = nn.DataParallel(model.cuda(), device_ids=gpus, output_device=gpus[0])

optimizer = optim.SGD(model.parameters(), lr=...)

for epoch in range(100):
    for batch_idx, (images, target) in enumerate(train_loader):
        images = images.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        ...
        output = model(images)
        loss = criterion(output, target)
        ...
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
To run it, simply execute the script with python:
python main.py
For the complete ImageNet training code, see the GitHub link.
Implementation¶
This section looks at how DP is implemented. Let's start with the source code (the comments are worth reading too):
class DataParallel(Module):
    def __init__(self, module, device_ids=None, output_device=None, dim=0):
        super(DataParallel, self).__init__()

        # check whether any GPU is available
        device_type = _get_available_device_type()
        if device_type is None:
            self.module = module
            self.device_ids = []
            return

        # default to all visible GPUs
        if device_ids is None:
            device_ids = _get_all_device_indices()

        # the "server" defaults to the first entry of device_ids
        if output_device is None:
            output_device = device_ids[0]

        self.dim = dim
        self.module = module
        self.device_ids = list(map(lambda x: _get_device_index(x, True), device_ids))
        self.output_device = _get_device_index(output_device, True)
        self.src_device_obj = torch.device(device_type, self.device_ids[0])

        # check whether the devices are balanced; a warning is issued when they are
        # imbalanced (memory or number of processors with min/max < 0.75)
        _check_balance(self.device_ids)

        # single GPU
        if len(self.device_ids) == 1:
            self.module.to(self.src_device_obj)

    def forward(self, *inputs, **kwargs):
        # no GPU available
        if not self.device_ids:
            return self.module(*inputs, **kwargs)

        # before running, GPU device_ids[0] (our "server") must already hold the parameters
        # and buffers of the parallelized module, because DP guarantees that GPU device_ids[0]
        # shares storage with the base parallelized module; in-place updates on device[0]
        # are therefore preserved, while those on the other GPUs are not
        for t in chain(self.module.parameters(), self.module.buffers()):
            if t.device != self.src_device_obj:
                raise RuntimeError("module must have its parameters and buffers "
                                   "on device {} (device_ids[0]) but found one of "
                                   "them on device: {}".format(self.src_device_obj, t.device))

        # device[0] now holds both the module and the input, so the PS-style algorithm can begin
        inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)

        # if only a single GPU is available, run on it directly without parallelism
        if len(self.device_ids) == 1:
            return self.module(*inputs[0], **kwargs[0])

        replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
        outputs = self.parallel_apply(replicas, inputs, kwargs)
        return self.gather(outputs, self.output_device)

    def replicate(self, module, device_ids):
        return replicate(module, device_ids, not torch.is_grad_enabled())

    def scatter(self, inputs, kwargs, device_ids):
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)

    def parallel_apply(self, replicas, inputs, kwargs):
        return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])

    def gather(self, outputs, output_device):
        return gather(outputs, output_device, dim=self.dim)
From the forward function we can see that the key functions are scatter, replicate, parallel_apply and gather; let's go through them one by one.
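Before going through them individually, here is a minimal, hedged sketch that chains the same four public primitives by hand (assuming two visible GPUs; the module and device ids are illustrative). This is essentially what DataParallel.forward does for us on every call:

import torch
import torch.nn as nn
from torch.nn.parallel import scatter, replicate, parallel_apply, gather

device_ids = [0, 1]
module = nn.Linear(8, 4).cuda(device_ids[0])     # the module must live on device_ids[0]
inputs = torch.randn(32, 8).cuda(device_ids[0])

scattered = scatter(inputs, device_ids)          # split the batch across the GPUs
replicas = replicate(module, device_ids)         # copy the module onto every GPU
outputs = parallel_apply(replicas, scattered)    # one forward pass per GPU, in parallel
result = gather(outputs, device_ids[0])          # collect the outputs on device_ids[0]
print(result.shape)                              # torch.Size([32, 4])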
First is the scatter step, i.e., the scatter_kwargs function.
def scatter_kwargs(inputs, kwargs, target_gpus, dim=0):
    r"""Scatter with support for kwargs dictionary"""
    # the main work happens here
    inputs = scatter(inputs, target_gpus, dim) if inputs else []
    kwargs = scatter(kwargs, target_gpus, dim) if kwargs else []
    # pad the shorter one with empty items so that inputs and kwargs have equal length
    if len(inputs) < len(kwargs):
        inputs.extend([() for _ in range(len(kwargs) - len(inputs))])
    elif len(kwargs) < len(inputs):
        kwargs.extend([{} for _ in range(len(inputs) - len(kwargs))])
    # return tuples
    inputs = tuple(inputs)
    kwargs = tuple(kwargs)
    return inputs, kwargs
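A hedged micro-example of what scatter_kwargs returns (two GPUs assumed; the sizes and the flag kwarg are illustrative): the tensor argument is chunked across devices, while non-tensor kwargs are simply duplicated for each GPU.

import torch
from torch.nn.parallel.scatter_gather import scatter_kwargs

x = torch.randn(6, 3)
inputs, kwargs = scatter_kwargs((x,), {"flag": True}, target_gpus=[0, 1], dim=0)
print([t[0].shape for t in inputs])   # [torch.Size([3, 3]), torch.Size([3, 3])]
print(kwargs)                         # ({'flag': True}, {'flag': True})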
The most important piece inside scatter_kwargs is the scatter function, which slices tensors into approximately equal chunks and distributes them across the GPUs. Other data types are duplicated, one reference per GPU.
def scatter(inputs, target_gpus, dim=0):
    r"""
    Slices tensors into approximately equal chunks and
    distributes them across given GPUs. Duplicates
    references to objects that are not tensors.
    """
    def scatter_map(obj):
        if isinstance(obj, torch.Tensor):
            return Scatter.apply(target_gpus, None, dim, obj)
        if is_namedtuple(obj):
            return [type(obj)(*args) for args in zip(*map(scatter_map, obj))]
        if isinstance(obj, tuple) and len(obj) > 0:
            return list(zip(*map(scatter_map, obj)))
        if isinstance(obj, list) and len(obj) > 0:
            return [list(i) for i in zip(*map(scatter_map, obj))]
        if isinstance(obj, dict) and len(obj) > 0:
            return [type(obj)(i) for i in zip(*map(scatter_map, obj.items()))]
        return [obj for targets in target_gpus]

    # After scatter_map is called, a scatter_map cell will exist. This cell
    # has a reference to the actual function scatter_map, which has references
    # to a closure that has a reference to the scatter_map cell (because the
    # fn is recursive). To avoid this reference cycle, we set the function to
    # None, clearing the cell
    try:
        res = scatter_map(inputs)
    finally:
        scatter_map = None
    return res
Within it, the function that handles tensors is the Scatter autograd Function:
class Scatter(Function):

    @staticmethod
    def forward(ctx, target_gpus, chunk_sizes, dim, input):
        target_gpus = [_get_device_index(x, True) for x in target_gpus]
        ctx.dim = dim
        ctx.input_device = input.get_device() if input.device.type != "cpu" else -1
        streams = None
        if torch.cuda.is_available() and ctx.input_device == -1:
            # Perform CPU to GPU copies in a background stream
            # create new cuda streams
            streams = [_get_stream(device) for device in target_gpus]
        # the actual scatter operation
        outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams)
        # Synchronize with the copy stream
        if streams is not None:
            for i, output in enumerate(outputs):
                with torch.cuda.device(target_gpus[i]):
                    main_stream = torch.cuda.current_stream()
                    main_stream.wait_stream(streams[i])
                    output.record_stream(main_stream)
        return outputs

    @staticmethod
    def backward(ctx, *grad_output):
        return None, None, None, Gather.apply(ctx.input_device, ctx.dim, *grad_output)
comm.scatter is implemented in C++, so we will not go into it here.
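Still, as a rough mental model (not the actual C++ implementation, and ignoring the copy streams), comm.scatter behaves like chunking the tensor along dim and copying each chunk onto its target device; a hedged sketch with illustrative device ids:

import torch
import torch.cuda.comm as comm

x = torch.randn(10, 4)
chunks = comm.scatter(x, devices=[0, 1], dim=0)                              # the real primitive
manual = [c.to('cuda:%d' % d) for c, d in zip(x.chunk(2, dim=0), [0, 1])]    # rough equivalent
print([c.shape for c in chunks], [m.device for m in manual])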
Going back to the DP code block: we have now finished the scatter step, i.e., split one batch into roughly equal smaller batches. Next we look at the replicate and gather functions (assuming we have at least two GPUs).
# the call inside DP forward
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])

# implementation
def replicate(network, devices, detach=False):
    if not _replicatable_module(network):
        raise RuntimeError("Cannot replicate network where python modules are "
                           "childrens of ScriptModule")

    if not devices:
        return []

    # which GPUs to replicate to, and how many copies
    devices = [_get_device_index(x, True) for x in devices]
    num_replicas = len(devices)

    # copy the parameters
    params = list(network.parameters())
    param_indices = {param: idx for idx, param in enumerate(params)}
    # jump to the bottom of this code block to read _broadcast_coalesced_reshape, then come back
    param_copies = _broadcast_coalesced_reshape(params, devices, detach)

    # copy the buffers
    buffers = list(network.buffers())
    buffers_rg = []
    buffers_not_rg = []
    for buf in buffers:
        if buf.requires_grad and not detach:
            buffers_rg.append(buf)
        else:
            buffers_not_rg.append(buf)

    # record the indices of buffers that do / do not require grad
    buffer_indices_rg = {buf: idx for idx, buf in enumerate(buffers_rg)}
    buffer_indices_not_rg = {buf: idx for idx, buf in enumerate(buffers_not_rg)}

    # copy the two groups separately (we have already seen how this works)
    buffer_copies_rg = _broadcast_coalesced_reshape(buffers_rg, devices, detach=detach)
    buffer_copies_not_rg = _broadcast_coalesced_reshape(buffers_not_rg, devices, detach=True)

    # now copy the network itself
    # preparation: turn network.modules() into a list,
    # then prepare empty lists and indices for the copies to come
    modules = list(network.modules())
    module_copies = [[] for device in devices]
    module_indices = {}
    scriptmodule_skip_attr = {"_parameters", "_buffers", "_modules", "forward", "_c"}

    for i, module in enumerate(modules):
        module_indices[module] = i
        for j in range(num_replicas):
            replica = module._replicate_for_data_parallel()
            # This is a temporary fix for DDP. DDP needs to access the
            # replicated model parameters. It used to do so through
            # `mode.parameters()`. The fix added in #33907 for DP stops the
            # `parameters()` API from exposing the replicated parameters.
            # Hence, we add a `_former_parameters` dict here to support DDP.
            replica._former_parameters = OrderedDict()
            module_copies[j].append(replica)

    # now fill in the sub-modules, parameters and buffers of each replica
    for i, module in enumerate(modules):
        for key, child in module._modules.items():
            if child is None:
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    replica._modules[key] = None
            else:
                module_idx = module_indices[child]
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    setattr(replica, key, module_copies[j][module_idx])
        for key, param in module._parameters.items():
            if param is None:
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    replica._parameters[key] = None
            else:
                param_idx = param_indices[param]
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    param = param_copies[j][param_idx]
                    # parameters in replicas are no longer leaves,
                    # so setattr them as non-parameter attributes
                    setattr(replica, key, param)
                    # expose the parameter for DDP
                    replica._former_parameters[key] = param
        for key, buf in module._buffers.items():
            if buf is None:
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    replica._buffers[key] = None
            else:
                if buf.requires_grad and not detach:
                    buffer_copies = buffer_copies_rg
                    buffer_idx = buffer_indices_rg[buf]
                else:
                    buffer_copies = buffer_copies_not_rg
                    buffer_idx = buffer_indices_not_rg[buf]
                for j in range(num_replicas):
                    replica = module_copies[j][i]
                    setattr(replica, key, buffer_copies[j][buffer_idx])

    return [module_copies[j][0] for j in range(num_replicas)]
# !!! jump here from replicate
def _broadcast_coalesced_reshape(tensors, devices, detach=False):
    from ._functions import Broadcast

    # read the comment in the else branch first, since the non-detach path ends up calling the same function
    if detach:
        return comm.broadcast_coalesced(tensors, devices)
    else:
        # Use the autograd function to broadcast if not detach
        if len(tensors) > 0:
            # scroll down for the source of Broadcast
            tensor_copies = Broadcast.apply(devices, *tensors)
            return [tensor_copies[i:i + len(tensors)]
                    for i in range(0, len(tensor_copies), len(tensors))]
        else:
            return []
# Broadcast.apply
class Broadcast(Function):

    @staticmethod
    def forward(ctx, target_gpus, *inputs):
        assert all(i.device.type != 'cpu' for i in inputs), (
            'Broadcast function not implemented for CPU tensors'
        )
        target_gpus = [_get_device_index(x, True) for x in target_gpus]
        ctx.target_gpus = target_gpus
        if len(inputs) == 0:
            return tuple()
        ctx.num_inputs = len(inputs)
        # the inputs live on device[0]
        ctx.input_device = inputs[0].get_device()

        # the same call as in the detach case
        outputs = comm.broadcast_coalesced(inputs, ctx.target_gpus)
        # source of comm.broadcast_coalesced:
        # tensors must all be on the same device (CPU or GPU); devices are the devices to copy to;
        # buffer_size is the maximum buffer size, used to coalesce small tensors into one buffer
        # and reduce the number of synchronizations
        # def broadcast_coalesced(tensors, devices, buffer_size=10485760):
        #     devices = [_get_device_index(d) for d in devices]
        #     return torch._C._broadcast_coalesced(tensors, devices, buffer_size)

        non_differentiables = []
        for idx, input_requires_grad in enumerate(ctx.needs_input_grad[1:]):
            if not input_requires_grad:
                for output in outputs:
                    non_differentiables.append(output[idx])
        ctx.mark_non_differentiable(*non_differentiables)
        return tuple([t for tensors in outputs for t in tensors])

    @staticmethod
    def backward(ctx, *grad_outputs):
        return (None,) + ReduceAddCoalesced.apply(ctx.input_device, ctx.num_inputs, *grad_outputs)
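To make the replication step and the Broadcast backward concrete, here is a hedged sketch (assuming two visible GPUs; the module and device ids are illustrative). It replicates a small module onto two devices, then emulates what Broadcast.backward does, summing the per-replica gradients back onto the source device, using the public torch.cuda.comm.reduce_add in place of the internal ReduceAddCoalesced:

import torch
import torch.nn as nn
import torch.cuda.comm as comm
from torch.nn.parallel import replicate

# replicate a small module onto two GPUs; each replica's weights live on their own device
net = nn.Linear(4, 2).cuda(0)
replicas = replicate(net, [0, 1])
print([r.weight.device for r in replicas])    # [cuda:0, cuda:1]

# Broadcast.backward reduces the per-replica gradients back onto the source device;
# the same effect, illustrated with comm.reduce_add:
grads = [torch.ones(2, 4, device='cuda:0'), torch.ones(2, 4, device='cuda:1')]
summed = comm.reduce_add(grads, destination=0)
print(summed.device, summed[0, 0].item())     # cuda:0  2.0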
Next, the parallel_apply part. ⚠️ DP and DDP share the parallel_apply code.
# DP code
outputs = self.parallel_apply(replicas, inputs, kwargs)

# a threading-based implementation: take the replicas and inputs prepared above,
# then launch one thread per replica in a for loop

# source
def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
    # every GPU has a module and an input
    assert len(modules) == len(inputs)

    # make sure every GPU has its kwargs; if not, pad with empty dicts
    if kwargs_tup is not None:
        # these were already padded in scatter
        assert len(modules) == len(kwargs_tup)
    else:
        kwargs_tup = ({},) * len(modules)

    if devices is not None:
        assert len(modules) == len(devices)
    else:
        devices = [None] * len(modules)
    devices = [_get_device_index(x, True) for x in devices]

    # multithreaded implementation
    lock = threading.Lock()
    results = {}
    grad_enabled, autocast_enabled = torch.is_grad_enabled(), torch.is_autocast_enabled()

    # define the worker
    def _worker(i, module, input, kwargs, device=None):
        torch.set_grad_enabled(grad_enabled)
        if device is None:
            device = get_a_var(input).get_device()
        try:
            with torch.cuda.device(device), autocast(enabled=autocast_enabled):
                # this also avoids accidental slicing of `input` if it is a Tensor
                if not isinstance(input, (list, tuple)):
                    input = (input,)
                output = module(*input, **kwargs)
            with lock:
                # store this replica's output
                results[i] = output
        except Exception:
            with lock:
                results[i] = ExceptionWrapper(
                    where="in replica {} on device {}".format(i, device))

    if len(modules) > 1:
        # one process controlling several GPUs: start one thread per GPU.
        # Note: although DDP recommends one process per GPU, i.e. passing a single device id
        # (usually args.local_rank) to device_ids on each call, if you pass several device ids
        # then DDP also becomes a single process with multiple threads controlling several GPUs,
        # just like DP. See the DDP write-up below for details.
        threads = [threading.Thread(target=_worker,
                                    args=(i, module, input, kwargs, device))
                   for i, (module, input, kwargs, device) in
                   enumerate(zip(modules, inputs, kwargs_tup, devices))]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
    else:
        # one process per GPU (the setup recommended by DDP): only one replica, run it directly
        _worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])

    outputs = []
    for i in range(len(inputs)):
        output = results[i]
        # error handling
        if isinstance(output, ExceptionWrapper):
            output.reraise()
        outputs.append(output)
    # return the n computed outputs
    return outputs
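parallel_apply can also be called on its own. A hedged micro-example (two GPUs assumed; the Scale module and device ids are illustrative) that passes per-replica kwargs, i.e. exactly the kwargs_tup that scatter_kwargs prepared earlier:

import torch
import torch.nn as nn
from torch.nn.parallel import replicate, parallel_apply

class Scale(nn.Module):
    def forward(self, x, factor=1.0):
        return x * factor

replicas = replicate(Scale().cuda(0), [0, 1])
inputs = (torch.ones(2, device='cuda:0'), torch.ones(2, device='cuda:1'))
kwargs_tup = ({'factor': 2.0}, {'factor': 3.0})
outputs = parallel_apply(replicas, inputs, kwargs_tup, devices=[0, 1])
print(outputs)   # [tensor([2., 2.], device='cuda:0'), tensor([3., 3.], device='cuda:1')]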
Now we have the results of the parallel computation; the next step is to collect them onto device[0].
# DP code
return self.gather(outputs, self.output_device)

# collect onto device[0]

# source
def gather(outputs, target_device, dim=0):
    r"""
    Gathers tensors from different GPUs on a specified device
    (-1 means the CPU).
    """
    def gather_map(outputs):
        out = outputs[0]
        if isinstance(out, torch.Tensor):
            return Gather.apply(target_device, dim, *outputs)
        if out is None:
            return None
        if isinstance(out, dict):
            if not all((len(out) == len(d) for d in outputs)):
                raise ValueError('All dicts must have the same number of keys')
            return type(out)(((k, gather_map([d[k] for d in outputs]))
                              for k in out))
        return type(out)(map(gather_map, zip(*outputs)))

    # Recursive function calls like this create reference cycles.
    # Setting the function to None clears the refcycle.
    try:
        res = gather_map(outputs)
    finally:
        gather_map = None
    return res
# Gather source
class Gather(Function):

    @staticmethod
    def forward(ctx, target_device, dim, *inputs):
        assert all(i.device.type != 'cpu' for i in inputs), (
            'Gather function not implemented for CPU tensors'
        )
        target_device = _get_device_index(target_device, True)
        ctx.target_device = target_device
        ctx.dim = dim
        ctx.input_gpus = tuple(i.get_device() for i in inputs)
        if all(t.dim() == 0 for t in inputs) and dim == 0:
            inputs = tuple(t.view(1) for t in inputs)
            warnings.warn('Was asked to gather along dimension 0, but all '
                          'input tensors were scalars; will instead unsqueeze '
                          'and return a vector.')
            ctx.unsqueezed_scalar = True
        else:
            ctx.unsqueezed_scalar = False
        ctx.input_sizes = tuple(i.size(ctx.dim) for i in inputs)
        return comm.gather(inputs, ctx.dim, ctx.target_device)

    @staticmethod
    def backward(ctx, grad_output):
        scattered_grads = Scatter.apply(ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output)
        if ctx.unsqueezed_scalar:
            scattered_grads = tuple(g[0] for g in scattered_grads)
        return (None, None) + scattered_grads
# comm.gather goes down into C++, so we skip the concrete implementation here ;)
# Gathers tensors from multiple GPU devices.
def gather(tensors, dim=0, destination=None, *, out=None):
    tensors = [_handle_complex(t) for t in tensors]
    if out is None:
        if destination == -1:
            warnings.warn(
                'Using -1 to represent CPU tensor is deprecated. Please use a '
                'device object or string instead, e.g., "cpu".')
        destination = _get_device_index(destination, allow_cpu=True, optional=True)
        return torch._C._gather(tensors, dim, destination)
    else:
        if destination is not None:
            raise RuntimeError(
                "'destination' must not be specified when 'out' is specified, but "
                "got destination={}".format(destination))
        return torch._C._gather_out(tensors, out, dim)
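A hedged usage sketch of this primitive (device ids illustrative), merging per-GPU outputs onto a single device; nn.parallel.gather wraps it with autograd support through the Gather Function shown above:

import torch
import torch.cuda.comm as comm

parts = [torch.randn(3, 5, device='cuda:0'), torch.randn(3, 5, device='cuda:1')]
merged = comm.gather(parts, dim=0, destination=0)
print(merged.shape, merged.device)   # torch.Size([6, 5]) cuda:0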
Since much of the implementation lives in C++, these notes do not go further into it.
Finally, let's use a figure to visualize how the DP module actually executes; pay particular attention to the first and third rows:
During the forward pass, the Scatter function first splits and copies the data from device[0] to the different GPUs, then the Replicate function copies the model from device[0] to the different GPUs. At that point every GPU holds the same model and a different slice of the data, and each runs its own forward computation to produce the losses and gradients.
During the backward pass, the gradients are collected onto device[0], and the parameters are then updated on device[0].

Analysis¶
- Load imbalance
device[0] carries a noticeably heavier load than the other GPUs.
- Communication overhead
Suppose there are $k$ GPUs and one transfer of the full parameter set takes time $\frac{p}{b}$ (parameter size $p$, bandwidth $b$). With the PS algorithm, each step then costs $T = 2(k-1)\frac{p}{b}$ in total (see the derivation right after this list).
- Single process
The difference between DistributedDataParallel and DataParallel is: DistributedDataParallel uses multiprocessing where a process is created for each GPU, while DataParallel uses multithreading. By using multiprocessing, each GPU has its dedicated process, this avoids the performance overhead caused by GIL of Python interpreter.
- Global Interpreter Lock (GIL): put simply, a Python process can only make use of one CPU core, so when multiple threads run concurrently on a single core, only one thread executes at a time. Even with multiple cores, multi-core multithreading may suffer from thread thrashing and waste resources, so the best way for Python to exploit multiple cores is to use multiple processes.
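As a hedged sanity check on the communication cost quoted above, assume device[0] exchanges the full parameter/gradient set of size $p$ with each of the other $k-1$ GPUs one after another over a link of bandwidth $b$:

$$T = \underbrace{(k-1)\frac{p}{b}}_{\text{gather gradients onto device[0]}} + \underbrace{(k-1)\frac{p}{b}}_{\text{send updated parameters back}} = 2(k-1)\frac{p}{b}$$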