Skip to content

Commit 83e746b

Browse files
amoxicfeng-yu1
authored andcommitted
Handle response stream bind failures gracefully
1 parent c16fdc6 commit 83e746b

File tree

5 files changed

+164
-26
lines changed

5 files changed

+164
-26
lines changed

src/brpc/controller.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1459,7 +1459,15 @@ void Controller::HandleStreamConnection(Socket *host_socket) {
14591459
if(!ptrs[i]) continue;
14601460
Stream* extra_stream = (Stream *) ptrs[i]->conn();
14611461
_remote_stream_settings->set_stream_id(extra_stream_ids[i - 1]);
1462-
s->SetHostSocket(host_socket);
1462+
if (s->SetHostSocket(host_socket) != 0) {
1463+
const int ec = errno;
1464+
Stream::SetFailed(_request_streams, ec,
1465+
"Fail to bind response stream to %s",
1466+
host_socket->description().c_str());
1467+
SetFailed(ec, "Fail to bind response stream to %s",
1468+
host_socket->description().c_str());
1469+
return;
1470+
}
14631471
extra_stream->SetConnected(_remote_stream_settings);
14641472
}
14651473
}

src/brpc/policy/baidu_rpc_protocol.cpp

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,19 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
360360
Stream* s = (Stream *) stream_ptr->conn();
361361
StreamSettings *stream_settings = meta.mutable_stream_settings();
362362
s->FillSettings(stream_settings);
363-
s->SetHostSocket(sock);
363+
if (s->SetHostSocket(sock) != 0) {
364+
const int errcode = errno;
365+
LOG_IF(WARNING, errcode != EPIPE)
366+
<< "Fail to bind response stream=" << response_stream_id
367+
<< " to " << sock->description() << ": "
368+
<< berror(errcode);
369+
cntl->SetFailed(errcode, "Fail to bind response stream to %s",
370+
sock->description().c_str());
371+
Stream::SetFailed(response_stream_ids, errcode,
372+
"Fail to bind response stream to %s",
373+
sock->description().c_str());
374+
return;
375+
}
364376
for (size_t i = 1; i < response_stream_ids.size(); ++i) {
365377
stream_settings->mutable_extra_stream_ids()->Add(response_stream_ids[i]);
366378
}
@@ -390,6 +402,15 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
390402

391403
ResponseWriteInfo args;
392404
bthread_id_t response_id = INVALID_BTHREAD_ID;
405+
auto response_write_guard = butil::MakeScopeGuard([&response_id, &args, span] {
406+
if (response_id == INVALID_BTHREAD_ID) {
407+
return;
408+
}
409+
bthread_id_join(response_id);
410+
// Do not care about the result of background writing.
411+
// TODO: this is not sent
412+
span->set_sent_us(args.sent_us);
413+
});
393414
if (span) {
394415
span->set_response_size(res_buf.size());
395416
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
@@ -426,7 +447,21 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
426447
SocketUniquePtr extra_stream_ptr;
427448
if (Socket::Address(extra_stream_id, &extra_stream_ptr) == 0) {
428449
Stream* extra_stream = (Stream *) extra_stream_ptr->conn();
429-
extra_stream->SetHostSocket(sock);
450+
if (extra_stream->SetHostSocket(sock) != 0) {
451+
const int errcode = errno;
452+
LOG_IF(WARNING, errcode != EPIPE)
453+
<< "Fail to bind response stream=" << extra_stream_id
454+
<< " to " << sock->description() << ": "
455+
<< berror(errcode);
456+
cntl->SetFailed(errcode, "Fail to bind response stream to %s",
457+
sock->description().c_str());
458+
StreamIds remaining_stream_ids(response_stream_ids.begin() + i,
459+
response_stream_ids.end());
460+
Stream::SetFailed(remaining_stream_ids, errcode,
461+
"Fail to bind response stream to %s",
462+
sock->description().c_str());
463+
return;
464+
}
430465
extra_stream->SetConnected();
431466
} else {
432467
LOG(WARNING) << "Stream=" << extra_stream_id
@@ -451,12 +486,6 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
451486
}
452487
}
453488

