Pytorch整体框架 代码层面
Pytorch VS Tensorflow
Pytorch | Tensorflow | 说明 |
---|---|---|
动态框架 | 1.x版本静态框架 | Tensorflow的计算图构建好后,不能够灵活改变,2.0默认使用动态图 |
简洁高效,api易于使用 | 系统设计过于复杂 | |
调试简单,可以使用pdb,可以设置断点 | 相对不容易,一是从会话中请求你想检查的变量,二是使用 TensorFlow 调试器(tfdbg) | |
设备管理需明确指定,比如启用CUDA后将张量移到GPU上 | 无缝性能,默认设置。如果存在可用的GPU,就在GPU上运行 | |
简单地为每个 CPU 和 GPU 版本写一个接口和对应实现即可 | 需要更多样板代码 |
Pytorch分析
特点介绍
自动梯度autograd:可以在声明张量的时候,决定该变量是否自动计算梯度。
数据加载和预处理:PyTorch提供了一些可极大简化和加快数据处理流程的工具,对于常用的数据集,PyTorch也提供了封装好的接口供用户快速调用。
内核机制
张量:包含同一种数据类型的多维矩阵。PyTorch 的接口是 Python,但底层主要都是用 C++实现的,集成 C++代码通常被称为「扩展」。
为了定义 C/C++中一个新的 Python 对象类型,你需要定义如下 THPVariable 类似结构。其中第一个 PyObject_HEAD 宏旨在标准化 Python 对象,并扩展至另一个结构,该结构包含一个指向类型对象的指针,以及一个带有引用计数(ref count)的字段。
当调用torch.mm时,会发生两次调度:
第一次调度基于设备类型和张量布局:比如是 CPU 张量还是 CUDA张量,是有步幅的张量还是稀疏的张量。这里需要做一次调度应该是合理的:CPU 矩阵乘法的实现非常不同于 CUDA 的实现。
第二次调度是在所涉 dtype 上的调度。这个调度只是一个简单的 switch 语句,针对的是核选择支持的任意 dtype。这里需要调度的原因也很合理:CPU 代码(或 CUDA 代码)是基于 float 实现乘法,这不同于用于 int 的代码。这说明你需要为每种 dtype 都使用不同的核。
张量存储:张量的实际原始数据并不是立即保存在张量结构中,而是保存在我们称之为「存储(Storage)」的地方,它是张量结构的一部分。
一般张量存储可以通过 Allocator 选择是储存在计算机内存(CPU)还是显存(GPU)。
Numpy到Pytorch中tensor的转换 ZERO-COPYING TENSORS
使用torch.from_numpy()函数进行操作
Zero-Copying 的形式确实能省很多内存,但是原地(np_array += 1.0)和标准运算(np_array = np_array + 1.0)之间的区别会有点模糊
扩展torch.autograd
编译执行-JIT
torch.jit,它是一组编译工具,主要过程如下
源码结构分析
- torch/ 包含导入和使用的实际的python模块,python代码而且易于操作
- torch/csrc/ C++代码,它实现了在 Python 和 C++ 间转换的绑定代码
- aten/ C++ tensor library for PyTorch (no autograd support),包含原生算子和传统算子
- c10/ 包含 PyTorch 的核心抽象,包括张量和存储数据结构的实际实现。
参考来源
https://blog.csdn.net/iodjSVf8U1J7KYc/article/details/90745954
https://www.reddit.com/r/MachineLearning/comments/avfoso/p_pytorch_under_the_hood/
Pytorch分布式训练
batch_time | train_time1 | train_time2 | train_time3 | acc | |
---|---|---|---|---|---|
single_gpu | 0.168 | 133 | 132 | 132 | 74.092 |
data_parallel | 0.279 | 235 | 218 | 226 | 73.899 |
distributed-2 | 0.19 | 83 | 84 | 85 | 78.317 |
distributed-3 | 0.199 | 60 | 59 | 61 | 78.949 |
Mul-node-2 | 0.9 | 370 | 379 | 377 | 79.060 |
Pytorch分布式源码分析
\torch\distributed\distributed_c10d.py
代码量近2000行,python语言,主要是group粒度的一些操作。
# 对参数进行检查处理调用helper函数,最终通过ProcessGroupGloo,ProcessGroupMPI等创建
def init_process_group(backend,
init_method=None,
timeout=default_pg_timeout,
world_size=-1,
rank=-1,
store=None,
group_name=''):
def destroy_process_group(group=group.WORLD):
- 进程组的创建、销毁、操作函数,比如get_rank, get_world_size
- tensor的同步/非同步send和recv、broadcast、reduce、gather等操作
def broadcast(tensor,
src,
group=group.WORLD,
async_op=False):
"""
Broadcasts the tensor to the whole group.
``tensor`` must have the same number of elements in all processes
participating in the collective.
Arguments:
tensor (Tensor): Data to be sent if ``src`` is the rank of current
process, and tensor to be used to save received data otherwise.
src (int): Source rank.
group (ProcessGroup, optional): The process group to work on
async_op (bool, optional): Whether this op should be an async op
Returns:
Async work handle, if async_op is set to True.
None, if not async_op or if not part of the group
"""
_check_single_tensor(tensor, "tensor")
if _rank_not_in_group(group):
return
opts = BroadcastOptions()
opts.rootRank = src
opts.rootTensor = 0
if group == GroupMember.WORLD:
_check_default_pg()
work = _default_pg.broadcast([tensor], opts)
else:
group_src_rank = _get_group_rank(group, src)
opts.rootRank = group_src_rank
work = group.broadcast([tensor], opts)
if async_op:
return work
else:
work.wait()
-
object和tensor的转换
-
面向object的gather
torch\lib\c10d
C++文件,主要为创建进程组的底层实现。
- ProcessGroupGloo
- ProcessGroupMPI
- ProcessGroupNCCL
- Store
torch\nn\parallel\distributed.py
模型并行化,近800行代码
class DistributedDataParallel(Module):
def __init__(self, module, device_ids=None,
output_device=None, dim=0, broadcast_buffers=True,
process_group=None,
bucket_cap_mb=25,
find_unused_parameters=False,
check_reduction=False):
def forward(self, *inputs, **kwargs):
def scatter(self, inputs, kwargs, device_ids):
def gather(self, outputs, output_device):
def train(self, mode=True):
torch\nn\parallel\replicate.py
模型复制的主要操作,代码量160行左右。
def replicate(network, devices, detach=False):
torch\nn\parallel\scatter_gather.py
针对不同类型的input进行解析到tensor层,调用_functions.py中的Scatter、Gather进行操作。
torch\nn\parallel_functions.py
-
代码量120行
- 包含四个类:Broadcast、ReduceAddCoalesced、Gather、Scatter
- 对参数进行简单处理后调用comm里的函数进行实际操作
torch\nn\parallel\comm.py
进行devices到index的转换,然后调用torch._C.里的操作。代码量200多行。
torch\utils\data\distributed.py
用于对数据集进行分布式采样,分成不同的subset。
class DistributedSampler(Sampler):
\torch\distributed\launch.py
torch.distributed.launch
is a module that spawns up multiple distributed training processes on each of the training nodes.
对参数进行解析应用到每一个process上。
Tips:
- NCCL backend is the recommended backend to use for GPU training.
代码结构
torch\csrc\distributed\c10d\init.cpp
对Python和C++的交互,比如函数定义、参数类型绑定声明等进行初始化。主要对应\torch\distributed\distributed_c10d.py里的
from . import (
AllreduceOptions,
AllreduceCoalescedOptions,
AllToAllOptions,
BroadcastOptions, # 暂未找到
GatherOptions,
ReduceOptions,
ReduceScatterOptions,
ScatterOptions,
)
py::class_<::c10d::AllreduceOptions>(module, "AllreduceOptions")
.def(py::init<>())
.def_readwrite("reduceOp", &::c10d::AllreduceOptions::reduceOp)
.def_readwrite("timeout", &::c10d::AllreduceOptions::timeout);
\torch\lib\c10d\Types.hpp
包含了对于\torch\distributed\distributed_c10d.py里各种Options的结构定义:
struct AllreduceOptions {
ReduceOp reduceOp = ReduceOp::SUM;
std::chrono::milliseconds timeout = kUnsetTimeout;
};
struct BroadcastOptions {
int rootRank = 0;
int rootTensor = 0;
std::chrono::milliseconds timeout = kUnsetTimeout;
};
\torch\lib\c10d\ProcessGroup.hpp
对ProcessGroup类的定义,以及对于虚函数的声明。
virtual std::shared_ptr<ProcessGroup::Work> broadcast(
std::vector<at::Tensor>& data,
const BroadcastOptions& opts = BroadcastOptions()) = 0;
virtual std::shared_ptr<ProcessGroup::Work> allreduce(
std::vector<at::Tensor>& data,
const AllreduceOptions& opts = AllreduceOptions()) = 0;