Skip to content

Commit 5c444f9

Browse files
martin-suchaPeter Navrátil
and
Peter Navrátil
committed
Add rack-aware load balancing policy
We need to prefer local rack as there are higher network costs when communicating with nodes in remote rack. This policy prefers nodes from the local rack, then local datacenter and then other nodes. The new RackAwarePolicy is similar to DCAwarePolicy, but does not have the deprecated options. TokenAwarePolicy and other code needed to be modified so that the local rack is propagated. The TokenAware policy was changed to prefer replicas in remote rack / remote DC before trying non-replica nodes. It does not make much sense to not try the replicas and trying the replicas simplifies the code as now we have three levels local/remote/remote2. This change might not be backwards-compatible, we don't know what exactly this project guarantees in terms of backwards compatibility. Co-Authored-By: Peter Navrátil <[email protected]>
1 parent 21e8c0f commit 5c444f9

31 files changed

+1010
-88
lines changed

include/cassandra.h

+44
Original file line numberDiff line numberDiff line change
@@ -2213,6 +2213,50 @@ cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster,
22132213
unsigned used_hosts_per_remote_dc,
22142214
cass_bool_t allow_remote_dcs_for_local_cl);
22152215

2216+
/**
2217+
* Configures the cluster to use Rack-aware load balancing.
2218+
* For each query, all live nodes in a primary 'local' rack are tried first,
2219+
* followed by nodes from local DC and then nodes from other DCs.
2220+
*
2221+
* With empty local_rack and local_dc, default local_dc and local_rack
2222+
* is chosen from the first connected contact point,
2223+
* and no remote hosts are considered in query plans.
2224+
* If relying on this mechanism, be sure to use only contact
2225+
* points from the local rack.
2226+
*
2227+
* @public @memberof CassCluster
2228+
*
2229+
* @param[in] cluster
2230+
* @param[in] local_dc The primary data center to try first
2231+
* @param[in] local_rack The primary rack to try first
2232+
* @return CASS_OK if successful, otherwise an error occurred
2233+
*/
2234+
CASS_EXPORT CassError
2235+
cass_cluster_set_load_balance_rack_aware(CassCluster* cluster,
2236+
const char* local_dc,
2237+
const char* local_rack);
2238+
2239+
2240+
/**
2241+
* Same as cass_cluster_set_load_balance_rack_aware(), but with lengths for string
2242+
* parameters.
2243+
*
2244+
* @public @memberof CassCluster
2245+
*
2246+
* @param[in] cluster
2247+
* @param[in] local_dc
2248+
* @param[in] local_dc_length
2249+
* @return same as cass_cluster_set_load_balance_dc_aware()
2250+
*
2251+
* @see cass_cluster_set_load_balance_dc_aware()
2252+
*/
2253+
CASS_EXPORT CassError
2254+
cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster,
2255+
const char* local_dc,
2256+
size_t local_dc_length,
2257+
const char* local_rack,
2258+
size_t local_rack_length);
2259+
22162260
/**
22172261
* Configures the cluster to use token-aware request routing or not.
22182262
*

src/cluster.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include "constants.hpp"
2020
#include "dc_aware_policy.hpp"
21+
#include "rack_aware_policy.hpp"
2122
#include "external.hpp"
2223
#include "logger.hpp"
2324
#include "resolver.hpp"
@@ -240,6 +241,7 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list
240241
const ControlConnectionSchema& schema,
241242
const LoadBalancingPolicy::Ptr& load_balancing_policy,
242243
const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
244+
const String& local_rack,
243245
const StringMultimap& supported_options, const ClusterSettings& settings)
244246
: connection_(connection)
245247
, listener_(listener ? listener : &nop_cluster_listener__)
@@ -251,6 +253,7 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list
251253
, connected_host_(connected_host)
252254
, hosts_(hosts)
253255
, local_dc_(local_dc)
256+
, local_rack_(local_rack)
254257
, supported_options_(supported_options)
255258
, is_recording_events_(settings.disable_events_on_startup) {
256259
static const auto optimized_msg = "===== Using optimized driver!!! =====\n";

src/cluster.hpp

+4
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ class Cluster
257257
* determining the next control connection host.
258258
* @param load_balancing_policies
259259
* @param local_dc The local datacenter determined by the metadata service for initializing the
260+
* @param local_rack The local rack determined by the metadata service for initializing the
260261
* load balancing policies.
261262
* @param supported_options Supported options discovered during control connection.
262263
* @param settings The control connection settings to use for reconnecting the
@@ -267,6 +268,7 @@ class Cluster
267268
const ControlConnectionSchema& schema,
268269
const LoadBalancingPolicy::Ptr& load_balancing_policy,
269270
const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
271+
const String& local_rack,
270272
const StringMultimap& supported_options, const ClusterSettings& settings);
271273

272274
/**
@@ -361,6 +363,7 @@ class Cluster
361363
const Host::Ptr& connected_host() const { return connected_host_; }
362364
const TokenMap::Ptr& token_map() const { return token_map_; }
363365
const String& local_dc() const { return local_dc_; }
366+
const String& local_rack() const { return local_rack_; }
364367
const VersionNumber& dse_server_version() const { return connection_->dse_server_version(); }
365368
const StringMultimap& supported_options() const { return supported_options_; }
366369
const ShardPortCalculator* shard_port_calculator() const { return shard_port_calculator_.get(); }
@@ -449,6 +452,7 @@ class Cluster
449452
PreparedMetadata prepared_metadata_;
450453
TokenMap::Ptr token_map_;
451454
String local_dc_;
455+
String local_rack_;
452456
StringMultimap supported_options_;
453457
Timer timer_;
454458
bool is_recording_events_;

src/cluster_config.cpp

+21
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,27 @@ CassError cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster, const c
301301
return CASS_OK;
302302
}
303303

304+
CassError cass_cluster_set_load_balance_rack_aware(CassCluster* cluster, const char* local_dc,
305+
const char* local_rack) {
306+
if (local_dc == NULL || local_rack == NULL) {
307+
return CASS_ERROR_LIB_BAD_PARAMS;
308+
}
309+
return cass_cluster_set_load_balance_rack_aware_n(cluster, local_dc, SAFE_STRLEN(local_dc),
310+
local_rack, SAFE_STRLEN(local_rack));
311+
}
312+
313+
CassError cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster, const char* local_dc,
314+
size_t local_dc_length,
315+
const char* local_rack,
316+
size_t local_rack_length) {
317+
if (local_dc == NULL || local_dc_length == 0 || local_rack == NULL || local_rack_length == 0) {
318+
return CASS_ERROR_LIB_BAD_PARAMS;
319+
}
320+
cluster->config().set_load_balancing_policy(new RackAwarePolicy(
321+
String(local_dc, local_dc_length), String(local_rack, local_rack_length)));
322+
return CASS_OK;
323+
}
324+
304325
void cass_cluster_set_token_aware_routing(CassCluster* cluster, cass_bool_t enabled) {
305326
cluster->config().set_token_aware_routing(enabled == cass_true);
306327
}

src/cluster_connector.cpp

+9-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include "cluster_connector.hpp"
1818
#include "dc_aware_policy.hpp"
19+
#include "rack_aware_policy.hpp"
1920
#include "protocol.hpp"
2021
#include "random.hpp"
2122
#include "round_robin_policy.hpp"
@@ -177,6 +178,7 @@ void ClusterConnector::on_resolve(ClusterMetadataResolver* resolver) {
177178
}
178179

179180
local_dc_ = resolver->local_dc();
181+
local_rack_ = resolver->local_rack();
180182
remaining_connector_count_ = resolved_contact_points.size();
181183
for (AddressVec::const_iterator it = resolved_contact_points.begin(),
182184
end = resolved_contact_points.end();
@@ -231,7 +233,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
231233
for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(), end = policies.end();
232234
it != end; ++it) {
233235
LoadBalancingPolicy::Ptr policy(*it);
234-
policy->init(connected_host, hosts, random_, local_dc_);
236+
policy->init(connected_host, hosts, random_, local_dc_, local_rack_);
235237
policy->register_handles(event_loop_->loop());
236238
}
237239

@@ -248,6 +250,11 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
248250
message = "No hosts available for the control connection using the "
249251
"DC-aware load balancing policy. "
250252
"Check to see if the configured local datacenter is valid";
253+
} else if (dynamic_cast<RackAwarePolicy::RackAwareQueryPlan*>(query_plan.get()) !=
254+
NULL) { // Check if Rack-aware
255+
message = "No hosts available for the control connection using the "
256+
"Rack-aware load balancing policy. "
257+
"Check to see if the configured local datacenter and rack is valid";
251258
} else {
252259
message = "No hosts available for the control connection using the "
253260
"configured load balancing policy";
@@ -258,7 +265,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
258265

259266
cluster_.reset(new Cluster(connector->release_connection(), listener_, event_loop_,
260267
connected_host, hosts, connector->schema(), default_policy, policies,
261-
local_dc_, connector->supported_options(), settings_));
268+
local_dc_, local_rack_, connector->supported_options(), settings_));
262269

263270
// Clear any connection errors and set the final negotiated protocol version.
264271
error_code_ = CLUSTER_OK;

src/cluster_connector.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ class ClusterConnector : public RefCounted<ClusterConnector> {
169169
Random* random_;
170170
Metrics* metrics_;
171171
String local_dc_;
172+
String local_rack_;
172173
ClusterSettings settings_;
173174

174175
Callback callback_;

src/cluster_metadata_resolver.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class ClusterMetadataResolver : public RefCounted<ClusterMetadataResolver> {
4848

4949
const AddressVec& resolved_contact_points() const { return resolved_contact_points_; }
5050
const String& local_dc() const { return local_dc_; }
51+
const String& local_rack() const { return local_rack_; }
5152

5253
protected:
5354
virtual void internal_resolve(uv_loop_t* loop, const AddressVec& contact_points) = 0;
@@ -57,6 +58,7 @@ class ClusterMetadataResolver : public RefCounted<ClusterMetadataResolver> {
5758
protected:
5859
AddressVec resolved_contact_points_;
5960
String local_dc_;
61+
String local_rack_;
6062
Callback callback_;
6163
};
6264

src/dc_aware_policy.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ DCAwarePolicy::DCAwarePolicy(const String& local_dc, size_t used_hosts_per_remot
4343
DCAwarePolicy::~DCAwarePolicy() { uv_rwlock_destroy(&available_rwlock_); }
4444

4545
void DCAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
46-
const String& local_dc) {
46+
const String& local_dc, const String& local_rack) {
4747
if (local_dc_.empty()) { // Only override if no local DC was specified.
4848
local_dc_ = local_dc;
4949
}

src/dc_aware_policy.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class DCAwarePolicy : public LoadBalancingPolicy {
3737
~DCAwarePolicy();
3838

3939
virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
40-
const String& local_dc);
40+
const String& local_dc, const String& local_rack);
4141

4242
virtual CassHostDistance distance(const Host::Ptr& host) const;
4343

src/execution_profile.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "cassandra.h"
2424
#include "constants.hpp"
2525
#include "dc_aware_policy.hpp"
26+
#include "rack_aware_policy.hpp"
2627
#include "dense_hash_map.hpp"
2728
#include "latency_aware_policy.hpp"
2829
#include "speculative_execution.hpp"

src/latency_aware_policy.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ using namespace datastax::internal;
2727
using namespace datastax::internal::core;
2828

2929
void LatencyAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
30-
const String& local_dc) {
30+
const String& local_dc, const String& local_rack) {
3131
hosts_->reserve(hosts.size());
3232
std::transform(hosts.begin(), hosts.end(), std::back_inserter(*hosts_), GetHost());
3333
for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) {
3434
i->second->enable_latency_tracking(settings_.scale_ns, settings_.min_measured);
3535
}
36-
ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc);
36+
ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc, local_rack);
3737
}
3838

3939
void LatencyAwarePolicy::register_handles(uv_loop_t* loop) { start_timer(loop); }

src/latency_aware_policy.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class LatencyAwarePolicy : public ChainedLoadBalancingPolicy {
5151
virtual ~LatencyAwarePolicy() {}
5252

5353
virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
54-
const String& local_dc);
54+
const String& local_dc, const String& local_rack);
5555

5656
virtual void register_handles(uv_loop_t* loop);
5757
virtual void close_handles();

src/list_policy.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ using namespace datastax::internal;
2323
using namespace datastax::internal::core;
2424

2525
void ListPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
26-
const String& local_dc) {
26+
const String& local_dc, const String& local_rack) {
2727
HostMap valid_hosts;
2828
for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) {
2929
const Host::Ptr& host = i->second;
@@ -36,7 +36,7 @@ void ListPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Ran
3636
LOG_ERROR("No valid hosts available for list policy");
3737
}
3838

39-
ChainedLoadBalancingPolicy::init(connected_host, valid_hosts, random, local_dc);
39+
ChainedLoadBalancingPolicy::init(connected_host, valid_hosts, random, local_dc, local_rack);
4040
}
4141

4242
CassHostDistance ListPolicy::distance(const Host::Ptr& host) const {

src/list_policy.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class ListPolicy : public ChainedLoadBalancingPolicy {
3131
virtual ~ListPolicy() {}
3232

3333
virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
34-
const String& local_dc);
34+
const String& local_dc, const String& local_rack);
3535

3636
virtual CassHostDistance distance(const Host::Ptr& host) const;
3737

src/load_balancing.hpp

+15-3
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,21 @@ typedef enum CassBalancingState_ {
4040
CASS_BALANCING_NEW_QUERY_PLAN
4141
} CassBalancingState;
4242

43+
// CassHostDistance specifies how far a host is from the client.
44+
// The meaning of the distance depends on the load balancing policy.
45+
// The policies should assign the distance starting from the lowest
46+
// without skipping values, i.e. they should start with LOCAL.
47+
// For example:
48+
// - DCAwarePolicy uses LOCAL for same DC, REMOTE for different DC.
49+
// - RackAwarePolicy uses LOCAL for same rack,
50+
// REMOTE for different rack and same DC, and
51+
// REMOTE2 for different DC.
52+
// - RoundRobinPolicy has distinguishes only one distance level and
53+
// always uses LOCAL for all nodes.
4354
typedef enum CassHostDistance_ {
4455
CASS_HOST_DISTANCE_LOCAL,
4556
CASS_HOST_DISTANCE_REMOTE,
57+
CASS_HOST_DISTANCE_REMOTE2,
4658
CASS_HOST_DISTANCE_IGNORE
4759
} CassHostDistance;
4860

@@ -87,7 +99,7 @@ class LoadBalancingPolicy : public RefCounted<LoadBalancingPolicy> {
8799
virtual ~LoadBalancingPolicy() {}
88100

89101
virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
90-
const String& local_dc) = 0;
102+
const String& local_dc, const String &local_rack) = 0;
91103

92104
virtual void register_handles(uv_loop_t* loop) {}
93105
virtual void close_handles() {}
@@ -124,8 +136,8 @@ class ChainedLoadBalancingPolicy : public LoadBalancingPolicy {
124136
virtual ~ChainedLoadBalancingPolicy() {}
125137

126138
virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
127-
const String& local_dc) {
128-
return child_policy_->init(connected_host, hosts, random, local_dc);
139+
const String& local_dc, const String& local_rack) {
140+
return child_policy_->init(connected_host, hosts, random, local_dc, local_rack);
129141
}
130142

131143
virtual const LoadBalancingPolicy::Ptr& child_policy() const { return child_policy_; }

0 commit comments

Comments
 (0)