454-
if (span) {
455-
bthread_id_join(response_id);
456-
// Do not care about the result of background writing.
457-
// TODO: this is not sent
458-
span->set_sent_us(args.sent_us);
459-
}
460489
}
461490

462491
namespace {

src/brpc/stream.cpp

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "brpc/stream.h"
2020

2121
#include <gflags/gflags.h>
22+
#include "butil/string_printf.h"
2223
#include "butil/time.h"
2324
#include "butil/object_pool.h"
2425
#include "butil/unique_ptr.h"
@@ -57,6 +58,7 @@ Stream::Stream()
5758
, _pending_buf(NULL)
5859
, _start_idle_timer_us(0)
5960
, _idle_timer(0)
61+
, _set_host_socket_ec(0)
6062
{
6163
_connect_meta.on_connect = NULL;
6264
CHECK_EQ(0, bthread_mutex_init(&_connect_mutex, NULL));
@@ -665,13 +667,16 @@ int Stream::SetHostSocket(Socket *host_socket) {
665667
std::call_once(_set_host_socket_flag, [this, host_socket]() {
666668
SocketUniquePtr ptr;
667669
host_socket->ReAddress(&ptr);
668-
// TODO add *this to host socke
669670
if (ptr->AddStream(id()) != 0) {
670-
CHECK(false) << id() << " fail to add stream to host socket";
671+
_set_host_socket_ec = errno ? errno : ptr->non_zero_error_code();
671672
return;
672673
}
673674
_host_socket = ptr.release();
674675
});
676+
if (_host_socket == NULL) {
677+
errno = _set_host_socket_ec ? _set_host_socket_ec : EFAILEDSOCKET;
678+
return -1;
679+
}
675680
return 0;
676681
}
677682

@@ -731,27 +736,35 @@ void Stream::Close(int error_code, const char* reason_fmt, ...) {
731736
return TriggerOnConnectIfNeed();
732737
}
733738

734-
int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) {
739+
int Stream::SetFailedWithReason(StreamId id, int error_code,
740+
const std::string& reason) {
735741
SocketUniquePtr ptr;
736742
if (Socket::AddressFailedAsWell(id, &ptr) == -1) {
737-
// Don't care recycled stream
738743
return 0;
739744
}
740745
Stream* s = (Stream*)ptr->conn();
746+
s->Close(error_code, "%s", reason.c_str());
747+
return 0;
748+
}
749+
750+
int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) {
741751
va_list ap;
742752
va_start(ap, reason_fmt);
743-
s->Close(error_code, reason_fmt, ap);
753+
std::string reason;
754+
butil::string_vprintf(&reason, reason_fmt, ap);
744755
va_end(ap);
745-
return 0;
756+
return SetFailedWithReason(id, error_code, reason);
746757
}
747758

748759
int Stream::SetFailed(const StreamIds& ids, int error_code, const char* reason_fmt, ...) {
749760
va_list ap;
750761
va_start(ap, reason_fmt);
751-
for(size_t i = 0; i< ids.size(); ++i) {
752-
Stream::SetFailed(ids[i], error_code, reason_fmt, ap);
753-
}
762+
std::string reason;
763+
butil::string_vprintf(&reason, reason_fmt, ap);
754764
va_end(ap);
765+
for (size_t i = 0; i < ids.size(); ++i) {
766+
Stream::SetFailedWithReason(ids[i], error_code, reason);
767+
}
755768
return 0;
756769
}
757770

src/brpc/stream_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#define BRPC_STREAM_IMPL_H
2121

2222
#include <mutex>
23+
#include <string>
2324
#include "bthread/bthread.h"
2425
#include "bthread/execution_queue.h"
2526
#include "brpc/socket.h"
@@ -91,6 +92,8 @@ friend struct butil::DefaultDeleter<Stream>;
9192
static int TriggerOnWritable(bthread_id_t id, void *data, int error_code);
9293
static void *RunOnWritable(void* arg);
9394
static void* RunOnConnect(void* arg);
95+
static int SetFailedWithReason(StreamId id, int error_code,
96+
const std::string& reason);
9497

9598
struct ConnectMeta {
9699
int (*on_connect)(int, int, void*);
@@ -136,6 +139,7 @@ friend struct butil::DefaultDeleter<Stream>;
136139
int64_t _start_idle_timer_us;
137140
bthread_timer_t _idle_timer;
138141
std::once_flag _set_host_socket_flag;
142+
int _set_host_socket_ec;
139143
};
140144

141145
} // namespace brpc

