Skip to content

Commit 6a12915

Browse files
authored
Merge pull request #3235 from altman08/master
Fix attachment being overwritten when backuprequest is triggered.
2 parents 2e0f0b0 + d3f7960 commit 6a12915

File tree

2 files changed

+79
-7
lines changed

2 files changed

+79
-7
lines changed

src/brpc/selective_channel.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -364,19 +364,20 @@ void SubDone::Run() {
364364
<< _cid.value << ": " << berror(rc);
365365
return;
366366
}
367-
// NOTE: Copying gettable-but-settable fields which are generally set
368-
// during the RPC to reflect details.
369-
main_cntl->_remote_side = _cntl._remote_side;
370-
// connection_type may be changed during CallMethod.
371-
main_cntl->set_connection_type(_cntl.connection_type());
372-
main_cntl->response_attachment().swap(_cntl.response_attachment());
373367
Resource r;
374368
r.response = _cntl._response;
375369
r.sub_done = this;
376370
if (!_owner->PushFree(r)) {
377371
return;
378372
}
379373
const int saved_error = main_cntl->ErrorCode();
374+
375+
// NOTE: Copying gettable-but-settable fields which are generally set
376+
// during the RPC to reflect details.
377+
main_cntl->_remote_side = _cntl._remote_side;
378+
// connection_type may be changed during CallMethod.
379+
main_cntl->set_connection_type(_cntl.connection_type());
380+
main_cntl->response_attachment().swap(_cntl.response_attachment());
380381

381382
if (_cntl.Failed()) {
382383
if (_cntl.ErrorCode() == ENODATA || _cntl.ErrorCode() == EHOSTDOWN) {

test/brpc_channel_unittest.cpp

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ class MyEchoService : public ::test::EchoService {
176176
res->add_code_list(req->code());
177177
}
178178
res->set_receiving_socket_id(cntl->_current_call.sending_sock->id());
179+
if (mockfunc_) mockfunc_(cntl_base, req, res, done);
179180

180181
brpc::ProtocolType protocol = cntl->request_protocol();
181182
if ((brpc::PROTOCOL_HTTP == protocol || brpc::PROTOCOL_H2 == protocol) &&
@@ -198,6 +199,17 @@ class MyEchoService : public ::test::EchoService {
198199
EXPECT_TRUE(nullptr != request);
199200
EXPECT_TRUE(nullptr != response);
200201
}
202+
203+
public:
204+
using MockFuncType = void(google::protobuf::RpcController*,
205+
const ::test::EchoRequest*, ::test::EchoResponse*,
206+
google::protobuf::Closure*);
207+
void SetMockFunc(std::function<MockFuncType>&& mockfunc) {
208+
mockfunc_ = std::move(mockfunc);
209+
}
210+
211+
private:
212+
std::function<MockFuncType> mockfunc_;
201213
};
202214

203215
pthread_once_t register_mock_protocol = PTHREAD_ONCE_INIT;
@@ -1408,7 +1420,7 @@ class ChannelTest : public ::testing::Test{
14081420
SetUpChannel(subchan, single_server, short_connection);
14091421
ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
14101422
}
1411-
1423+
14121424
brpc::Controller cntl;
14131425
test::EchoRequest req;
14141426
test::EchoResponse res;
@@ -1427,6 +1439,55 @@ class ChannelTest : public ::testing::Test{
14271439
EXPECT_EQ(17, cntl.sub(0)->_real_timeout_ms);
14281440
StopAndJoin();
14291441
}
1442+
1443+
void TestBackupRequestSelective(
1444+
bool single_server, bool async, bool short_connection) {
1445+
std::cout << " *** single=" << single_server
1446+
<< " async=" << async
1447+
<< " short=" << short_connection << std::endl;
1448+
ASSERT_EQ(0, StartAccept(_ep));
1449+
1450+
const size_t NCHANS = 8;
1451+
brpc::SelectiveChannel channel;
1452+
ASSERT_EQ(0, channel.Init("rr", NULL));
1453+
for (size_t i = 0; i < NCHANS; ++i) {
1454+
brpc::Channel* subchan = new brpc::Channel;
1455+
SetUpChannel(subchan, single_server, short_connection);
1456+
ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
1457+
}
1458+
1459+
brpc::Controller cntl;
1460+
test::EchoRequest req;
1461+
test::EchoResponse res;
1462+
req.set_message(__FUNCTION__);
1463+
cntl.set_backup_request_ms(20);
1464+
cntl.set_timeout_ms(100);
1465+
std::atomic<int> call_cnt(0);
1466+
_svc.SetMockFunc([&call_cnt](google::protobuf::RpcController* cntl_base,
1467+
const ::test::EchoRequest*,
1468+
::test::EchoResponse*,
1469+
google::protobuf::Closure*) {
1470+
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
1471+
int see_cnt = call_cnt.fetch_add(1, std::memory_order_relaxed);
1472+
if (see_cnt == 0) {
1473+
LOG(INFO) << "slow node";
1474+
bthread_usleep(30 * 1000);
1475+
} else {
1476+
LOG(INFO) << "normal node ";
1477+
butil::IOBuf iobuf;
1478+
iobuf.append("123");
1479+
cntl->response_attachment().swap(iobuf);
1480+
}
1481+
});
1482+
butil::Timer tm;
1483+
tm.start();
1484+
CallMethod(&channel, &cntl, &req, &res, async);
1485+
tm.stop();
1486+
EXPECT_FALSE(cntl.Failed());
1487+
EXPECT_EQ(call_cnt.load(std::memory_order_relaxed), 2);
1488+
EXPECT_EQ(cntl.response_attachment().to_string(), "123");
1489+
StopAndJoin();
1490+
}
14301491

14311492
void TestCloseFD(bool single_server, bool async, bool short_connection) {
14321493
std::cout << " *** single=" << single_server
@@ -2713,6 +2774,16 @@ TEST_F(ChannelTest, timeout_selective) {
27132774
}
27142775
}
27152776

2777+
TEST_F(ChannelTest, backuprequest_selective) {
2778+
for (int i = 0; i <= 1; ++i) { // Flag SingleServer
2779+
for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
2780+
for (int k = 0; k <=1; ++k) { // Flag ShortConnection
2781+
TestBackupRequestSelective(i, j, k);
2782+
}
2783+
}
2784+
}
2785+
}
2786+
27162787
TEST_F(ChannelTest, close_fd) {
27172788
for (int i = 0; i <= 1; ++i) { // Flag SingleServer
27182789
for (int j = 0; j <= 1; ++j) { // Flag Asynchronous

0 commit comments

Comments
 (0)