Pytorch整体框架 代码层面

Pytorch VS Tensorflow

Pytorch Tensorflow 说明
动态框架 1.x版本静态框架 Tensorflow的计算图构建好后,不能够灵活改变,2.0默认使用动态图
简洁高效,api易于使用 系统设计过于复杂  
调试简单,可以使用pdb,可以设置断点 相对不容易,一是从会话中请求你想检查的变量,二是使用 TensorFlow 调试器(tfdbg)  
设备管理需明确指定,比如启用CUDA后将张量移到GPU上 无缝性能,默认设置。如果存在可用的GPU,就在GPU上运行  
简单地为每个 CPU 和 GPU 版本写一个接口和对应实现即可 需要更多样板代码  

Pytorch分析

特点介绍

自动梯度autograd:可以在声明张量的时候,决定该变量是否自动计算梯度。

数据加载和预处理:PyTorch提供了一些可极大简化和加快数据处理流程的工具,对于常用的数据集,PyTorch也提供了封装好的接口供用户快速调用。

内核机制

张量:包含同一种数据类型的多维矩阵。PyTorch 的接口是 Python,但底层主要都是用 C++实现的,集成 C++代码通常被称为「扩展」。

为了定义 C/C++中一个新的 Python 对象类型,你需要定义如下 THPVariable 类似结构。其中第一个 PyObject_HEAD 宏旨在标准化 Python 对象,并扩展至另一个结构,该结构包含一个指向类型对象的指针,以及一个带有引用计数(ref count)的字段。

image-20200802112447870

当调用torch.mm时,会发生两次调度:

第一次调度基于设备类型和张量布局:比如是 CPU 张量还是 CUDA张量,是有步幅的张量还是稀疏的张量。这里需要做一次调度应该是合理的:CPU 矩阵乘法的实现非常不同于 CUDA 的实现。

第二次调度是在所涉 dtype 上的调度。这个调度只是一个简单的 switch 语句,针对的是核选择支持的任意 dtype。这里需要调度的原因也很合理:CPU 代码(或 CUDA 代码)是基于 float 实现乘法,这不同于用于 int 的代码。这说明你需要为每种 dtype 都使用不同的核。

张量存储:张量的实际原始数据并不是立即保存在张量结构中,而是保存在我们称之为「存储(Storage)」的地方,它是张量结构的一部分。

image-20200802145841616

一般张量存储可以通过 Allocator 选择是储存在计算机内存(CPU)还是显存(GPU)。

image-20200802152504331

Numpy到Pytorch中tensor的转换 ZERO-COPYING TENSORS

image-20200802113043913

使用torch.from_numpy()函数进行操作

image-20200802113545511

Zero-Copying 的形式确实能省很多内存,但是原地(np_array += 1.0)和标准运算(np_array = np_array + 1.0)之间的区别会有点模糊

扩展torch.autograd

image-20200804151130978

编译执行-JIT

torch.jit,它是一组编译工具,主要过程如下

image-20200802160012952

源码结构分析

参考来源

https://blog.csdn.net/iodjSVf8U1J7KYc/article/details/90745954

https://www.reddit.com/r/MachineLearning/comments/avfoso/p_pytorch_under_the_hood/

Pytorch分布式训练

  batch_time train_time1 train_time2 train_time3 acc
single_gpu 0.168 133 132 132 74.092
data_parallel 0.279 235 218 226 73.899
distributed-2 0.19 83 84 85 78.317
distributed-3 0.199 60 59 61 78.949
Mul-node-2 0.9 370 379 377 79.060

Pytorch分布式源码分析

\torch\distributed\distributed_c10d.py

代码量近2000行,python语言,主要是group粒度的一些操作。

# 对参数进行检查处理调用helper函数,最终通过ProcessGroupGloo,ProcessGroupMPI等创建
def init_process_group(backend,
                       init_method=None,
                       timeout=default_pg_timeout,
                       world_size=-1,
                       rank=-1,
                       store=None,
                       group_name=''):
