分布式 evaluation

all_reduce, barrier 等 API 是 distributed 中更为基础和底层的 API。这些 API 可以帮助我们控制进程之间的交互,控制 GPU 数据的传输。在自定义 GPU 协作逻辑,汇总 GPU 间少量的统计信息时,大有用处。熟练掌握这些 API 也可以帮助我们自己设计、优化分布式训练、测试流程。

到目前为止,Distributed Sampler 能够帮助我们分发数据,DistributedDataParallel、hvd.broadcast_parameters 能够帮助我们分发模型,并在框架的支持下解决梯度汇总和参数更新的问题。然而,还有一些同学还有这样的疑惑,

  1. 训练样本被切分成了若干个部分,被若干个进程分别控制运行在若干个 GPU 上,如何在进程间进行通信汇总这些(GPU 上的)信息?
  2. 使用一张卡进行推理、测试太慢了,如何使用 Distributed 进行分布式地推理和测试,并将结果汇总在一起?
  3. ……

要解决这些问题,我们缺少一个更为基础的 API,汇总记录不同 GPU 上生成的准确率、损失函数等指标信息。这个 API 就是 torch.distributed.all_reduce。示意图如下:

具体来说,它的工作过程包含以下三步:

  1. 通过调用 all_reduce(tensor, op=...),当前进程会向其他进程发送 tensor(例如 rank 0 会发送 rank 0 的 tensor 到 rank 1、2、3)
  2. 接受其他进程发来的 tensor(例如 rank 0 会接收 rank 1 的 tensor、rank 2 的 tensor、rank 3 的 tensor)。
  3. 在全部接收完成后,当前进程(例如 rank 0)会对当前进程的和接收到的 tensor (例如 rank 0 的 tensor、rank 1 的 tensor、rank 2 的 tensor、rank 3 的 tensor)进行 op (例如求和)操作。

使用 torch.distributed.all_reduce(loss, op=torch.distributed.reduce_op.SUM),我们就能够对不数据切片(不同 GPU 上的训练数据)的损失函数进行求和了。接着,我们只要再将其除以进程(GPU)数量 world_size就可以得到损失函数的平均值。

正确率也能够通过同样方法进行计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 原始代码
output = model(images)
loss = criterion(output, target)

acc1, acc5 = accuracy(output, target, topk=(1, 5))
losses.update(loss.item(), images.size(0))
top1.update(acc1.item(), images.size(0))
top5.update(acc5.item(), images.size(0))

# 修改后,同步各 GPU 中数据切片的统计信息,用于分布式的 evaluation
def reduce_tensor(tensor):
rt = tensor.clone()
dist.all_reduce(rt, op=dist.reduce_op.SUM)
rt /= args.world_size
return rt

output = model(images)
loss = criterion(output, target)
acc1, acc5 = accuracy(output, target, topk=(1, 5))

torch.distributed.barrier()

reduced_loss = reduce_tensor(loss.data)
reduced_acc1 = reduce_tensor(acc1)
reduced_acc5 = reduce_tensor(acc5)

losses.update(loss.item(), images.size(0))
top1.update(acc1.item(), images.size(0))
top5.update(acc5.item(), images.size(0))

值得注意的是,为了同步各进程的计算进度,我们在 reduce 之前插入了一个同步 API torch.distributed.barrier()。在所有进程运行到这一步之前,先完成此前代码的进程会等待其他进程。这使得我们能够得到准确、有序的输出。在 Horovod 中,我们无法使用 torch.distributed.barrier(),取而代之的是,我们可以在 allreduce 过程中指明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def reduce_mean(tensor, world_size):
rt = tensor.clone()
hvd.allreduce(rt, name='barrier')
rt /= world_size
return rt

output = model(images)
loss = criterion(output, target)
acc1, acc5 = accuracy(output, target, topk=(1, 5))

reduced_loss = reduce_tensor(loss.data)
reduced_acc1 = reduce_tensor(acc1)
reduced_acc5 = reduce_tensor(acc5)

losses.update(loss.item(), images.size(0))
top1.update(acc1.item(), images.size(0))
top5.update(acc5.item(), images.size(0))