Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions paddle/fluid/operators/send_recv_op_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,21 @@ void AddOp(const std::string &type, const f::VariableNameMap &inputs,
op->SetAttrMap(attrs);
}

void StartServerNet(bool is_sparse) {
void StartServerNet(bool is_sparse, std::atomic<bool> *initialized) {
f::Scope scope;
p::CPUPlace place;
if (is_sparse) {
InitSelectedRowsInScope(place, &scope);
} else {
InitTensorsInScope(place, &scope);
}

// sub program run in listen_and_serv_op, for simple test we use sum
f::ProgramDesc program;
const auto &root_block = program.Block(0);
auto *optimize_block = program.AppendBlock(root_block);
auto *prefetch_block = program.AppendBlock(root_block);
// X for server side tensors, RX for received tensors, must be of same shape.
AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block);

f::AttributeMap attrs;
attrs.insert({"endpoint", std::string("127.0.0.1:0")});
attrs.insert({"Fanin", 1});
Expand All @@ -141,12 +139,16 @@ void StartServerNet(bool is_sparse) {
attrs.insert({"sync_mode", true});
listen_and_serv_op =
f::OpRegistry::CreateOp("listen_and_serv", {{"X", {"x1"}}}, {}, attrs);
*initialized = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we should let the listen_and_serv op notify the outside world that it's ready; I'm doing this in #10292

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

listen_and_serv_op->Run(scope, place);
LOG(INFO) << "server exit";
}

TEST(SendRecvOp, CPUDense) {
std::thread server_thread(StartServerNet, false);
std::atomic<bool> initialized{false};
std::thread server_thread(StartServerNet, false, &initialized);
while (!initialized) {
}
sleep(5); // wait server to start
// local net
f::Scope scope;
Expand All @@ -156,9 +158,11 @@ TEST(SendRecvOp, CPUDense) {
scope.Var("RPC_CLIENT_VAR");

f::AttributeMap attrs;
selected_port = static_cast<paddle::operators::ListenAndServOp *>(
listen_and_serv_op.get())
->GetSelectedPort();
auto *listen_and_serv_op_ptr =
static_cast<paddle::operators::ListenAndServOp *>(
listen_and_serv_op.get());
ASSERT_TRUE(listen_and_serv_op_ptr != nullptr);
selected_port = listen_and_serv_op_ptr->GetSelectedPort();
std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port);
attrs.insert({"endpoints", std::vector<std::string>({endpoint})});
attrs.insert({"epmap", std::vector<std::string>({endpoint})});
Expand All @@ -184,18 +188,24 @@ TEST(SendRecvOp, CPUDense) {
}

TEST(SendRecvOp, CPUSparse) {
std::thread server_thread(StartServerNet, true);
sleep(3); // wait server to start
std::atomic<bool> initialized;
initialized = false;
std::thread server_thread(StartServerNet, true, &initialized);
while (!initialized) {
}
sleep(5); // wait server to start
// local net
f::Scope scope;
p::CPUPlace place;
p::CPUDeviceContext ctx(place);
InitSelectedRowsInScope(place, &scope);
scope.Var("RPC_CLIENT_VAR");
f::AttributeMap attrs;
selected_port = static_cast<paddle::operators::ListenAndServOp *>(
listen_and_serv_op.get())
->GetSelectedPort();
auto *listen_and_serv_op_ptr =
static_cast<paddle::operators::ListenAndServOp *>(
listen_and_serv_op.get());
ASSERT_TRUE(listen_and_serv_op_ptr != nullptr);
selected_port = listen_and_serv_op_ptr->GetSelectedPort();
std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port);
attrs.insert({"endpoints", std::vector<std::string>({endpoint})});
attrs.insert({"epmap", std::vector<std::string>({endpoint})});
Expand Down
6 changes: 3 additions & 3 deletions paddle/fluid/pybind/tensor_py.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ T TensorGetElement(const framework::Tensor &self, size_t offset) {
return self.data<T>()[offset];
} else {
std::shared_ptr<framework::Tensor> dst(new framework::Tensor);
framework::TensorCopy(self, platform::CPUPlace(), dst.get());
framework::TensorCopySync(self, platform::CPUPlace(), dst.get());
return dst->data<T>()[offset];
}
}
Expand All @@ -117,9 +117,9 @@ template <typename T>
void TensorSetElement(framework::Tensor *self, size_t offset, T elem) {
if (platform::is_gpu_place(self->place())) {
std::shared_ptr<framework::Tensor> dst(new framework::Tensor);
framework::TensorCopy(*self, platform::CPUPlace(), dst.get());
framework::TensorCopySync(*self, platform::CPUPlace(), dst.get());
dst->data<T>()[offset] = elem;
framework::TensorCopy(*dst.get(), self->place(), self);
framework::TensorCopySync(*dst.get(), self->place(), self);

} else if (platform::is_cpu_place(self->place())) {
self->data<T>()[offset] = elem;
Expand Down