17 changes: 9 additions & 8 deletions paddle/fluid/framework/details/CMakeLists.txt
@@ -8,27 +8,28 @@ cc_library(send_op_handle SRCS send_op_handle.cc DEPS framework_proto scope plac
 cc_library(ssa_graph SRCS ssa_graph.cc DEPS var_handle op_handle_base)
 cc_library(ssa_graph_builder SRCS ssa_graph_builder.cc DEPS ssa_graph)
 
+cc_library(variable_visitor SRCS variable_visitor.cc DEPS lod_tensor selected_rows)
+
 if(WITH_GPU)
   nv_library(nccl_all_reduce_op_handle SRCS nccl_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
         dynload_cuda)
   set(multi_devices_graph_builder_deps nccl_all_reduce_op_handle)
-  nv_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base scope ddim dynload_cuda)
+  nv_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim dynload_cuda)
 else()
   set(multi_devices_graph_builder_deps)
-  cc_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base scope ddim)
+  cc_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim)
 endif()
 
+cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)
+cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)
+
 cc_library(multi_devices_graph_builder SRCS multi_devices_graph_builder.cc DEPS ssa_graph_builder computation_op_handle
-        scale_loss_grad_op_handle send_op_handle ${multi_devices_graph_builder_deps})
+        scale_loss_grad_op_handle send_op_handle ${multi_devices_graph_builder_deps} reduce_op_handle broadcast_op_handle)
 
 cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ssa_graph framework_proto)
 cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS fetch_op_handle ssa_graph_executor scope
         simple_threadpool device_context)
 
-cc_library(variable_visitor SRCS variable_visitor.cc DEPS lod_tensor selected_rows)
-
-cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base variable_visitor scope ddim memory)
-cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope variable_visitor ddim memory)
-
 cc_test(broadcast_op_test SRCS broadcast_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
         device_context broadcast_op_handle)
 cc_test(gather_op_test SRCS gather_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
21 changes: 13 additions & 8 deletions paddle/fluid/framework/details/broadcast_op_handle.cc
@@ -44,9 +44,15 @@ void BroadcastOpHandle::RunImpl() {
   //  &in_place;
   WaitInputVarGenerated(*in_var_handle);
 
-  auto *in_var = local_scopes_.at(in_var_handle->scope_idx_)
-                     ->FindVar(in_var_handle->name_);
+  std::vector<const Scope *> var_scopes;
+  for (auto *s : local_scopes_) {
+    var_scopes.emplace_back(s->FindVar(kLocalExecScopeName)->Get<Scope *>());
+  }
+
+  auto *in_var =
+      var_scopes.at(in_var_handle->scope_idx_)->FindVar(in_var_handle->name_);
+  PADDLE_ENFORCE_NOT_NULL(in_var);
 
   Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var);
 
   for (auto *out : out_var_handles) {
@@ -55,17 +61,16 @@ void BroadcastOpHandle::RunImpl() {
     }
 
     auto &out_p = out->place_;
-    auto *out_var = local_scopes_.at(out->scope_idx_)->FindVar(out->name_);
-
+    auto *out_var = var_scopes.at(out->scope_idx_)->FindVar(out->name_);
+    PADDLE_ENFORCE_NOT_NULL(out_var);
     PADDLE_ENFORCE_EQ(out_p.which(), in_var_handle->place_.which(),
                       "Places must be all on CPU or all on CUDA.");
 
     VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
-    VariableVisitor::GetMutableTensor(out_var)
-        .Resize(in_tensor.dims())
-        .mutable_data(out_p, in_tensor.type());
+    VariableVisitor::GetMutableTensor(out_var).mutable_data(out_p,
+                                                            in_tensor.type());
 
-    auto dev_ctx = dev_ctxes_[out_p];
+    auto dev_ctx = dev_ctxes_.at(out_p);
     RunAndRecordEvent(out_p, [in_tensor, out_var, dev_ctx, out_p] {
       paddle::framework::TensorCopy(
           in_tensor, out_p, *(dev_ctx),
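The hunk above is the heart of this change: local_scopes_[i] is no longer where the parameters live; it only holds a Scope* variable, keyed by kLocalExecScopeName, that points at the real execution scope, and RunImpl() must dereference that pointer before every lookup. A minimal standalone sketch of this double indirection, using a toy Scope type and an arbitrary key value (not the real Paddle API):

#include <cassert>
#include <string>
#include <unordered_map>

// Toy stand-in; the actual key string used by Paddle is an internal detail.
static const char kLocalExecScopeName[] = "@LOCAL_EXEC_SCOPE@";

struct Scope {
  std::unordered_map<std::string, std::string> strings;  // toy "variables"
  std::unordered_map<std::string, Scope *> scopes;       // Scope* variables

  const std::string *FindStr(const std::string &n) const {
    auto it = strings.find(n);
    return it == strings.end() ? nullptr : &it->second;
  }
  Scope *FindScope(const std::string &n) const {
    auto it = scopes.find(n);
    return it == scopes.end() ? nullptr : it->second;
  }
};

int main() {
  Scope outer;  // what local_scopes_[i] points at
  Scope inner;  // the scope that actually owns the variables
  outer.scopes[kLocalExecScopeName] = &inner;  // the Scope* indirection variable
  inner.strings["input"] = "tensor payload";

  // What the new RunImpl() does: dereference the pointer variable first,
  // then look the parameter up in the inner scope.
  Scope *exec = outer.FindScope(kLocalExecScopeName);
  assert(exec != nullptr);
  assert(exec->FindStr("input") != nullptr);  // mirrors PADDLE_ENFORCE_NOT_NULL
  assert(outer.FindStr("input") == nullptr);  // the old lookup path finds nothing
  return 0;
}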
24 changes: 18 additions & 6 deletions paddle/fluid/framework/details/broadcast_op_handle_test.cc
@@ -30,6 +30,7 @@ const f::DDim kDims = {20, 20};
 struct TestBroadcastOpHandle {
   std::vector<std::unique_ptr<p::DeviceContext>> ctxs_;
   std::vector<Scope*> local_scopes_;
+  std::vector<Scope*> param_scopes_;
   Scope g_scope_;
   std::unique_ptr<OpHandleBase> op_handle_;
   std::vector<std::unique_ptr<VarHandleBase>> vars_;
@@ -72,11 +73,17 @@ struct TestBroadcastOpHandle {
   void InitBroadcastOp(size_t input_scope_idx) {
     for (size_t j = 0; j < gpu_list_.size(); ++j) {
       local_scopes_.push_back(&(g_scope_.NewScope()));
-      local_scopes_[j]->Var("out");
+      Scope& local_scope = local_scopes_.back()->NewScope();
+      *local_scopes_.back()
+           ->Var(details::kLocalExecScopeName)
+           ->GetMutable<Scope*>() = &local_scope;
+      local_scope.Var("out");
+      param_scopes_.emplace_back(&local_scope);
     }
-    local_scopes_[input_scope_idx]->Var("input");
+    param_scopes_[input_scope_idx]->Var("input");
 
     op_handle_.reset(new BroadcastOpHandle(local_scopes_, gpu_list_));
+
     auto* in_var_handle =
         new VarHandle(1, input_scope_idx, "input", gpu_list_[input_scope_idx]);
     vars_.emplace_back(in_var_handle);
@@ -105,7 +112,8 @@ struct TestBroadcastOpHandle {
   }
 
   void TestBroadcastLodTensor(size_t input_scope_idx) {
-    auto in_var = local_scopes_[input_scope_idx]->Var("input");
+    auto in_var = param_scopes_[input_scope_idx]->FindVar("input");
+    PADDLE_ENFORCE_NOT_NULL(in_var);
     auto in_lod_tensor = in_var->GetMutable<f::LoDTensor>();
     in_lod_tensor->mutable_data<float>(kDims, gpu_list_[input_scope_idx]);
 
@@ -117,14 +125,16 @@ struct TestBroadcastOpHandle {
     paddle::framework::TensorFromVector<float>(
         send_vector, *(ctxs_[input_scope_idx]), in_lod_tensor);
     in_lod_tensor->set_lod(lod);
+    in_lod_tensor->Resize(kDims);
 
     op_handle_->Run(false);
 
     WaitAll();
 
     p::CPUPlace cpu_place;
     for (size_t j = 0; j < gpu_list_.size(); ++j) {
-      auto out_var = local_scopes_[j]->Var("out");
+      auto out_var = param_scopes_[j]->FindVar("out");
+      PADDLE_ENFORCE_NOT_NULL(out_var);
       auto out_tensor = out_var->Get<f::LoDTensor>();
       PADDLE_ENFORCE_EQ(out_tensor.lod(), lod, "lod is not equal.");
 
@@ -139,7 +149,8 @@ struct TestBroadcastOpHandle {
   }
 
   void TestBroadcastSelectedRows(size_t input_scope_idx) {
-    auto in_var = local_scopes_[input_scope_idx]->Var("input");
+    auto in_var = param_scopes_[input_scope_idx]->FindVar("input");
+    PADDLE_ENFORCE_NOT_NULL(in_var);
     auto in_selected_rows = in_var->GetMutable<f::SelectedRows>();
     auto value = in_selected_rows->mutable_value();
     value->mutable_data<float>(kDims, gpu_list_[input_scope_idx]);
@@ -162,7 +173,8 @@ struct TestBroadcastOpHandle {
 
     p::CPUPlace cpu_place;
    for (size_t j = 0; j < gpu_list_.size(); ++j) {
-      auto out_var = local_scopes_[j]->Var("out");
+      auto out_var = param_scopes_[j]->FindVar("out");
+      PADDLE_ENFORCE_NOT_NULL(out_var);
       auto& out_select_rows = out_var->Get<f::SelectedRows>();
       auto rt = out_select_rows.value();
 
15 changes: 10 additions & 5 deletions paddle/fluid/framework/details/gather_op_handle.cc
@@ -41,14 +41,19 @@ void GatherOpHandle::RunImpl() {
     out_var_handle = out_var_handles.front();
   }
 
+  std::vector<const Scope *> var_scopes;
+  for (auto *s : local_scopes_) {
+    var_scopes.emplace_back(s->FindVar(kLocalExecScopeName)->Get<Scope *>());
+  }
+
   auto in_0_handle = in_var_handles[0];
   auto pre_in_var =
-      local_scopes_[in_0_handle->scope_idx_]->FindVar(in_0_handle->name_);
-  auto pre_place = in_0_handle->place_;
-
+      var_scopes.at(in_0_handle->scope_idx_)->FindVar(in_0_handle->name_);
+  PADDLE_ENFORCE_NOT_NULL(pre_in_var);
   PADDLE_ENFORCE(pre_in_var->IsType<framework::SelectedRows>(),
                  "Currently, gather_op only can gather SelectedRows.");
 
+  auto pre_place = in_0_handle->place_;
   PADDLE_ENFORCE_EQ(out_var_handle->place_.which(), pre_place.which(),
                     "The place of input and output should be the same.");
 
@@ -67,7 +72,7 @@ void GatherOpHandle::RunImpl() {
     PADDLE_ENFORCE_EQ(in_p.which(), pre_place.which(),
                       "Places must be all on CPU or all on CUDA.");
     auto *in_var =
-        local_scopes_.at(in_handle->scope_idx_)->FindVar(in_handle->name_);
+        var_scopes.at(in_handle->scope_idx_)->FindVar(in_handle->name_);
     auto &in_sr = in_var->Get<framework::SelectedRows>();
 
     PADDLE_ENFORCE_EQ(in_sr.value().type(), pre_in.value().type(),
@@ -86,7 +91,7 @@ void GatherOpHandle::RunImpl() {
   // write the output
   auto &out_place = out_var_handle->place_;
   auto out_scope_idx = out_var_handle->scope_idx_;
-  auto out_var = local_scopes_[out_scope_idx]->FindVar(out_var_handle->name_);
+  auto out_var = var_scopes.at(out_scope_idx)->FindVar(out_var_handle->name_);
 
   auto out = out_var->GetMutable<framework::SelectedRows>();
   out->set_height(pre_in.height());
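GatherOpHandle only accepts SelectedRows inputs, and gathering them is pure concatenation: the row indices of every input are appended and their value blocks stacked, so the output holds the rows of all scopes. A standalone sketch under toy types (not the real framework::SelectedRows):

#include <cassert>
#include <cstdint>
#include <vector>

struct ToySelectedRows {
  int64_t height = 0;         // logical row count of the dense tensor
  std::vector<int64_t> rows;  // which dense rows are present
  std::vector<float> value;   // rows.size() * width floats, row-major
};

ToySelectedRows Gather(const std::vector<ToySelectedRows> &ins, int64_t width) {
  ToySelectedRows out;
  out.height = ins.front().height;  // all inputs must agree, as the op enforces
  for (const auto &in : ins) {
    assert(in.height == out.height);
    out.rows.insert(out.rows.end(), in.rows.begin(), in.rows.end());
    out.value.insert(out.value.end(), in.value.begin(), in.value.end());
  }
  assert(out.value.size() == out.rows.size() * static_cast<size_t>(width));
  return out;
}

int main() {
  const int64_t width = 2;
  ToySelectedRows a{10, {0, 3}, {1, 1, 2, 2}};
  ToySelectedRows b{10, {5}, {3, 3}};
  ToySelectedRows out = Gather({a, b}, width);
  assert(out.rows.size() == 3u);  // rows from both inputs, concatenated
  // This output growth is also why the test below now checks
  // f::product(kDims) * gpu_list_.size() elements instead of f::product(kDims).
  return 0;
}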
21 changes: 15 additions & 6 deletions paddle/fluid/framework/details/gather_op_handle_test.cc
@@ -29,6 +29,7 @@ const f::DDim kDims = {20, 20};
 struct TestGatherOpHandle {
   std::vector<std::unique_ptr<p::DeviceContext>> ctxs_;
   std::vector<Scope*> local_scopes_;
+  std::vector<Scope*> param_scopes_;
   Scope g_scope_;
   std::unique_ptr<OpHandleBase> op_handle_;
   std::vector<std::unique_ptr<VarHandleBase>> vars_;
@@ -71,9 +72,14 @@ struct TestGatherOpHandle {
   void InitGatherOp(size_t input_scope_idx) {
     for (size_t j = 0; j < gpu_list_.size(); ++j) {
       local_scopes_.push_back(&(g_scope_.NewScope()));
-      local_scopes_[j]->Var("out");
+      Scope& local_scope = local_scopes_.back()->NewScope();
+      *local_scopes_.back()
+           ->Var(details::kLocalExecScopeName)
+           ->GetMutable<Scope*>() = &local_scope;
+      local_scope.Var("input");
+      param_scopes_.emplace_back(&local_scope);
     }
-    local_scopes_[input_scope_idx]->Var("input");
+    param_scopes_[input_scope_idx]->Var("out");
 
     op_handle_.reset(new GatherOpHandle(local_scopes_, gpu_list_));
     // add input
@@ -115,7 +121,8 @@ struct TestGatherOpHandle {
 
     for (size_t input_scope_idx = 0; input_scope_idx < gpu_list_.size();
          ++input_scope_idx) {
-      auto in_var = local_scopes_[input_scope_idx]->Var("input");
+      auto in_var = param_scopes_.at(input_scope_idx)->FindVar("input");
+      PADDLE_ENFORCE_NOT_NULL(in_var);
       auto in_selected_rows = in_var->GetMutable<f::SelectedRows>();
       auto value = in_selected_rows->mutable_value();
       value->mutable_data<float>(kDims, gpu_list_[input_scope_idx]);
@@ -128,10 +135,11 @@ struct TestGatherOpHandle {
       value->Resize(kDims);
     }
 
-    auto out_var = local_scopes_[output_scope_idx]->Var("out");
+    auto out_var = param_scopes_.at(output_scope_idx)->FindVar("out");
+    PADDLE_ENFORCE_NOT_NULL(out_var);
     auto out_selected_rows = out_var->GetMutable<f::SelectedRows>();
 
-    auto in_var = local_scopes_[output_scope_idx]->Var("input");
+    auto in_var = param_scopes_.at(output_scope_idx)->FindVar("input");
     auto in_selected_rows = in_var->GetMutable<f::SelectedRows>();
 
     out_selected_rows->mutable_value()->ShareDataWith(
@@ -155,7 +163,8 @@ struct TestGatherOpHandle {
     f::TensorCopy(rt, cpu_place, *(ctxs_[output_scope_idx]), &result_tensor);
     float* ct = result_tensor.data<float>();
 
-    for (int64_t j = 0; j < f::product(kDims); ++j) {
+    for (int64_t j = 0;
+         j < f::product(kDims) * static_cast<int64_t>(gpu_list_.size()); ++j) {
      ASSERT_NEAR(ct[j], send_vector[j % send_vector.size()], 1e-5);
     }
   }
10 changes: 5 additions & 5 deletions paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc
@@ -43,21 +43,21 @@ void NCCLAllReduceOpHandle::RunImpl() {
     int dtype = -1;
     size_t numel = 0;
 
-    std::vector<LoDTensor> lod_tensors;
+    std::vector<const LoDTensor *> lod_tensors;
 
     for (size_t i = 0; i < local_scopes_.size(); ++i) {
       auto *s = local_scopes_[i];
       auto &local_scope = *s->FindVar(kLocalExecScopeName)->Get<Scope *>();
 
       auto &lod_tensor = local_scope.FindVar(var_name)->Get<LoDTensor>();
-      lod_tensors.emplace_back(lod_tensor);
+      lod_tensors.emplace_back(&lod_tensor);
     }
 
-    if (platform::is_gpu_place(lod_tensors[0].place())) {
+    if (platform::is_gpu_place(lod_tensors[0]->place())) {
       std::vector<std::function<void()>> all_reduce_calls;
       for (size_t i = 0; i < local_scopes_.size(); ++i) {
         auto &p = places_[i];
-        auto &lod_tensor = lod_tensors[i];
+        auto &lod_tensor = *lod_tensors[i];
         void *buffer = const_cast<void *>(lod_tensor.data<void>());
 
         if (dtype == -1) {
@@ -93,7 +93,7 @@ void NCCLAllReduceOpHandle::RunImpl() {
 
       // Reduce All Tensor to trg in CPU
       ReduceLoDTensor func(lod_tensors, &trg);
-      VisitDataType(ToDataType(lod_tensors[0].type()), func);
+      VisitDataType(ToDataType(lod_tensors[0]->type()), func);
 
       for (size_t i = 0; i < local_scopes_.size(); ++i) {
         auto &scope =
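Switching from std::vector<LoDTensor> to std::vector<const LoDTensor *> means the collection loop above only records the addresses of tensors that already live in the local scopes, where the value vector copy-constructed a tensor object per scope. A standalone sketch with an instrumented stand-in type (an assumption for illustration; a real LoDTensor copy shares its buffer but still duplicates the metadata object):

#include <cassert>
#include <vector>

struct FakeTensor {
  static int copies;
  FakeTensor() = default;
  FakeTensor(const FakeTensor &) { ++copies; }  // count every copy construction
};
int FakeTensor::copies = 0;

int main() {
  std::vector<FakeTensor> owned(4);  // stands in for the per-scope tensors

  FakeTensor::copies = 0;
  std::vector<FakeTensor> by_value;
  by_value.reserve(owned.size());                     // avoid realloc copies
  for (auto &t : owned) by_value.emplace_back(t);     // old code path
  assert(FakeTensor::copies == 4);                    // one copy per scope

  FakeTensor::copies = 0;
  std::vector<const FakeTensor *> by_pointer;
  for (auto &t : owned) by_pointer.emplace_back(&t);  // new code path
  assert(FakeTensor::copies == 0);                    // only addresses collected
  return 0;
}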
8 changes: 4 additions & 4 deletions paddle/fluid/framework/details/reduce_and_gather.h
@@ -24,23 +24,23 @@ namespace framework {
 namespace details {
 
 struct ReduceLoDTensor {
-  const std::vector<LoDTensor> &src_tensors_;
+  const std::vector<const LoDTensor *> &src_tensors_;
   LoDTensor &dst_tensor_;
 
-  ReduceLoDTensor(const std::vector<LoDTensor> &src, LoDTensor *dst)
+  ReduceLoDTensor(const std::vector<const LoDTensor *> &src, LoDTensor *dst)
       : src_tensors_(src), dst_tensor_(*dst) {}
 
   template <typename T>
   void operator()() const {
     PADDLE_ENFORCE(!src_tensors_.empty());
-    auto &t0 = src_tensors_[0];
+    auto &t0 = *src_tensors_[0];
     PADDLE_ENFORCE_NE(t0.numel(), 0);
     dst_tensor_.Resize(t0.dims());
     T *dst = dst_tensor_.mutable_data<T>(platform::CPUPlace());
     std::copy(t0.data<T>(), t0.data<T>() + t0.numel(), dst);
 
     for (size_t i = 1; i < src_tensors_.size(); ++i) {
-      auto &t = src_tensors_[i];
+      auto &t = *src_tensors_[i];
       PADDLE_ENFORCE_EQ(t.dims(), t0.dims());
       PADDLE_ENFORCE_EQ(t.type(), t0.type());
       std::transform(t.data<T>(), t.data<T>() + t.numel(), dst, dst,
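ReduceLoDTensor is a type-erased reduction: VisitDataType picks the template instantiation from the runtime dtype, and operator()<T> copies the first source then accumulates the rest element-wise into the destination. A minimal standalone sketch of the same functor-plus-dispatch shape, with a toy VisitDataType (not Paddle's):

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

enum class DataType { FP32, INT64 };

// Toy dispatcher: maps a runtime dtype tag to a template instantiation.
template <typename Visitor>
void VisitDataType(DataType t, Visitor visitor) {
  switch (t) {
    case DataType::FP32:  visitor.template operator()<float>();     break;
    case DataType::INT64: visitor.template operator()<long long>(); break;
  }
}

struct ReduceBuffers {
  // Mirrors the revised header: sources held by pointer, destination written once.
  const std::vector<const void *> &srcs_;
  void *dst_;
  size_t numel_;

  template <typename T>
  void operator()() const {
    auto *dst = static_cast<T *>(dst_);
    auto *first = static_cast<const T *>(srcs_[0]);
    std::copy(first, first + numel_, dst);           // seed with the first source
    for (size_t i = 1; i < srcs_.size(); ++i) {      // accumulate the rest
      auto *src = static_cast<const T *>(srcs_[i]);
      for (size_t j = 0; j < numel_; ++j) dst[j] += src[j];
    }
  }
};

int main() {
  std::vector<float> a{1.f, 2.f}, b{3.f, 4.f}, out(2);
  std::vector<const void *> srcs{a.data(), b.data()};
  ReduceBuffers func{srcs, out.data(), out.size()};
  VisitDataType(DataType::FP32, func);  // same call shape as the RunImpl() code
  assert(out[0] == 4.f && out[1] == 6.f);
  return 0;
}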