def destroy_process_group(group=group.WORLD):
def broadcast(tensor,
              src,
              group=group.WORLD,
              async_op=False):
    """
    Broadcasts the tensor to the whole group.

    ``tensor`` must have the same number of elements in all processes
    participating in the collective.

    Arguments:
        tensor (Tensor): Data to be sent if ``src`` is the rank of current
            process, and tensor to be used to save received data otherwise.
        src (int): Source rank.
        group (ProcessGroup, optional): The process group to work on
        async_op (bool, optional): Whether this op should be an async op

    Returns:
        Async work handle, if async_op is set to True.
        None, if not async_op or if not part of the group

    """
    _check_single_tensor(tensor, "tensor")
    if _rank_not_in_group(group):
        return

    opts = BroadcastOptions()
    opts.rootRank = src
    opts.rootTensor = 0

    if group == GroupMember.WORLD:
        _check_default_pg()
        work = _default_pg.broadcast([tensor], opts)
    else:
        group_src_rank = _get_group_rank(group, src)
        opts.rootRank = group_src_rank
        work = group.broadcast([tensor], opts)
    if async_op:
        return work
    else:
        work.wait()

torch\lib\c10d

C++文件,主要为创建进程组的底层实现。

torch\nn\parallel\distributed.py

模型并行化,近800行代码

class DistributedDataParallel(Module):
    def __init__(self, module, device_ids=None,
                 output_device=None, dim=0, broadcast_buffers=True,
                 process_group=None,
                 bucket_cap_mb=25,
                 find_unused_parameters=False,
                 check_reduction=False):
    def forward(self, *inputs, **kwargs):
    def scatter(self, inputs, kwargs, device_ids):
    def gather(self, outputs, output_device):
    def train(self, mode=True):

torch\nn\parallel\replicate.py

模型复制的主要操作,代码量160行左右。

def replicate(network, devices, detach=False):

torch\nn\parallel\scatter_gather.py

针对不同类型的input进行解析到tensor层,调用_functions.py中的Scatter、Gather进行操作。

torch\nn\parallel_functions.py

torch\nn\parallel\comm.py

进行devices到index的转换,然后调用torch._C.里的操作。代码量200多行。

torch\utils\data\distributed.py

用于对数据集进行分布式采样,分成不同的subset。

class DistributedSampler(Sampler):

\torch\distributed\launch.py

torch.distributed.launch is a module that spawns up multiple distributed training processes on each of the training nodes.

对参数进行解析应用到每一个process上。

Tips:

  1. NCCL backend is the recommended backend to use for GPU training.

代码结构

torch\csrc\distributed\c10d\init.cpp

对Python和C++的交互,比如函数定义、参数类型绑定声明等进行初始化。主要对应\torch\distributed\distributed_c10d.py里的

from . import (
    AllreduceOptions,
    AllreduceCoalescedOptions,
    AllToAllOptions,
    BroadcastOptions,   # 暂未找到
    GatherOptions,
    ReduceOptions,
    ReduceScatterOptions,
    ScatterOptions,
)
py::class_<::c10d::AllreduceOptions>(module, "AllreduceOptions")
      .def(py::init<>())
      .def_readwrite("reduceOp", &::c10d::AllreduceOptions::reduceOp)
      .def_readwrite("timeout", &::c10d::AllreduceOptions::timeout);

\torch\lib\c10d\Types.hpp

包含了对于\torch\distributed\distributed_c10d.py里各种Options的结构定义:

struct AllreduceOptions {
  ReduceOp reduceOp = ReduceOp::SUM;
  std::chrono::milliseconds timeout = kUnsetTimeout;
};
struct BroadcastOptions {
  int rootRank = 0;
  int rootTensor = 0;
  std::chrono::milliseconds timeout = kUnsetTimeout;
};

\torch\lib\c10d\ProcessGroup.hpp

对ProcessGroup类的定义,以及对于虚函数的声明。

virtual std::shared_ptr<ProcessGroup::Work> broadcast(
      std::vector<at::Tensor>& data,
      const BroadcastOptions& opts = BroadcastOptions()) = 0;

  virtual std::shared_ptr<ProcessGroup::Work> allreduce(
      std::vector<at::Tensor>& data,
      const AllreduceOptions& opts = AllreduceOptions()) = 0;