Skip to content

Distributed Block Design #11609

@velconia

Description

@velconia

Distributed Block Design

背景

在开始介绍我们的 Distributed Block Design 之前, 我们先看一下, 我们之前都是如何构造一个分布式的NN训练网络的

1. 构造 Forward Pass

参考 uci_housing 的例子, 我们构造了一个非常简单的 Forward Pass:

import paddle.fluid as fluid
def paddle_forward_pass():
    x = fluid.layers.data(name = "x", shape=[13])
    y = fluid.layers.fc(input=x, size=1)
    return y

y = paddle_forward_pass()
print fluid.default_main_program()

通过上述代码, 我们得到了如下的 ProgramDesc:

#  message ProgramDesc {
#    block[0] = Block {                      
#      vars = [x, w, b, y]
#      ops = [                            
#        read() -> x                          
#        mul(x, w) -> tmp
#        add(tmp, b) -> y
#      ]                 
#    }                                                                      
#  }

2. 构造 Backward Pass

在实践中, 我们把复杂的通过 Forward Pass 构造 Backward Pass 的过程用 append_backward 封装起来:

params_and_grads = append_backward(y)

通过构造 Backward Pass, 我们可以得到如下的 ProgramDesc:

#  This was we got now:
#  message ProgramDesc {                     
#    block[0] = Block {   
#      vars = [x, w, b, y, w@GRAD, b@GRAD]
#      ops = [                                
#        read() -> x     
#        mul(x, w) -> tmp
#        add(tmp, b) -> y
#        grad(x, y) -> w@GRAD                                               
#        grad(x, y) -> b@GRAD
#      ]
#    }                                    
#  }

其实就是往后添加了一些新的Operators

3. 添加 Optimizer, 完成整个训练程序

我们添加了一个 Optimizer, 将 Forward Pass 和 Backward Pass 串起来, 得到一个完整的训练程序

append_opt_op(params_and_grads)

现在, 我们得到了如下的 ProgramDesc:

#  This was we got now:
#  message ProgramDesc {  
#    block[0] = Block {                   
#      vars = [x, w, b, y, w@GRAD, b@GRAD, lr]
#      ops = [           
#        read() -> x     
#        mul(x, w) -> tmp
#        add(tmp, b) -> y                                                   
#        grad(x, y) -> w@GRAD
#        grad(x, y) -> b@GRAD
#        opt(b@GRAD, w@GRAD, lr) -> w, x  
#      ]
#    }
#  }

还是简单的添加一个opt_op即可

痛点

目前为止, 一切都还是自然而简单的, 并不复杂, 但是, 当我们引入 Distribute Transpiler 时, 突然发现我们面临了一个大问题, 非常非常多的, 繁琐而没有头绪的步骤开始接踵而至, 我们要做的包括:

  1. 把opt op移入到pserver program里去;
  2. 根据pserver数量对是Grad类型的Variable进行切分, 并且加上split_op;
  3. 对切分过后的Variable加上send_op;
  4. 对是Param类型的Variable进行切分, 并且加上concat_op;
  5. 对切分过后的Variable加上recv_op;
  6. 另外我们还要为pserver program加上listen_and_serv_op;
  7. 最后, 我们还要考虑到所有被opt_op依赖的lr_decay_op, 还有lr_decay_op所引入的所有sub_blocks...
  8. What's more, 以上的任意一步, 只要逻辑发生改变, 我们就得对Transpiler进行大动...

很明显, 这不是一个我们想接受的好方案, 任意的牵扯到transpiler的改动都会为分布式执行带来不稳定的因素;

PlaceableBlock Design

方案设计

为了解决上述问题, 我们需要对Distribute Transpiler能做的操作进行规范化以及简化, 为了规范化和简化Transpiler的操作, 我们将Transpiler能操作的粒度从 op 级别提升到了 block 级别, 例如当用户用 fluid 写出如下代码时:

{ // block 0
  op0;
  op1;
  { // block 1
    op2;
    { // block 2
      op3;
    }
  }
  
  { // block 3
    op4;
  }
  op5;
}

实际上, 所有的 op (op0 到 op5) 都是隐式的被指定运行在本地的 Executor 中的, Executor 会轮询所有的 op, 并调用他们的 Run 方法;

当用户对这段代码进行 distribute transpile 时, 用户其实是在把这段代码拆分到多个 Executor 中, 希望他们并发执行;

