GPU 集群上的分布式
Slurm,是一个用于 Linux 系统的免费、开源的任务调度工具。它提供了三个关键功能。第一,为用户分配资源(计算机节点),以供用户执行工作。第二,它提供了一个框架,用于执行在节点上运行着的任务(通常是并行的任务),第三,为任务队列合理地分配资源。如果你还没有部署 Slurm 可以按照作者总结的部署教程 进行部署。
通过运行 slurm 的控制命令,slurm 会将写好的 python 程序在每个节点上分别执行,调用节点上定义的 GPU 资源进行运算。要编写能被 Slurm 在 GPU 集群上执行的 python 分布式训练程序,我们只需要对上文中多进程的 DistributedDataParallel 代码进行修改,告诉每一个执行的任务(每个节点上的 python 程序),要用哪些训练哪一部分数据,反向传播的结果如何合并就可以了。
我们首先需要获得每个任务(对应每个节点)的基本信息,以便针对任务的基本信息处理其应当负责的数据。在使用 slurm 执行 srun python 代码时,python 可以从环境变量 os.environ 中获取当前 python 进程的基本信息:
1 2 3 4 import os local_rank = os.environ['SLURM_PROCID' ] world_size = os.environ['SLURM_NPROCS' ] job_id = os.environ['SLURM_JOBID' ]
在每个任务(节点)中,我们需要为节点中的每个 GPU 资源分配一个进程,管理该 GPU 应当处理的数据。
当前节点的 GPU 的数量可以由 torch.cuda 查询得到:
1 ngpus_per_node = torch.cuda.device_count()
接着,与上文相似,我们使用 torch.multiprocessing 创建 ngpus_per_node 个进程,其中,每个进程执行的函数为 main_worker ,该函数调用所需要的由 args 传入:
1 mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))
在编写 main_worker 时,我们首先需要解决的问题是:不同节点、或者同一节点间的不同进程之间需要通信来实现数据的分割、参数的合并。我们可以使用 pytorch 的 dist 库在共享文件系统上创建一个文件进行通信:
1 2 3 4 5 6 7 import torch.distributed as distdef main_worker (gpu, ngpus_per_node, args ): dist_url = "file://dist_file.{}" .format (job_id) rank = local_rank * ngpus_per_node + gpu dist.init_process_group(backend='nccl' , init_method=dist_url, world_size=world_size, rank=rank) ...
完成进程创建和通信后,下一步就是实现我们常用的 pipline 了,即加载模型、加载数据、正向传播、反向传播。与上文相似,这里,我们把模型加载进当前进程所对应的 GPU 中:
1 2 3 4 5 6 7 8 def main_worker (gpu, ngpus_per_node, args ): dist_url = "file://dist_file.{}" .format (job_id) rank = local_rank * ngpus_per_node + gpu dist.init_process_group(backend='nccl' , init_method=dist_url, world_size=world_size, rank=rank) ... torch.cuda.set_device(gpu) model.cuda(gpu) model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu])
接着,把当前进程对应的数据段采样出来,也加载到对应的 GPU 中。同样可以使用 pytorch 的 dist 库实现这个采样过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def main_worker (gpu, ngpus_per_node, args ): dist_url = "file://dist_file.{}" .format (job_id) rank = local_rank * ngpus_per_node + gpu dist.init_process_group(backend='nccl' , init_method=dist_url, world_size=world_size, rank=rank) ... torch.cuda.set_device(gpu) model.cuda(gpu) model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ... train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=args.batch_size, num_workers=2 , pin_memory=True , sampler=train_sampler) for i, (images, target) in enumerate (train_loader): images = images.cuda(gpu, non_blocking=True ) target = target.cuda(gpu, non_blocking=True )
最后,进行正常的正向和反向传播:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def main_worker (gpu, ngpus_per_node, args ): dist_url = "file://dist_file.{}" .format (job_id) rank = local_rank * ngpus_per_node + gpu dist.init_process_group(backend='nccl' , init_method=dist_url, world_size=world_size, rank=rank) ... torch.cuda.set_device(gpu) model.cuda(gpu) model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) ... train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=args.batch_size, num_workers=2 , pin_memory=True , sampler=train_sampler) for i, (images, target) in enumerate (train_loader): images = images.cuda(gpu, non_blocking=True ) target = target.cuda(gpu, non_blocking=True ) ... output = model(images) loss = criterion(output, target) optimizer.zero_grad() loss.backward() optimizer.step()
在使用时,调用 srun 启动任务:
1 srun -N2 --gres gpu:1 python distributed_slurm_main.py --dist-file dist_file
在 ImageNet 上的完整训练代码,请点击Github 。