test/brpc_streaming_rpc_unittest.cpp

Lines changed: 92 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121

2222
#include <gtest/gtest.h>
2323
#include <atomic>
24+
#include <errno.h>
2425
#include "brpc/server.h"
2526

2627
#include "brpc/controller.h"
2728
#include "brpc/channel.h"
2829
#include "brpc/callback.h"
2930
#include "brpc/socket.h"
31+
#include "brpc/details/controller_private_accessor.h"
3032
#include "brpc/stream_impl.h"
3133
#include "brpc/policy/streaming_rpc_protocol.h"
3234
#include "echo.pb.h"
@@ -38,12 +40,12 @@ class AfterAcceptStream {
3840

3941
class MyServiceWithStream : public test::EchoService {
4042
public:
41-
MyServiceWithStream(const brpc::StreamOptions& options)
43+
MyServiceWithStream(const brpc::StreamOptions& options)
4244
: _options(options)
4345
, _after_accept_stream(NULL)
4446
{}
4547
MyServiceWithStream(const brpc::StreamOptions& options,
46-
AfterAcceptStream* after_accept_stream)
48+
AfterAcceptStream* after_accept_stream)
4749
: _options(options)
4850
, _after_accept_stream(after_accept_stream)
4951
{}
@@ -53,9 +55,9 @@ class MyServiceWithStream : public test::EchoService {
5355
{}
5456

5557
void Echo(::google::protobuf::RpcController* controller,
56-
const ::test::EchoRequest* request,
57-
::test::EchoResponse* response,
58-
::google::protobuf::Closure* done) {
58+
const ::test::EchoRequest* request,
59+
::test::EchoResponse* response,
60+
::google::protobuf::Closure* done) {
5961
brpc::ClosureGuard done_guard(done);
6062
response->set_message(request->message());
6163
brpc::Controller* cntl = (brpc::Controller*)controller;
@@ -125,7 +127,8 @@ class BatchStreamClientHandler : public brpc::StreamInputHandler {
125127

126128
void on_closed(brpc::StreamId /*id*/) override {}
127129

128-
void on_failed(brpc::StreamId /*id*/, int /*error_code*/, const std::string& /*error_text*/) override {}
130+
void on_failed(brpc::StreamId /*id*/, int /*error_code*/,
131+
const std::string& /*error_text*/) override {}
129132

130133
private:
131134
BatchStreamFeedbackRaceState* _state;
@@ -162,7 +165,8 @@ static void* SendTwoMessagesOnServerExtraStream(void* arg) {
162165
std::string payload(64, 'a');
163166
butil::IOBuf out;
164167
out.append(payload);
165-
state->server_first_write_rc.store(brpc::StreamWrite(sid, out), std::memory_order_relaxed);
168+
state->server_first_write_rc.store(brpc::StreamWrite(sid, out),
169+
std::memory_order_relaxed);
166170
}
167171

168172
// 2) Then send another byte. This write should become writable only after
@@ -226,12 +230,59 @@ static void SetAtomicTrue(std::atomic<bool>* f) {
226230

227231
static bool WaitForTrue(const std::atomic<bool>& f, int timeout_ms) {
228232
const int64_t deadline_us = butil::gettimeofday_us() + (int64_t)timeout_ms * 1000L;
229-
while (!f.load(std::memory_order_acquire) && butil::gettimeofday_us() < deadline_us) {
233+
while (!f.load(std::memory_order_acquire) &&
234+
butil::gettimeofday_us() < deadline_us) {
230235
usleep(1000);
231236
}
232237
return f.load(std::memory_order_acquire);
233238
}
234239

240+
class MyServiceWithStreamAndFailedSocket : public test::EchoService {
241+
public:
242+
explicit MyServiceWithStreamAndFailedSocket(const brpc::StreamOptions& options)
243+
: _options(options) {}
244+
245+
void Echo(::google::protobuf::RpcController* controller,
246+
const ::test::EchoRequest* request,
247+
::test::EchoResponse* response,
248+
::google::protobuf::Closure* done) override {
249+
brpc::ClosureGuard done_guard(done);
250+
response->set_message(request->message());
251+
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
252+
brpc::StreamId response_stream;
253+
ASSERT_EQ(0, StreamAccept(&response_stream, *cntl, &_options));
254+
brpc::ControllerPrivateAccessor accessor(cntl);
255+
ASSERT_TRUE(accessor.get_sending_socket() != NULL);
256+
accessor.get_sending_socket()->SetFailed();
257+
}
258+
259+
private:
260+
brpc::StreamOptions _options;
261+
};
262+
263+
TEST_F(StreamingRpcTest, set_host_socket_returns_error_when_socket_is_failed) {
264+
brpc::SocketOptions socket_options;
265+
brpc::SocketId host_socket_id;
266+
ASSERT_EQ(0, brpc::Socket::Create(socket_options, &host_socket_id));
267+
brpc::SocketUniquePtr host_socket;
268+
ASSERT_EQ(0, brpc::Socket::Address(host_socket_id, &host_socket));
269+
ASSERT_EQ(0, host_socket->SetFailed());
270+
271+
brpc::StreamId stream_id;
272+
brpc::StreamOptions stream_options;
273+
ASSERT_EQ(0, brpc::Stream::Create(stream_options, NULL, &stream_id, false));
274+
brpc::ScopedStream stream_guard(stream_id);
275+
276+
brpc::SocketUniquePtr stream_socket;
277+
ASSERT_EQ(0, brpc::Socket::Address(stream_id, &stream_socket));
278+
brpc::Stream* stream = static_cast<brpc::Stream*>(stream_socket->conn());
279+
280+
errno = 0;
281+
ASSERT_EQ(-1, stream->SetHostSocket(host_socket.get()));
282+
ASSERT_NE(0, errno);
283+
ASSERT_TRUE(stream->_host_socket == NULL);
284+
}
285+
235286
TEST_F(StreamingRpcTest, sanity) {
236287
brpc::Server server;
237288
MyServiceWithStream service;
@@ -393,6 +444,39 @@ class OrderedInputHandler : public brpc::StreamInputHandler {
393444
HandlerControl* _cntl;
394445
};
395446

447+
TEST_F(StreamingRpcTest, server_failed_socket_before_response_closes_stream_without_abort) {
448+
OrderedInputHandler handler;
449+
brpc::StreamOptions response_stream_options;
450+
response_stream_options.handler = &handler;
451+
brpc::Server server;
452+
MyServiceWithStreamAndFailedSocket service(response_stream_options);
453+
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
454+
ASSERT_EQ(0, server.Start(9007, NULL));
455+
456+
brpc::Channel channel;
457+
ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
458+
brpc::Controller cntl;
459+
brpc::StreamId request_stream;
460+
ASSERT_EQ(0, StreamCreate(&request_stream, cntl, NULL));
461+
brpc::ScopedStream stream_guard(request_stream);
462+
463+
test::EchoService_Stub stub(&channel);
464+
stub.Echo(&cntl, &request, &response, NULL);
465+
ASSERT_TRUE(cntl.Failed());
466+
467+
for (int i = 0; i < 10000 && !handler.stopped(); ++i) {
468+
usleep(100);
469+
}
470+
471+
server.Stop(0);
472+
server.Join();
473+
474+
ASSERT_TRUE(handler.stopped());
475+
ASSERT_TRUE(handler.failed());
476+
ASSERT_EQ(0, handler.idle_times());
477+
ASSERT_EQ(0, handler._expected_next_value);
478+
}
479+
396480
TEST_F(StreamingRpcTest, received_in_order) {
397481
OrderedInputHandler handler;
398482
brpc::StreamOptions opt;

0 commit comments

Comments
 (0)