@@ -113,23 +113,21 @@ void AddOp(const std::string &type, const f::VariableNameMap &inputs,
113113 op->SetAttrMap (attrs);
114114}
115115
116- void StartServerNet (bool is_sparse) {
116+ void StartServerNet (bool is_sparse, std::atomic< bool > *initialized ) {
117117 f::Scope scope;
118118 p::CPUPlace place;
119119 if (is_sparse) {
120120 InitSelectedRowsInScope (place, &scope);
121121 } else {
122122 InitTensorsInScope (place, &scope);
123123 }
124-
125124 // sub program run in listen_and_serv_op, for simple test we use sum
126125 f::ProgramDesc program;
127126 const auto &root_block = program.Block (0 );
128127 auto *optimize_block = program.AppendBlock (root_block);
129128 auto *prefetch_block = program.AppendBlock (root_block);
130129 // X for server side tensors, RX for received tensors, must be of same shape.
131130 AddOp (" sum" , {{" X" , {" x0" , " x1" }}}, {{" Out" , {" Out" }}}, {}, optimize_block);
132-
133131 f::AttributeMap attrs;
134132 attrs.insert ({" endpoint" , std::string (" 127.0.0.1:0" )});
135133 attrs.insert ({" Fanin" , 1 });
@@ -141,12 +139,16 @@ void StartServerNet(bool is_sparse) {
141139 attrs.insert ({" sync_mode" , true });
142140 listen_and_serv_op =
143141 f::OpRegistry::CreateOp (" listen_and_serv" , {{" X" , {" x1" }}}, {}, attrs);
142+ *initialized = true ;
144143 listen_and_serv_op->Run (scope, place);
145144 LOG (INFO) << " server exit" ;
146145}
147146
148147TEST (SendRecvOp, CPUDense) {
149- std::thread server_thread (StartServerNet, false );
148+ std::atomic<bool > initialized{false };
149+ std::thread server_thread (StartServerNet, false , &initialized);
150+ while (!initialized) {
151+ }
150152 sleep (5 ); // wait server to start
151153 // local net
152154 f::Scope scope;
@@ -156,9 +158,11 @@ TEST(SendRecvOp, CPUDense) {
156158 scope.Var (" RPC_CLIENT_VAR" );
157159
158160 f::AttributeMap attrs;
159- selected_port = static_cast <paddle::operators::ListenAndServOp *>(
160- listen_and_serv_op.get ())
161- ->GetSelectedPort ();
161+ auto *listen_and_serv_op_ptr =
162+ static_cast <paddle::operators::ListenAndServOp *>(
163+ listen_and_serv_op.get ());
164+ ASSERT_TRUE (listen_and_serv_op_ptr != nullptr );
165+ selected_port = listen_and_serv_op_ptr->GetSelectedPort ();
162166 std::string endpoint = paddle::string::Sprintf (" 127.0.0.1:%d" , selected_port);
163167 attrs.insert ({" endpoints" , std::vector<std::string>({endpoint})});
164168 attrs.insert ({" epmap" , std::vector<std::string>({endpoint})});
@@ -184,18 +188,24 @@ TEST(SendRecvOp, CPUDense) {
184188}
185189
186190TEST (SendRecvOp, CPUSparse) {
187- std::thread server_thread (StartServerNet, true );
188- sleep (3 ); // wait server to start
191+ std::atomic<bool > initialized;
192+ initialized = false ;
193+ std::thread server_thread (StartServerNet, true , &initialized);
194+ while (!initialized) {
195+ }
196+ sleep (5 ); // wait server to start
189197 // local net
190198 f::Scope scope;
191199 p::CPUPlace place;
192200 p::CPUDeviceContext ctx (place);
193201 InitSelectedRowsInScope (place, &scope);
194202 scope.Var (" RPC_CLIENT_VAR" );
195203 f::AttributeMap attrs;
196- selected_port = static_cast <paddle::operators::ListenAndServOp *>(
197- listen_and_serv_op.get ())
198- ->GetSelectedPort ();
204+ auto *listen_and_serv_op_ptr =
205+ static_cast <paddle::operators::ListenAndServOp *>(
206+ listen_and_serv_op.get ());
207+ ASSERT_TRUE (listen_and_serv_op_ptr != nullptr );
208+ selected_port = listen_and_serv_op_ptr->GetSelectedPort ();
199209 std::string endpoint = paddle::string::Sprintf (" 127.0.0.1:%d" , selected_port);
200210 attrs.insert ({" endpoints" , std::vector<std::string>({endpoint})});
201211 attrs.insert ({" epmap" , std::vector<std::string>({endpoint})});
0 commit comments