Skip to content

Commit 6986e41

Browse files
committed
Add The transport layer to support communication protocols of different device vendors.
1 parent 39a3436 commit 6986e41

26 files changed

+792
-344
lines changed

example/rdma_performance/client.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class PerformanceTest {
102102

103103
int Init() {
104104
brpc::ChannelOptions options;
105-
options.use_rdma = FLAGS_use_rdma;
105+
options.socket_mode = FLAGS_use_rdma? RDMA : TCP;
106106
options.protocol = FLAGS_protocol;
107107
options.connection_type = FLAGS_connection_type;
108108
options.timeout_ms = FLAGS_rpc_timeout_ms;

example/rdma_performance/server.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ int main(int argc, char* argv[]) {
7676
g_last_time.store(0, butil::memory_order_relaxed);
7777

7878
brpc::ServerOptions options;
79-
options.use_rdma = FLAGS_use_rdma;
79+
options.socket_mode = FLAGS_use_rdma? RDMA : TCP;
8080
if (server.Start(FLAGS_port, &options) != 0) {
8181
LOG(ERROR) << "Fail to start EchoServer";
8282
return -1;

src/brpc/acceptor.cpp

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "butil/time.h" // gettimeofday_us
2424
#include "brpc/rdma/rdma_endpoint.h"
2525
#include "brpc/acceptor.h"
26+
#include "brpc/transport_factory.h"
2627

2728

2829
namespace brpc {
@@ -40,7 +41,7 @@ Acceptor::Acceptor(bthread_keytable_pool_t* pool)
4041
, _empty_cond(&_map_mutex)
4142
, _force_ssl(false)
4243
, _ssl_ctx(NULL)
43-
, _use_rdma(false)
44+
, socket_mode(TCP)
4445
, _bthread_tag(BTHREAD_TAG_DEFAULT) {
4546
}
4647

@@ -282,18 +283,10 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
282283
options.fd = in_fd;
283284
butil::sockaddr2endpoint(&in_addr, in_len, &options.remote_side);
284285
options.user = acception->user();
286+
options.need_on_edge_trigger = true;
285287
options.force_ssl = am->_force_ssl;
286288
options.initial_ssl_ctx = am->_ssl_ctx;
287-
#if BRPC_WITH_RDMA
288-
if (am->_use_rdma) {
289-
options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;
290-
} else {
291-
#else
292-
{
293-
#endif
294-
options.on_edge_triggered_events = InputMessenger::OnNewMessages;
295-
}
296-
options.use_rdma = am->_use_rdma;
289+
options.socket_mode = am->socket_mode;
297290
options.bthread_tag = am->_bthread_tag;
298291
if (Socket::Create(options, &socket_id) != 0) {
299292
LOG(ERROR) << "Fail to create Socket";

src/brpc/acceptor.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "butil/synchronization/condition_variable.h"
2323
#include "butil/containers/flat_map.h"
2424
#include "brpc/input_messenger.h"
25+
#include "brpc/common.h"
2526

2627

2728
namespace brpc {
@@ -110,8 +111,8 @@ friend class Server;
110111
bool _force_ssl;
111112
std::shared_ptr<SocketSSLContext> _ssl_ctx;
112113

113-
// Whether to use rdma or not
114-
bool _use_rdma;
114+
// Choose to use a certain socket: 0 TCP, 1 RDMA
115+
Mode socket_mode;
115116

116117
// Acceptor belongs to this tag
117118
bthread_tag_t _bthread_tag;

src/brpc/channel.cpp

Lines changed: 7 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "brpc/details/usercode_backup_pool.h" // TooManyUserCode
3838
#include "brpc/rdma/rdma_helper.h"
3939
#include "brpc/policy/esp_authenticator.h"
40+
#include "brpc/transport_factory.h"
4041

4142
namespace brpc {
4243

@@ -60,7 +61,7 @@ ChannelOptions::ChannelOptions()
6061
, connection_type(CONNECTION_TYPE_UNKNOWN)
6162
, succeed_without_server(true)
6263
, log_succeed_without_server(true)
63-
, use_rdma(false)
64+
, socket_mode(TCP)
6465
, auth(NULL)
6566
, backup_request_policy(NULL)
6667
, retry_policy(NULL)
@@ -120,7 +121,7 @@ static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) {
120121
} else {
121122
// All disabled ChannelSSLOptions are the same
122123
}
123-
if (opt.use_rdma) {
124+
if (opt.socket_mode == RDMA) {
124125
buf.append("|rdma");
125126
}
126127
butil::MurmurHash3_x64_128_Update(&mm_ctx, buf.data(), buf.size());
@@ -163,20 +164,6 @@ Channel::~Channel() {
163164
}
164165
}
165166

166-
#if BRPC_WITH_RDMA
167-
static bool OptionsAvailableForRdma(const ChannelOptions* opt) {
168-
if (opt->has_ssl_options()) {
169-
LOG(WARNING) << "Cannot use SSL and RDMA at the same time";
170-
return false;
171-
}
172-
if (!rdma::SupportedByRdma(opt->protocol.name())) {
173-
LOG(WARNING) << "Cannot use " << opt->protocol.name()
174-
<< " over RDMA";
175-
return false;
176-
}
177-
return true;
178-
}
179-
#endif
180167

181168
int Channel::InitChannelOptions(const ChannelOptions* options) {
182169
if (options) { // Override default options if user provided one.
@@ -191,20 +178,8 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
191178
_options.hc_option.health_check_path = FLAGS_health_check_path;
192179
_options.hc_option.health_check_timeout_ms = FLAGS_health_check_timeout_ms;
193180
}
194-
if (_options.use_rdma) {
195-
#if BRPC_WITH_RDMA
196-
if (!OptionsAvailableForRdma(&_options)) {
197-
return -1;
198-
}
199-
rdma::GlobalRdmaInitializeOrDie();
200-
if (!rdma::InitPollingModeWithTag(bthread_self_tag())) {
201-
return -1;
202-
}
203-
#else
204-
LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma";
205-
return -1;
206-
#endif
207-
}
181+
auto ret = TransportFactory::ContextInitOrDie(options->socket_mode, false, &_options);
182+
CHECK(ret == 0);
208183

209184
_serialize_request = protocol->serialize_request;
210185
_pack_request = protocol->pack_request;
@@ -369,7 +344,7 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port,
369344
return -1;
370345
}
371346
if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig),
372-
&_server_id, ssl_ctx, _options.use_rdma, _options.hc_option) != 0) {
347+
&_server_id, ssl_ctx, _options.socket_mode, _options.hc_option) != 0) {
373348
LOG(ERROR) << "Fail to insert into SocketMap";
374349
return -1;
375350
}
@@ -406,7 +381,7 @@ int Channel::Init(const char* ns_url,
406381
GetNamingServiceThreadOptions ns_opt;
407382
ns_opt.succeed_without_server = _options.succeed_without_server;
408383
ns_opt.log_succeed_without_server = _options.log_succeed_without_server;
409-
ns_opt.use_rdma = _options.use_rdma;
384+
ns_opt.socket_mode = _options.socket_mode;
410385
ns_opt.channel_signature = ComputeChannelSignature(_options);
411386
ns_opt.hc_option = _options.hc_option;
412387
if (CreateSocketSSLContext(_options, &ns_opt.ssl_ctx) != 0) {

src/brpc/channel.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "brpc/backup_request_policy.h"
3838
#include "brpc/naming_service_filter.h"
3939
#include "brpc/health_check_option.h"
40+
#include "brpc/common.h"
4041

4142
namespace brpc {
4243

@@ -105,9 +106,9 @@ struct ChannelOptions {
105106
const ChannelSSLOptions& ssl_options() const { return *_ssl_options; }
106107
ChannelSSLOptions* mutable_ssl_options();
107108

108-
// Let this channel use rdma rather than tcp.
109-
// Default: false
110-
bool use_rdma;
109+
// Let this channel Choose to use a certain socket: 0 TCP, 1 RDMA.
110+
// Default: TCP
111+
Mode socket_mode;
111112

112113
// Turn on authentication for this channel if `auth' is not NULL.
113114
// Note `auth' will not be deleted by channel and must remain valid when

src/brpc/common.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#ifndef BRPC_COMMON_H
19+
#define BRPC_COMMON_H
20+
enum Mode {
21+
TCP = 0,
22+
RDMA = 1
23+
};
24+
#endif //BRPC_COMMON_H

src/brpc/details/naming_service_thread.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ void NamingServiceThread::Actions::ResetServers(
126126
// to pick those Sockets with the right settings during OnAddedServers
127127
const SocketMapKey key(_added[i], _owner->_options.channel_signature);
128128
CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id, _owner->_options.ssl_ctx,
129-
_owner->_options.use_rdma, _owner->_options.hc_option));
129+
_owner->_options.socket_mode, _owner->_options.hc_option));
130130
_added_sockets.push_back(tagged_id);
131131
}
132132

src/brpc/details/naming_service_thread.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "brpc/naming_service.h" // NamingService
2828
#include "brpc/naming_service_filter.h" // NamingServiceFilter
2929
#include "brpc/socket_map.h"
30+
#include "brpc/common.h"
3031

3132
namespace brpc {
3233

@@ -45,11 +46,11 @@ struct GetNamingServiceThreadOptions {
4546
GetNamingServiceThreadOptions()
4647
: succeed_without_server(false)
4748
, log_succeed_without_server(true)
48-
, use_rdma(false) {}
49-
49+
, socket_mode(TCP) {}
50+
5051
bool succeed_without_server;
5152
bool log_succeed_without_server;
52-
bool use_rdma;
53+
Mode socket_mode;
5354
HealthCheckOption hc_option;
5455
ChannelSignature channel_signature;
5556
std::shared_ptr<SocketSSLContext> ssl_ctx;

src/brpc/input_message_base.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class InputMessageBase : public Destroyable {
5555
friend class InputMessenger;
5656
friend void* ProcessInputMessage(void*);
5757
friend class Stream;
58+
friend class Transport;
5859
int64_t _received_us;
5960
int64_t _base_real_us;
6061
SocketUniquePtr _socket;

0 commit comments

Comments
 (0)