26 commits
7995380
Merge branch 'feature/refine_gather_reduce' of https://github.com/che…
chengduoZH Apr 18, 2018
a6715d6
clean code
chengduoZH Apr 18, 2018
ac7b414
fix VisitVariable
chengduoZH Apr 18, 2018
9c8b9c0
Merge branch 'feature/add_reduce_op_handle' of https://github.com/che…
chengduoZH Apr 19, 2018
bd11523
insert reduce op handle to parallel_exe
chengduoZH Apr 19, 2018
9f45c06
fix API
chengduoZH Apr 19, 2018
cecebbe
debug
chengduoZH Apr 19, 2018
01b0106
merge feature/add_reduce_op_handle_Debug
chengduoZH Apr 19, 2018
dee077b
unit test can work
chengduoZH Apr 19, 2018
c741aeb
clean code
chengduoZH Apr 19, 2018
fca0fb9
debug
chengduoZH Apr 20, 2018
0561fae
debug
chengduoZH Apr 20, 2018
a22d385
Polish MultiDevicesBuilder
reyoung Apr 20, 2018
ed13f4a
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
chengduoZH Apr 20, 2018
e0f37f8
fix reduce op, broadcast op and gather op
chengduoZH Apr 20, 2018
8d83914
fix the threads per dev
chengduoZH Apr 20, 2018
cec94e1
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
chengduoZH Apr 21, 2018
3210055
code refine
chengduoZH Apr 22, 2018
20ba594
add nccl broadcast
chengduoZH Apr 22, 2018
bbad887
clean code
chengduoZH Apr 23, 2018
f965e9a
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
chengduoZH Apr 24, 2018
7b58d47
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
chengduoZH Apr 24, 2018
7ee07df
follow comments
chengduoZH Apr 24, 2018
ea78be2
follow comments
chengduoZH Apr 24, 2018
ed052f1
Merge develop branch and fix CPPLint issues
chengduoZH May 2, 2018
c0a3746
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
chengduoZH May 2, 2018
11 changes: 4 additions & 7 deletions paddle/cuda/include/hl_base.h
@@ -12,8 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#ifndef HL_BASE_H_
#define HL_BASE_H_
#pragma once

#include <cstddef>

@@ -207,8 +206,8 @@ typedef struct {

#ifdef __NVCC__

#include "cuda_runtime.h"
#include "hl_cuda.h"
#include <cuda_runtime.h>
#include "paddle/cuda/include/hl_cuda.h"
#include "paddle/utils/Logging.h"

extern __thread bool g_sync_flag;
@@ -243,6 +242,4 @@ __shfl_sync(unsigned, T val, int src_line, int width) {
mask = __ballot_sync(FULL_WARP_MASK, (predicate))
#endif

#endif /* __NVCC__ */

#endif /* HL_BASE_H_ */
#endif // __NVCC__
4 changes: 3 additions & 1 deletion paddle/fluid/framework/details/CMakeLists.txt
@@ -15,12 +15,14 @@ if(WITH_GPU)
dynload_cuda)
set(multi_devices_graph_builder_deps nccl_all_reduce_op_handle)
nv_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim dynload_cuda)
nv_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor dynload_cuda)

else()
set(multi_devices_graph_builder_deps)
cc_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope ddim)
cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)
endif()

cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)
cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)

cc_library(multi_devices_graph_builder SRCS multi_devices_graph_builder.cc DEPS ssa_graph_builder computation_op_handle
108 changes: 87 additions & 21 deletions paddle/fluid/framework/details/broadcast_op_handle.cc
@@ -19,11 +19,9 @@
namespace paddle {
namespace framework {
namespace details {
BroadcastOpHandle::BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places)
: local_scopes_(local_scopes), places_(places) {}

void BroadcastOpHandle::RunImpl() {
if (places_.size() == 1) return;
// the input and output may have dummy var.
VarHandle *in_var_handle;

@@ -55,27 +53,95 @@ void BroadcastOpHandle::RunImpl() {

Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var);

for (auto *out : out_var_handles) {
if (*out == *in_var_handle) {
continue;
if (platform::is_cpu_place(in_tensor.place())) {
for (auto *out : out_var_handles) {
if (*out == *in_var_handle) {
continue;
}

auto &out_p = out->place_;
auto *out_var = var_scopes.at(out->scope_idx_)->FindVar(out->name_);
PADDLE_ENFORCE_NOT_NULL(out_var);
PADDLE_ENFORCE_EQ(out_p.which(), in_tensor.place().which(),
"Places must be all on CPU or all on CUDA.");

VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(out_p,
in_tensor.type());

auto dev_ctx = dev_ctxes_.at(out_p);
RunAndRecordEvent(out_p, [in_tensor, out_var, dev_ctx, out_p] {
paddle::framework::TensorCopy(
in_tensor, out_p, *dev_ctx,
&VariableVisitor::GetMutableTensor(out_var));
});
}
} else {
#ifdef PADDLE_WITH_CUDA
PADDLE_ENFORCE(platform::is_gpu_place(in_tensor.place()));
VarHandle *out_handle;
int root = boost::get<platform::CUDAPlace>(in_tensor.place()).device;
std::vector<std::function<void()>> broadcast_calls;

for (size_t j = 0; j < out_var_handles.size(); ++j) {
VarHandle *out_var_handle = out_var_handles[j];
Variable *out_var = var_scopes.at(out_var_handle->scope_idx_)
->FindVar(out_var_handle->name_);

if (*out_var_handle != *in_var_handle) {
PADDLE_ENFORCE_NOT_NULL(out_var);
PADDLE_ENFORCE_EQ(out_var_handle->place_.which(),
in_tensor.place().which(),
"Places must be all on CPU or all on CUDA.");
VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(
out_var_handle->place_, in_tensor.type());
}

auto out_p = out_var_handle->place_;
int dev_id = boost::get<platform::CUDAPlace>(out_p).device;

auto &nccl_ctx = nccl_ctxs_->at(dev_id);
auto stream = nccl_ctx.stream();
auto comm = nccl_ctx.comm_;

void *send_recv_buffer = nullptr;
if (root == dev_id) {
send_recv_buffer = const_cast<void *>(in_tensor.data<void>());
out_handle = out_var_handle;
} else {
send_recv_buffer =
VariableVisitor::GetMutableTensor(out_var).mutable_data(
out_var_handle->place_);
}

int type = platform::ToNCCLDataType(in_tensor.type());
broadcast_calls.emplace_back([=] {
PADDLE_ENFORCE(platform::dynload::ncclBcast(
send_recv_buffer, in_tensor.numel(),
static_cast<ncclDataType_t>(type), root, comm, stream));
});
}

auto &out_p = out->place_;
auto *out_var = var_scopes.at(out->scope_idx_)->FindVar(out->name_);
PADDLE_ENFORCE_NOT_NULL(out_var);
PADDLE_ENFORCE_EQ(out_p.which(), in_var_handle->place_.which(),
"Places must be all on CPU or all on CUDA.");

VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
VariableVisitor::GetMutableTensor(out_var).mutable_data(out_p,
in_tensor.type());

auto dev_ctx = dev_ctxes_.at(out_p);
RunAndRecordEvent(out_p, [in_tensor, out_var, dev_ctx, out_p] {
paddle::framework::TensorCopy(
in_tensor, out_p, *(dev_ctx),
&VariableVisitor::GetMutableTensor(out_var));
this->RunAndRecordEvent([&] {
{
platform::NCCLGroupGuard guard;
for (auto &call : broadcast_calls) {
call();
}
}
if (*out_handle != *in_var_handle) {
auto out_var = var_scopes.at(in_var_handle->scope_idx_)
->FindVar(out_var_handles[0]->name_);
paddle::framework::TensorCopy(
in_tensor, in_var_handle->place_,
*(dev_ctxes_.at(in_var_handle->place_)),
&VariableVisitor::GetMutableTensor(out_var));
}
});
#else
PADDLE_THROW("CUDA is not support.");
#endif
}
}

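
The GPU branch above queues one ncclBcast per device and runs the whole batch inside an NCCL group so that the per-device launches go out together. Below is a minimal, self-contained sketch of that pattern; it is not part of this diff, and the function name, buffer layout, and float data type are illustrative assumptions (error checking is omitted).

// Sketch: broadcast a float buffer from the root device to every device.
// Assumes one ncclComm_t and one cudaStream_t per device already exist.
#include <cuda_runtime.h>
#include <nccl.h>
#include <cstddef>
#include <vector>

void BroadcastToAllDevices(const std::vector<ncclComm_t> &comms,
                           const std::vector<cudaStream_t> &streams,
                           const std::vector<float *> &buffers,
                           size_t count, int root) {
  // Group the calls so NCCL treats them as one collective launch, mirroring
  // what platform::NCCLGroupGuard does in the handle above.
  ncclGroupStart();
  for (size_t i = 0; i < comms.size(); ++i) {
    // On the root device the buffer is read; on the others it is overwritten.
    ncclBcast(buffers[i], count, ncclFloat, root, comms[i], streams[i]);
  }
  ncclGroupEnd();
}

After the grouped calls complete, the handle additionally copies the input tensor into the root scope's own output variable when that output is a different variable from the input, so every local scope ends up with a populated result.
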
23 changes: 22 additions & 1 deletion paddle/fluid/framework/details/broadcast_op_handle.h
@@ -24,14 +24,32 @@
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/platform/device_context.h"

#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/nccl_helper.h"
#endif

namespace paddle {
namespace framework {
namespace details {

struct BroadcastOpHandle : public OpHandleBase {
public:
#ifdef PADDLE_WITH_CUDA
BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const platform::NCCLContextMap *nccl_ctxs)
: local_scopes_(local_scopes), places_(places), nccl_ctxs_(nccl_ctxs) {
if (nccl_ctxs_) {
for (auto &p_ctx : nccl_ctxs_->contexts_) {
dev_ctxes_[platform::CUDAPlace(p_ctx.first)] = p_ctx.second.ctx_.get();
}
}
}
#else
BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places);
const std::vector<platform::Place> &places)
: local_scopes_(local_scopes), places_(places) {}
#endif

std::string Name() const override;

@@ -44,6 +62,9 @@ struct BroadcastOpHandle : public OpHandleBase {
private:
const std::vector<Scope *> &local_scopes_;
const std::vector<platform::Place> &places_;
#ifdef PADDLE_WITH_CUDA
const platform::NCCLContextMap *nccl_ctxs_;
#endif
};
} // namespace details
} // namespace framework
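
As a usage note (a sketch, not part of the diff): under PADDLE_WITH_CUDA the three-argument constructor both stores the NCCL context map and registers a device context for every CUDA place, while the CPU-only constructor leaves the device-context map empty, so the caller must register contexts itself via SetDeviceContext, as the CPU path of the test below does. A possible call site, with illustrative variable names:

#ifdef PADDLE_WITH_CUDA
  // nccl_ctxs may be nullptr; the handle then needs SetDeviceContext calls.
  auto *op = new BroadcastOpHandle(local_scopes, places, nccl_ctxs.get());
#else
  auto *op = new BroadcastOpHandle(local_scopes, places);
#endif
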
36 changes: 33 additions & 3 deletions paddle/fluid/framework/details/broadcast_op_handle_test.cc
@@ -35,15 +35,25 @@ struct TestBroadcastOpHandle {
std::unique_ptr<OpHandleBase> op_handle_;
std::vector<std::unique_ptr<VarHandleBase>> vars_;
std::vector<p::Place> gpu_list_;
bool use_gpu_;
#ifdef PADDLE_WITH_CUDA
std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
#endif

void WaitAll() {
for (size_t j = 0; j < ctxs_.size(); ++j) {
ctxs_[j]->Wait();
}
#ifdef PADDLE_WITH_CUDA
if (nccl_ctxs_) {
nccl_ctxs_->WaitAll();
}
#endif
}

void InitCtxOnGpu(bool use_gpu) {
if (use_gpu) {
use_gpu_ = use_gpu;
if (use_gpu_) {
#ifdef PADDLE_WITH_CUDA
int count = p::GetCUDADeviceCount();
if (count <= 1) {
@@ -57,6 +67,7 @@ struct TestBroadcastOpHandle {
gpu_list_.push_back(p);
ctxs_.emplace_back(new p::CUDADeviceContext(p));
}
nccl_ctxs_.reset(new platform::NCCLContextMap(gpu_list_));
#else
PADDLE_THROW("CUDA is not support.");
#endif
@@ -67,6 +78,9 @@ struct TestBroadcastOpHandle {
gpu_list_.push_back(p);
ctxs_.emplace_back(new p::CPUDeviceContext(p));
}
#ifdef PADDLE_WITH_CUDA
nccl_ctxs_.reset(nullptr);
#endif
}
}

@@ -82,7 +96,21 @@ struct TestBroadcastOpHandle {
}
param_scopes_[input_scope_idx]->Var("input");

op_handle_.reset(new BroadcastOpHandle(local_scopes_, gpu_list_));
if (use_gpu_) {
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(
new BroadcastOpHandle(local_scopes_, gpu_list_, nccl_ctxs_.get()));
#else
PADDLE_THROW("CUDA is not support.");
#endif
} else {
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(
new BroadcastOpHandle(local_scopes_, gpu_list_, nccl_ctxs_.get()));
#else
op_handle_.reset(new BroadcastOpHandle(local_scopes_, gpu_list_));
#endif
}

auto* in_var_handle =
new VarHandle(1, input_scope_idx, "input", gpu_list_[input_scope_idx]);
@@ -97,7 +125,9 @@ struct TestBroadcastOpHandle {
op_handle_->AddInput(dummy_var_handle);

for (size_t j = 0; j < gpu_list_.size(); ++j) {
op_handle_->SetDeviceContext(gpu_list_[j], ctxs_[j].get());
if (!use_gpu_) {
op_handle_->SetDeviceContext(gpu_list_[j], ctxs_[j].get());
}
VarHandle* out_var_handle = new VarHandle(2, j, "out", gpu_list_[j]);
vars_.emplace_back(out_var_handle);
op_handle_->AddOutput(out_var_handle);
53 changes: 26 additions & 27 deletions paddle/fluid/framework/details/gather_op_handle.cc
@@ -25,6 +25,7 @@ GatherOpHandle::GatherOpHandle(const std::vector<Scope *> &local_scopes,
: local_scopes_(local_scopes), places_(places) {}

void GatherOpHandle::RunImpl() {
if (places_.size() == 1) return;
// the input and output may have dummy var.
auto in_var_handles = DynamicCast<VarHandle>(inputs_);

Expand Down Expand Up @@ -53,55 +54,53 @@ void GatherOpHandle::RunImpl() {
PADDLE_ENFORCE(pre_in_var->IsType<framework::SelectedRows>(),
"Currently, gather_op only can gather SelectedRows.");

auto pre_place = in_0_handle->place_;
PADDLE_ENFORCE_EQ(out_var_handle->place_.which(), pre_place.which(),
"The place of input and output should be the same.");

// Wait input done, this Wait is asynchronous operation
WaitInputVarGenerated(in_var_handles);

std::vector<int64_t> out_rows;
std::vector<Tensor> in_tensors;
std::vector<platform::Place> in_places;

auto &pre_in = pre_in_var->Get<framework::SelectedRows>();
auto &pre_in_value = pre_in_var->Get<framework::SelectedRows>();
// gather the inputs
for (auto *in_handle : in_var_handles) {
auto in_p = in_handle->place_;
in_places.push_back(in_p);
PADDLE_ENFORCE_EQ(in_p.which(), pre_place.which(),
"Places must be all on CPU or all on CUDA.");
auto *in_var =
var_scopes.at(in_handle->scope_idx_)->FindVar(in_handle->name_);
auto &in_sr = in_var->Get<framework::SelectedRows>();
PADDLE_ENFORCE_NOT_NULL(in_var);

auto &in_sr_value = in_var->Get<framework::SelectedRows>();

PADDLE_ENFORCE_EQ(in_sr.value().type(), pre_in.value().type(),
PADDLE_ENFORCE_EQ(in_sr_value.place().which(), pre_in_value.place().which(),
"Places must be all on CPU or all on GPU.");
PADDLE_ENFORCE_EQ(in_sr_value.value().type(), pre_in_value.value().type(),
"The type of input is not consistent.");
PADDLE_ENFORCE_EQ(pre_in.height(), in_sr.height(),
PADDLE_ENFORCE_EQ(in_sr_value.height(), pre_in_value.height(),
"The height of inputs is not consistent.");
PADDLE_ENFORCE_EQ(pre_in.GetCompleteDims(), in_sr.GetCompleteDims(),
PADDLE_ENFORCE_EQ(in_sr_value.GetCompleteDims(),
pre_in_value.GetCompleteDims(),
"The dims of inputs is not consistent.");

auto &in_sr_rows = in_sr.rows();
auto &in_sr_rows = in_sr_value.rows();
out_rows.insert(out_rows.end(), in_sr_rows.begin(), in_sr_rows.end());

in_tensors.emplace_back(in_sr.value());
in_tensors.emplace_back(in_sr_value.value());
}

// write the output
auto &out_place = out_var_handle->place_;
auto out_scope_idx = out_var_handle->scope_idx_;
auto out_var = var_scopes.at(out_scope_idx)->FindVar(out_var_handle->name_);

auto out = out_var->GetMutable<framework::SelectedRows>();
out->set_height(pre_in.height());
out->set_rows(out_rows);
PADDLE_ENFORCE_EQ(out_place.which(), pre_in_value.place().which(),
"Places must be all on CPU or all on GPU.");
auto out_var =
var_scopes.at(out_var_handle->scope_idx_)->FindVar(out_var_handle->name_);
PADDLE_ENFORCE_NOT_NULL(out_var);
auto out_value = out_var->GetMutable<framework::SelectedRows>();
out_value->set_height(pre_in_value.height());
out_value->set_rows(out_rows);
size_t rows = out_rows.size();
DDim out_dim = pre_in.GetCompleteDims();
DDim out_dim = pre_in_value.GetCompleteDims();
out_dim[0] = static_cast<int64_t>(rows);
out->mutable_value()->Resize(out_dim);
out->mutable_value()->mutable_data(out_place, pre_in.value().type());
Tensor *out_tensor = out->mutable_value();
out_value->mutable_value()->Resize(out_dim);
out_value->mutable_value()->mutable_data(out_place,
pre_in_value.value().type());
Tensor *out_tensor = out_value->mutable_value();

// copy
auto dev_ctx = dev_ctxes_[out_place];
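
Conceptually, the gather above concatenates every input's SelectedRows row-index list and stacks their value tensors along dimension 0 of the output. A minimal sketch of that bookkeeping with plain containers follows; it is not the actual Paddle code, the FakeSelectedRows type is invented for illustration, and the per-input consistency checks are reduced to a comment.

// Sketch: merge per-device "selected rows" into a single output.
#include <cstdint>
#include <vector>

struct FakeSelectedRows {                  // stand-in for framework::SelectedRows
  std::vector<int64_t> rows;               // indices into the full sparse dimension
  std::vector<std::vector<float>> value;   // one dense row per index
  int64_t height = 0;                      // size of the full sparse dimension
};

FakeSelectedRows Gather(const std::vector<FakeSelectedRows> &inputs) {
  FakeSelectedRows out;
  out.height = inputs.front().height;  // inputs must agree on height/type/dims
  for (const auto &in : inputs) {
    // Row indices are concatenated as-is (duplicates kept), matching how
    // out_rows is built in GatherOpHandle::RunImpl above.
    out.rows.insert(out.rows.end(), in.rows.begin(), in.rows.end());
    out.value.insert(out.value.end(), in.value.begin(), in.value.end());
  }
  return out;
}
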