而我们的 Distribute Transpiler 不再对 op 和 var 进行分析, 并对程序做出改动, 而是通过对 Block 进行分析, 并对程序做出改动, 例如当我们希望 Block0 被放置在 Executor0 中执行, 而 Block1 被放置在 Executor1 中执行时, 我们的代码就应该被 transpile 成两段:

// Blocks which run in Executor 0: 

{ // block 0
  op0;
  op1;
  
  send_vars_to_executor_1();
  wait_for_executor_1_complete();
  recv_vars_from_executor_1();
  
  { // block 3
    op4;
  }
  op5;
}
// Blocks which run in Executor 1: 

{ // block 0
  listen_and_serv_op; // recv vars from executor 0
  
  { // block 1
    op2;
    { // block 2
      op3;
    }
  }
}

所以为了支持这样分配机制, 我们设计了一种 Placeable Block, 他可以在编译期被显示指定为可分配的, 并且被 Transpiler 分配到某一个具体的 Executor 上运行, 那么加入了这样的 Placeable Block 后, 我们上面 NN 训练网络变成了这样的构造:

Forward Pass 和 Backward Pass 部分代码不变, 依旧放在 Executor 0 中执行:

import paddle.fluid as fluid
def paddle_forward_pass():
    x = fluid.layers.data(name = "x", shape=[13])
    y = fluid.layers.fc(input=x, size=1)
    return y

y = paddle_forward_pass()
print fluid.default_main_program()

params_and_grads = append_backward(y)

修改 optimizer 部分代码, 用 Placeable Block 将其包裹起来:

remote_block = fluid.layers.PlaceableBlock(input=params_and_grads, output=weight, bias)
with remote_block.block():
    append_opt_op(params_and_grads)

于是我们得到了如下的 Program Desc:

#  This was we got now:
#  message ProgramDesc {
#    block[0] = Block {  
#      parent_idx = -1
#      vars = [x, w, b, y, w@GRAD, b@GRAD, lr]
#      ops = [               
#        read() -> x         
#        mul(x, w) -> tmp               
#        add(tmp, b) -> y
#        grad(x, y) -> w@GRAD
#        grad(x, y) -> b@GRAD
#      ]
#    }
# 
#    block[1] = PlaceableBlock {
#      parent_idx = 0
#      ops = [         
#        opt(b@GRAD, w@GRAD, lr) -> w, b
#      ]                                
#    }  
#  }  

通过从 Block 衍生出 Placeable Block, 当 Distribute Transpiler 进行 transpile 时, 会分析的 Block 与 Block 之间的依赖关系, 并为 Placeable Block 依赖的 Variable 创建 send_op, 为 Placeable Block 写出的 Variable 创建 recv_op;

依赖分析

对于上面的 ProgramDesc, 我们可以得到如下的依赖分析结果

image

Block ID Input Variable Output Variable
block 0 w,b w@GRAD, b@GRAD
block 1 w@GRAD, b@GRAD w, b

所以 Block 1 会等待 Block 0 将 w@GRAD 或 b@GRAD 写入后, 再执行对应的 op, 而 Block 0 也会等待 Block 1 将 w 和 b 写入再开始下一轮的训练;

这样, 我们就支持了将一个 Block 完整的分配到另一个 Executor 上执行;

DistributedBlock Design

当然, 仅支持将 Block 分配到另一个 Executor 对于分布式程序来说还是不够的, 我们还需要支持对 Block 进行复制, 复制多份后, 将 Block 分配到不同的 Executor 上并发执行, 所以我们需要从 PlaceableBlock 衍生出 DistributedBlock;

op 并发

op 并发就是将一个 Placeable Block 中的 op 拆分成多个, 并分配到多个 Distributed Block 中, 例如将上例中的 block 1 进行 op 并发复制, 则得到这样两个 Program Desc:

#  message ProgramDesc {
#    block[2] = PlaceableBlock {
#      parent_idx = -1
#      ops = [         
#        opt(w@GRAD, lr) -> w
#      ]                                
#    }  
#  }  
#  message ProgramDesc {
#    block[3] = PlaceableBlock {
#      parent_idx = -1
#      ops = [         
#        opt(b@GRAD, lr) -> b
#      ]                                
#    }  
#  }  

对应的, 依赖图发生变化:

Block ID Input Variable Output Variable
block 0 w,b w@GRAD, b@GRAD
block 2 w@GRAD w
block 3 b@GRAD b

我们依旧可以将 block 0 分配给 Executor 0, block 2 分配给 Executor 1, block 3 分配给 Executor 2 执行;

Metadata

Metadata

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions