From 808c54d2602b29d027f31c9bd163eebde078a741 Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Fri, 23 Dec 2022 10:16:55 +0100 Subject: [PATCH 1/5] Display the query plans in case of failed test The out was not helpful for understanding the failure of a test. Printing the query plans will help. --- tests/src/unit/tests/test_load_balancing.cpp | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tests/src/unit/tests/test_load_balancing.cpp b/tests/src/unit/tests/test_load_balancing.cpp index 0cb10da7f..adc6756cd 100644 --- a/tests/src/unit/tests/test_load_balancing.cpp +++ b/tests/src/unit/tests/test_load_balancing.cpp @@ -72,12 +72,21 @@ void populate_hosts(size_t count, const String& rack, const String& dc, HostMap* } void verify_sequence(QueryPlan* qp, const Vector& sequence) { + Vector
actual; Address received; + size_t limit = 100; + while (qp->compute_next(&received) && limit > 0) { + limit--; + actual.emplace_back(received); + } + if (limit == 0) { + FAIL() << "Iteration limit exceeded"; + } + Vector
expected; for (Vector::const_iterator it = sequence.begin(); it != sequence.end(); ++it) { - ASSERT_TRUE(qp->compute_next(&received)); - EXPECT_EQ(addr_for_sequence(*it), received); + expected.emplace_back(addr_for_sequence(*it)); } - EXPECT_FALSE(qp->compute_next(&received)); + EXPECT_EQ(expected, actual); } typedef Map QueryCounts; From dda8babfac265a23ad85ac59842b44b103f1f8e7 Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Mon, 2 Jan 2023 10:29:48 +0100 Subject: [PATCH 2/5] Use count argument in run_policy This argument was not used anywhere, it worked by chance since it had the same value everywhere. --- tests/src/unit/tests/test_load_balancing.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/src/unit/tests/test_load_balancing.cpp b/tests/src/unit/tests/test_load_balancing.cpp index adc6756cd..5ebc3fac0 100644 --- a/tests/src/unit/tests/test_load_balancing.cpp +++ b/tests/src/unit/tests/test_load_balancing.cpp @@ -93,7 +93,7 @@ typedef Map QueryCounts; QueryCounts run_policy(LoadBalancingPolicy& policy, int count) { QueryCounts counts; - for (int i = 0; i < 12; ++i) { + for (int i = 0; i < count; ++i) { ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); Host::Ptr host(qp->compute_next()); if (host) { From c095807f9cc138940b8bd7de0e5ec48e7d08e76c Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Mon, 2 Jan 2023 10:51:32 +0100 Subject: [PATCH 3/5] Explicitly specify consistency level in run_policy The Datstax C++ driver has default ONE, the Scylla fork has LOCAL_ONE. --- tests/src/unit/tests/test_load_balancing.cpp | 32 +++++++++++--------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/tests/src/unit/tests/test_load_balancing.cpp b/tests/src/unit/tests/test_load_balancing.cpp index 5ebc3fac0..87e205eb2 100644 --- a/tests/src/unit/tests/test_load_balancing.cpp +++ b/tests/src/unit/tests/test_load_balancing.cpp @@ -91,10 +91,14 @@ void verify_sequence(QueryPlan* qp, const Vector& sequence) { typedef Map QueryCounts; -QueryCounts run_policy(LoadBalancingPolicy& policy, int count) { +QueryCounts run_policy(LoadBalancingPolicy& policy, int count, CassConsistency consistency) { QueryCounts counts; for (int i = 0; i < count; ++i) { - ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); + QueryRequest::Ptr request(new QueryRequest("", 0)); + request->set_consistency(consistency); + SharedRefPtr request_handler( + new RequestHandler(request, ResponseFuture::Ptr())); + ScopedPtr qp(policy.new_query_plan("ks", request_handler.get(), NULL)); Host::Ptr host(qp->compute_next()); if (host) { counts[host->address()] += 1; @@ -309,7 +313,7 @@ TEST(RoundRobinLoadBalancingUnitTest, VerifyEqualDistribution) { policy.init(SharedRefPtr(), hosts, NULL, ""); { // All nodes - QueryCounts counts(run_policy(policy, 12)); + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); ASSERT_EQ(counts.size(), 3u); verify_query_counts(counts, 4); } @@ -317,7 +321,7 @@ TEST(RoundRobinLoadBalancingUnitTest, VerifyEqualDistribution) { policy.on_host_down(hosts.begin()->first); { // One node down - QueryCounts counts(run_policy(policy, 12)); + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); ASSERT_EQ(counts.size(), 2u); verify_query_counts(counts, 6); } @@ -325,7 +329,7 @@ TEST(RoundRobinLoadBalancingUnitTest, VerifyEqualDistribution) { policy.on_host_up(hosts.begin()->second); { // All nodes again - QueryCounts counts(run_policy(policy, 12)); + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); ASSERT_EQ(counts.size(), 3u); verify_query_counts(counts, 4); } @@ -333,7 +337,7 @@ TEST(RoundRobinLoadBalancingUnitTest, VerifyEqualDistribution) { policy.on_host_removed(hosts.begin()->second); { // One node removed - QueryCounts counts(run_policy(policy, 12)); + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); ASSERT_EQ(counts.size(), 2u); verify_query_counts(counts, 6); } @@ -559,7 +563,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalDc) { policy.init(hosts.begin()->second, hosts, NULL, ""); { // All local nodes - QueryCounts counts(run_policy(policy, 12)); + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); verify_dcs(counts, hosts, LOCAL_DC); ASSERT_EQ(counts.size(), 3u); verify_query_counts(counts, 4); @@ -568,7 +572,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalDc) { policy.on_host_down(hosts.begin()->first); { // One local node down - QueryCounts counts(run_policy(policy, 12)); + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); verify_dcs(counts, hosts, LOCAL_DC); ASSERT_EQ(counts.size(), 2u); verify_query_counts(counts, 6); @@ -577,7 +581,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalDc) { policy.on_host_up(hosts.begin()->second); { // All local nodes again - QueryCounts counts(run_policy(policy, 12)); + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); verify_dcs(counts, hosts, LOCAL_DC); ASSERT_EQ(counts.size(), 3u); verify_query_counts(counts, 4); @@ -586,7 +590,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalDc) { policy.on_host_removed(hosts.begin()->second); { // One local node removed - QueryCounts counts(run_policy(policy, 12)); + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); verify_dcs(counts, hosts, LOCAL_DC); ASSERT_EQ(counts.size(), 2u); verify_query_counts(counts, 6); @@ -612,7 +616,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionRemoteDc) { } { // All remote nodes - QueryCounts counts(run_policy(policy, 12)); + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); verify_dcs(counts, hosts, REMOTE_DC); ASSERT_EQ(counts.size(), 3u); verify_query_counts(counts, 4); @@ -621,7 +625,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionRemoteDc) { policy.on_host_down(remote_dc_node1->address()); { // One remote node down - QueryCounts counts(run_policy(policy, 12)); + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); verify_dcs(counts, hosts, REMOTE_DC); ASSERT_EQ(counts.size(), 2u); verify_query_counts(counts, 6); @@ -630,7 +634,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionRemoteDc) { policy.on_host_up(remote_dc_node1); { // All remote nodes again - QueryCounts counts(run_policy(policy, 12)); + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); verify_dcs(counts, hosts, REMOTE_DC); ASSERT_EQ(counts.size(), 3u); verify_query_counts(counts, 4); @@ -639,7 +643,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionRemoteDc) { policy.on_host_removed(remote_dc_node1); { // One remote node removed - QueryCounts counts(run_policy(policy, 12)); + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); verify_dcs(counts, hosts, REMOTE_DC); ASSERT_EQ(counts.size(), 2u); verify_query_counts(counts, 6); From 21e8c0f6dd55f49ee4938fb23d59baab39e21aac Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Mon, 2 Jan 2023 10:30:11 +0100 Subject: [PATCH 4/5] Add doc comments to helper functions --- tests/src/unit/tests/test_load_balancing.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/src/unit/tests/test_load_balancing.cpp b/tests/src/unit/tests/test_load_balancing.cpp index 87e205eb2..a5316ce80 100644 --- a/tests/src/unit/tests/test_load_balancing.cpp +++ b/tests/src/unit/tests/test_load_balancing.cpp @@ -107,6 +107,7 @@ QueryCounts run_policy(LoadBalancingPolicy& policy, int count, CassConsistency c return counts; } +// verify_dcs checks that all hosts in counts are from expected_dc. void verify_dcs(const QueryCounts& counts, const HostMap& hosts, const String& expected_dc) { for (QueryCounts::const_iterator it = counts.begin(), end = counts.end(); it != end; ++it) { HostMap::const_iterator host_it = hosts.find(it->first); @@ -115,6 +116,7 @@ void verify_dcs(const QueryCounts& counts, const HostMap& hosts, const String& e } } +// verify_query_counts checks that each host was used expected_count times. void verify_query_counts(const QueryCounts& counts, int expected_count) { for (QueryCounts::const_iterator it = counts.begin(), end = counts.end(); it != end; ++it) { EXPECT_EQ(expected_count, it->second); From 9691ec0c983666ab3754c3e20c9f1ed9b3a25432 Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Tue, 20 Dec 2022 19:12:02 +0100 Subject: [PATCH 5/5] Add rack-aware load balancing policy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We need to prefer local rack as there are higher network costs when communicating with nodes in remote rack. This policy prefers nodes from the local rack, then local datacenter and then other nodes. The new RackAwarePolicy is similar to DCAwarePolicy, but does not have the deprecated options. TokenAwarePolicy and other code needed to be modified so that the local rack is propagated. The TokenAware policy was changed to prefer replicas in remote rack / remote DC before trying non-replica nodes. It does not make much sense to not try the replicas and trying the replicas simplifies the code as now we have three levels local/remote/remote2. This change might not be backwards-compatible, we don't know what exactly this project guarantees in terms of backwards compatibility. Co-Authored-By: Peter Navrátil --- include/cassandra.h | 44 ++ src/cluster.cpp | 3 + src/cluster.hpp | 4 + src/cluster_config.cpp | 21 + src/cluster_connector.cpp | 11 +- src/cluster_connector.hpp | 1 + src/cluster_metadata_resolver.hpp | 2 + src/dc_aware_policy.cpp | 2 +- src/dc_aware_policy.hpp | 2 +- src/execution_profile.hpp | 1 + src/latency_aware_policy.cpp | 4 +- src/latency_aware_policy.hpp | 2 +- src/list_policy.cpp | 4 +- src/list_policy.hpp | 2 +- src/load_balancing.hpp | 18 +- src/rack_aware_policy.cpp | 310 +++++++++++++ src/rack_aware_policy.hpp | 128 ++++++ src/request_processor.cpp | 4 +- src/request_processor.hpp | 3 +- src/request_processor_initializer.cpp | 5 +- src/request_processor_initializer.hpp | 4 +- src/round_robin_policy.cpp | 2 +- src/round_robin_policy.hpp | 2 +- src/session.cpp | 9 +- src/session.hpp | 2 +- src/session_base.cpp | 4 +- src/session_base.hpp | 2 +- src/token_aware_policy.cpp | 28 +- src/token_aware_policy.hpp | 10 +- tests/src/unit/tests/test_load_balancing.cpp | 435 ++++++++++++++++-- .../src/unit/tests/test_request_processor.cpp | 32 +- 31 files changed, 1013 insertions(+), 88 deletions(-) create mode 100644 src/rack_aware_policy.cpp create mode 100644 src/rack_aware_policy.hpp diff --git a/include/cassandra.h b/include/cassandra.h index 49666cf20..664a11d56 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -2213,6 +2213,50 @@ cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster, unsigned used_hosts_per_remote_dc, cass_bool_t allow_remote_dcs_for_local_cl); +/** + * Configures the cluster to use Rack-aware load balancing. + * For each query, all live nodes in a primary 'local' rack are tried first, + * followed by nodes from local DC and then nodes from other DCs. + * + * With empty local_rack and local_dc, default local_dc and local_rack + * is chosen from the first connected contact point, + * and no remote hosts are considered in query plans. + * If relying on this mechanism, be sure to use only contact + * points from the local rack. + * + * @public @memberof CassCluster + * + * @param[in] cluster + * @param[in] local_dc The primary data center to try first + * @param[in] local_rack The primary rack to try first + * @return CASS_OK if successful, otherwise an error occurred + */ +CASS_EXPORT CassError +cass_cluster_set_load_balance_rack_aware(CassCluster* cluster, + const char* local_dc, + const char* local_rack); + + +/** + * Same as cass_cluster_set_load_balance_rack_aware(), but with lengths for string + * parameters. + * + * @public @memberof CassCluster + * + * @param[in] cluster + * @param[in] local_dc + * @param[in] local_dc_length + * @return same as cass_cluster_set_load_balance_dc_aware() + * + * @see cass_cluster_set_load_balance_dc_aware() + */ +CASS_EXPORT CassError +cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster, + const char* local_dc, + size_t local_dc_length, + const char* local_rack, + size_t local_rack_length); + /** * Configures the cluster to use token-aware request routing or not. * diff --git a/src/cluster.cpp b/src/cluster.cpp index a46f3ffc1..21ab3eafb 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -18,6 +18,7 @@ #include "constants.hpp" #include "dc_aware_policy.hpp" +#include "rack_aware_policy.hpp" #include "external.hpp" #include "logger.hpp" #include "resolver.hpp" @@ -240,6 +241,7 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list const ControlConnectionSchema& schema, const LoadBalancingPolicy::Ptr& load_balancing_policy, const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc, + const String& local_rack, const StringMultimap& supported_options, const ClusterSettings& settings) : connection_(connection) , listener_(listener ? listener : &nop_cluster_listener__) @@ -251,6 +253,7 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list , connected_host_(connected_host) , hosts_(hosts) , local_dc_(local_dc) + , local_rack_(local_rack) , supported_options_(supported_options) , is_recording_events_(settings.disable_events_on_startup) { static const auto optimized_msg = "===== Using optimized driver!!! =====\n"; diff --git a/src/cluster.hpp b/src/cluster.hpp index 4fe36ff5d..dec6043b6 100644 --- a/src/cluster.hpp +++ b/src/cluster.hpp @@ -257,6 +257,7 @@ class Cluster * determining the next control connection host. * @param load_balancing_policies * @param local_dc The local datacenter determined by the metadata service for initializing the + * @param local_rack The local rack determined by the metadata service for initializing the * load balancing policies. * @param supported_options Supported options discovered during control connection. * @param settings The control connection settings to use for reconnecting the @@ -267,6 +268,7 @@ class Cluster const ControlConnectionSchema& schema, const LoadBalancingPolicy::Ptr& load_balancing_policy, const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc, + const String& local_rack, const StringMultimap& supported_options, const ClusterSettings& settings); /** @@ -361,6 +363,7 @@ class Cluster const Host::Ptr& connected_host() const { return connected_host_; } const TokenMap::Ptr& token_map() const { return token_map_; } const String& local_dc() const { return local_dc_; } + const String& local_rack() const { return local_rack_; } const VersionNumber& dse_server_version() const { return connection_->dse_server_version(); } const StringMultimap& supported_options() const { return supported_options_; } const ShardPortCalculator* shard_port_calculator() const { return shard_port_calculator_.get(); } @@ -449,6 +452,7 @@ class Cluster PreparedMetadata prepared_metadata_; TokenMap::Ptr token_map_; String local_dc_; + String local_rack_; StringMultimap supported_options_; Timer timer_; bool is_recording_events_; diff --git a/src/cluster_config.cpp b/src/cluster_config.cpp index ab357ed52..e536515b4 100644 --- a/src/cluster_config.cpp +++ b/src/cluster_config.cpp @@ -301,6 +301,27 @@ CassError cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster, const c return CASS_OK; } +CassError cass_cluster_set_load_balance_rack_aware(CassCluster* cluster, const char* local_dc, + const char* local_rack) { + if (local_dc == NULL || local_rack == NULL) { + return CASS_ERROR_LIB_BAD_PARAMS; + } + return cass_cluster_set_load_balance_rack_aware_n(cluster, local_dc, SAFE_STRLEN(local_dc), + local_rack, SAFE_STRLEN(local_rack)); +} + +CassError cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster, const char* local_dc, + size_t local_dc_length, + const char* local_rack, + size_t local_rack_length) { + if (local_dc == NULL || local_dc_length == 0 || local_rack == NULL || local_rack_length == 0) { + return CASS_ERROR_LIB_BAD_PARAMS; + } + cluster->config().set_load_balancing_policy(new RackAwarePolicy( + String(local_dc, local_dc_length), String(local_rack, local_rack_length))); + return CASS_OK; +} + void cass_cluster_set_token_aware_routing(CassCluster* cluster, cass_bool_t enabled) { cluster->config().set_token_aware_routing(enabled == cass_true); } diff --git a/src/cluster_connector.cpp b/src/cluster_connector.cpp index e0415e151..eb959e17f 100644 --- a/src/cluster_connector.cpp +++ b/src/cluster_connector.cpp @@ -16,6 +16,7 @@ #include "cluster_connector.hpp" #include "dc_aware_policy.hpp" +#include "rack_aware_policy.hpp" #include "protocol.hpp" #include "random.hpp" #include "round_robin_policy.hpp" @@ -177,6 +178,7 @@ void ClusterConnector::on_resolve(ClusterMetadataResolver* resolver) { } local_dc_ = resolver->local_dc(); + local_rack_ = resolver->local_rack(); remaining_connector_count_ = resolved_contact_points.size(); for (AddressVec::const_iterator it = resolved_contact_points.begin(), end = resolved_contact_points.end(); @@ -231,7 +233,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) { for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(), end = policies.end(); it != end; ++it) { LoadBalancingPolicy::Ptr policy(*it); - policy->init(connected_host, hosts, random_, local_dc_); + policy->init(connected_host, hosts, random_, local_dc_, local_rack_); policy->register_handles(event_loop_->loop()); } @@ -248,6 +250,11 @@ void ClusterConnector::on_connect(ControlConnector* connector) { message = "No hosts available for the control connection using the " "DC-aware load balancing policy. " "Check to see if the configured local datacenter is valid"; + } else if (dynamic_cast(query_plan.get()) != + NULL) { // Check if Rack-aware + message = "No hosts available for the control connection using the " + "Rack-aware load balancing policy. " + "Check to see if the configured local datacenter and rack is valid"; } else { message = "No hosts available for the control connection using the " "configured load balancing policy"; @@ -258,7 +265,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) { cluster_.reset(new Cluster(connector->release_connection(), listener_, event_loop_, connected_host, hosts, connector->schema(), default_policy, policies, - local_dc_, connector->supported_options(), settings_)); + local_dc_, local_rack_, connector->supported_options(), settings_)); // Clear any connection errors and set the final negotiated protocol version. error_code_ = CLUSTER_OK; diff --git a/src/cluster_connector.hpp b/src/cluster_connector.hpp index e960fa058..8c8572322 100644 --- a/src/cluster_connector.hpp +++ b/src/cluster_connector.hpp @@ -169,6 +169,7 @@ class ClusterConnector : public RefCounted { Random* random_; Metrics* metrics_; String local_dc_; + String local_rack_; ClusterSettings settings_; Callback callback_; diff --git a/src/cluster_metadata_resolver.hpp b/src/cluster_metadata_resolver.hpp index 90e91acbd..bcca03bf3 100644 --- a/src/cluster_metadata_resolver.hpp +++ b/src/cluster_metadata_resolver.hpp @@ -48,6 +48,7 @@ class ClusterMetadataResolver : public RefCounted { const AddressVec& resolved_contact_points() const { return resolved_contact_points_; } const String& local_dc() const { return local_dc_; } + const String& local_rack() const { return local_rack_; } protected: virtual void internal_resolve(uv_loop_t* loop, const AddressVec& contact_points) = 0; @@ -57,6 +58,7 @@ class ClusterMetadataResolver : public RefCounted { protected: AddressVec resolved_contact_points_; String local_dc_; + String local_rack_; Callback callback_; }; diff --git a/src/dc_aware_policy.cpp b/src/dc_aware_policy.cpp index d68bec1a2..5c1550227 100644 --- a/src/dc_aware_policy.cpp +++ b/src/dc_aware_policy.cpp @@ -43,7 +43,7 @@ DCAwarePolicy::DCAwarePolicy(const String& local_dc, size_t used_hosts_per_remot DCAwarePolicy::~DCAwarePolicy() { uv_rwlock_destroy(&available_rwlock_); } void DCAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) { + const String& local_dc, const String& local_rack) { if (local_dc_.empty()) { // Only override if no local DC was specified. local_dc_ = local_dc; } diff --git a/src/dc_aware_policy.hpp b/src/dc_aware_policy.hpp index f76b7307b..526338c29 100644 --- a/src/dc_aware_policy.hpp +++ b/src/dc_aware_policy.hpp @@ -37,7 +37,7 @@ class DCAwarePolicy : public LoadBalancingPolicy { ~DCAwarePolicy(); virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc); + const String& local_dc, const String& local_rack); virtual CassHostDistance distance(const Host::Ptr& host) const; diff --git a/src/execution_profile.hpp b/src/execution_profile.hpp index 2b5645149..cb4d34f61 100644 --- a/src/execution_profile.hpp +++ b/src/execution_profile.hpp @@ -23,6 +23,7 @@ #include "cassandra.h" #include "constants.hpp" #include "dc_aware_policy.hpp" +#include "rack_aware_policy.hpp" #include "dense_hash_map.hpp" #include "latency_aware_policy.hpp" #include "speculative_execution.hpp" diff --git a/src/latency_aware_policy.cpp b/src/latency_aware_policy.cpp index 9f77a384f..29541bbb8 100644 --- a/src/latency_aware_policy.cpp +++ b/src/latency_aware_policy.cpp @@ -27,13 +27,13 @@ using namespace datastax::internal; using namespace datastax::internal::core; void LatencyAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) { + const String& local_dc, const String& local_rack) { hosts_->reserve(hosts.size()); std::transform(hosts.begin(), hosts.end(), std::back_inserter(*hosts_), GetHost()); for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) { i->second->enable_latency_tracking(settings_.scale_ns, settings_.min_measured); } - ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc); + ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc, local_rack); } void LatencyAwarePolicy::register_handles(uv_loop_t* loop) { start_timer(loop); } diff --git a/src/latency_aware_policy.hpp b/src/latency_aware_policy.hpp index 178752a4a..c04430c1a 100644 --- a/src/latency_aware_policy.hpp +++ b/src/latency_aware_policy.hpp @@ -51,7 +51,7 @@ class LatencyAwarePolicy : public ChainedLoadBalancingPolicy { virtual ~LatencyAwarePolicy() {} virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc); + const String& local_dc, const String& local_rack); virtual void register_handles(uv_loop_t* loop); virtual void close_handles(); diff --git a/src/list_policy.cpp b/src/list_policy.cpp index 7dc9357d4..fa38c838e 100644 --- a/src/list_policy.cpp +++ b/src/list_policy.cpp @@ -23,7 +23,7 @@ using namespace datastax::internal; using namespace datastax::internal::core; void ListPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) { + const String& local_dc, const String& local_rack) { HostMap valid_hosts; for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) { const Host::Ptr& host = i->second; @@ -36,7 +36,7 @@ void ListPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Ran LOG_ERROR("No valid hosts available for list policy"); } - ChainedLoadBalancingPolicy::init(connected_host, valid_hosts, random, local_dc); + ChainedLoadBalancingPolicy::init(connected_host, valid_hosts, random, local_dc, local_rack); } CassHostDistance ListPolicy::distance(const Host::Ptr& host) const { diff --git a/src/list_policy.hpp b/src/list_policy.hpp index bda75f5f5..99b13eb22 100644 --- a/src/list_policy.hpp +++ b/src/list_policy.hpp @@ -31,7 +31,7 @@ class ListPolicy : public ChainedLoadBalancingPolicy { virtual ~ListPolicy() {} virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc); + const String& local_dc, const String& local_rack); virtual CassHostDistance distance(const Host::Ptr& host) const; diff --git a/src/load_balancing.hpp b/src/load_balancing.hpp index ba60928a9..367860126 100644 --- a/src/load_balancing.hpp +++ b/src/load_balancing.hpp @@ -40,9 +40,21 @@ typedef enum CassBalancingState_ { CASS_BALANCING_NEW_QUERY_PLAN } CassBalancingState; +// CassHostDistance specifies how far a host is from the client. +// The meaning of the distance depends on the load balancing policy. +// The policies should assign the distance starting from the lowest +// without skipping values, i.e. they should start with LOCAL. +// For example: +// - DCAwarePolicy uses LOCAL for same DC, REMOTE for different DC. +// - RackAwarePolicy uses LOCAL for same rack, +// REMOTE for different rack and same DC, and +// REMOTE2 for different DC. +// - RoundRobinPolicy has distinguishes only one distance level and +// always uses LOCAL for all nodes. typedef enum CassHostDistance_ { CASS_HOST_DISTANCE_LOCAL, CASS_HOST_DISTANCE_REMOTE, + CASS_HOST_DISTANCE_REMOTE2, CASS_HOST_DISTANCE_IGNORE } CassHostDistance; @@ -87,7 +99,7 @@ class LoadBalancingPolicy : public RefCounted { virtual ~LoadBalancingPolicy() {} virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) = 0; + const String& local_dc, const String &local_rack) = 0; virtual void register_handles(uv_loop_t* loop) {} virtual void close_handles() {} @@ -124,8 +136,8 @@ class ChainedLoadBalancingPolicy : public LoadBalancingPolicy { virtual ~ChainedLoadBalancingPolicy() {} virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) { - return child_policy_->init(connected_host, hosts, random, local_dc); + const String& local_dc, const String& local_rack) { + return child_policy_->init(connected_host, hosts, random, local_dc, local_rack); } virtual const LoadBalancingPolicy::Ptr& child_policy() const { return child_policy_; } diff --git a/src/rack_aware_policy.cpp b/src/rack_aware_policy.cpp new file mode 100644 index 000000000..394af5b19 --- /dev/null +++ b/src/rack_aware_policy.cpp @@ -0,0 +1,310 @@ +/* + Copyright (c) DataStax, Inc. + Copyright (c) 2022 Kiwi.com + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include "rack_aware_policy.hpp" + +#include "logger.hpp" +#include "request_handler.hpp" +#include "scoped_lock.hpp" + +#include + +using namespace datastax; +using namespace datastax::internal; +using namespace datastax::internal::core; + +RackAwarePolicy::RackAwarePolicy(const String& local_dc, const String& local_rack) + : local_dc_(local_dc) + , local_rack_(local_rack) + , local_rack_live_hosts_(new HostVec()) + , index_(0) { + uv_rwlock_init(&available_rwlock_); +} + +RackAwarePolicy::~RackAwarePolicy() { uv_rwlock_destroy(&available_rwlock_); } + +void RackAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, + const String& local_dc, const String& local_rack) { + if (local_dc_.empty()) { // Only override if no local DC was specified. + local_dc_ = local_dc; + } + + if (local_dc_.empty() && connected_host && !connected_host->dc().empty()) { + LOG_INFO("Using '%s' for the local data center " + "(if this is incorrect, please provide the correct data center)", + connected_host->dc().c_str()); + local_dc_ = connected_host->dc(); + } + + if (local_rack_.empty()) { // Only override if no local rack was specified. + local_rack_ = local_rack; + } + + if (local_rack_.empty() && connected_host && !connected_host->rack().empty()) { + LOG_INFO("Using '%s' for the local rack " + "(if this is incorrect, please provide the correct rack)", + connected_host->rack().c_str()); + local_rack_ = connected_host->rack(); + } + + available_.resize(hosts.size()); + std::transform(hosts.begin(), hosts.end(), std::inserter(available_, available_.begin()), + GetAddress()); + + for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) { + on_host_added(i->second); + } + if (random != NULL) { + index_ = random->next(std::max(static_cast(1), hosts.size())); + } +} + +CassHostDistance RackAwarePolicy::distance(const Host::Ptr& host) const { + if (local_dc_.empty() || local_rack_.empty() || (host->dc() == local_dc_ && host->rack() == local_rack_)) { + return CASS_HOST_DISTANCE_LOCAL; + } + + if (host->dc() == local_dc_) { + const CopyOnWriteHostVec& hosts = per_remote_rack_live_hosts_.get_hosts(host->rack()); + size_t num_hosts = hosts->size(); + for (size_t i = 0; i < num_hosts; ++i) { + if ((*hosts)[i]->address() == host->address()) { + return CASS_HOST_DISTANCE_REMOTE; + } + } + } + + const CopyOnWriteHostVec& hosts = per_remote_dc_live_hosts_.get_hosts(host->dc()); + size_t num_hosts = hosts->size(); + for (size_t i = 0; i < num_hosts; ++i) { + if ((*hosts)[i]->address() == host->address()) { + return CASS_HOST_DISTANCE_REMOTE2; + } + } + + return CASS_HOST_DISTANCE_IGNORE; +} + +QueryPlan* RackAwarePolicy::new_query_plan(const String& keyspace, RequestHandler* request_handler, + const TokenMap* token_map) { + CassConsistency cl = + request_handler != NULL ? request_handler->consistency() : CASS_DEFAULT_CONSISTENCY; + return new RackAwareQueryPlan(this, cl, index_++); +} + +bool RackAwarePolicy::is_host_up(const Address& address) const { + ScopedReadLock rl(&available_rwlock_); + return available_.count(address) > 0; +} + +void RackAwarePolicy::on_host_added(const Host::Ptr& host) { + const String& dc = host->dc(); + const String& rack = host->rack(); + if (local_dc_.empty() && !dc.empty()) { + LOG_INFO("Using '%s' for local data center " + "(if this is incorrect, please provide the correct data center)", + host->dc().c_str()); + local_dc_ = dc; + } + if (local_rack_.empty() && !rack.empty()) { + LOG_INFO("Using '%s' for local data center " + "(if this is incorrect, please provide the correct data center)", + host->rack().c_str()); + local_rack_ = rack; + } + + if (dc == local_dc_ && rack == local_rack_) { + add_host(local_rack_live_hosts_, host); + } else if (dc == local_dc_) { + per_remote_rack_live_hosts_.add_host_to_key(rack, host); + } else { + per_remote_dc_live_hosts_.add_host_to_key(dc, host); + } +} + +void RackAwarePolicy::on_host_removed(const Host::Ptr& host) { + const String& dc = host->dc(); + const String& rack = host->rack(); + if (dc == local_dc_ && rack == local_rack_) { + remove_host(local_rack_live_hosts_, host); + } else if (dc == local_dc_) { + per_remote_rack_live_hosts_.remove_host_from_key(host->rack(), host); + } else { + per_remote_dc_live_hosts_.remove_host_from_key(host->dc(), host); + } + + ScopedWriteLock wl(&available_rwlock_); + available_.erase(host->address()); +} + +void RackAwarePolicy::on_host_up(const Host::Ptr& host) { + on_host_added(host); + + ScopedWriteLock wl(&available_rwlock_); + available_.insert(host->address()); +} + +void RackAwarePolicy::on_host_down(const Address& address) { + if (!remove_host(local_rack_live_hosts_, address) && + !per_remote_rack_live_hosts_.remove_host(address) && + !per_remote_dc_live_hosts_.remove_host(address)) { + LOG_DEBUG("Attempted to mark host %s as DOWN, but it doesn't exist", + address.to_string().c_str()); + } + + ScopedWriteLock wl(&available_rwlock_); + available_.erase(address); +} + +const String& RackAwarePolicy::local_dc() const { + ScopedReadLock rl(&available_rwlock_); + return local_dc_; +} + +const String& RackAwarePolicy::local_rack() const { + ScopedReadLock rl(&available_rwlock_); + return local_rack_; +} + +void RackAwarePolicy::PerKeyHostMap::add_host_to_key(const String& key, const Host::Ptr& host) { + ScopedWriteLock wl(&rwlock_); + Map::iterator i = map_.find(key); + if (i == map_.end()) { + CopyOnWriteHostVec hosts(new HostVec()); + hosts->push_back(host); + map_.insert(Map::value_type(key, hosts)); + } else { + add_host(i->second, host); + } +} + +void RackAwarePolicy::PerKeyHostMap::remove_host_from_key(const String& key, const Host::Ptr& host) { + ScopedWriteLock wl(&rwlock_); + Map::iterator i = map_.find(key); + if (i != map_.end()) { + core::remove_host(i->second, host); + } +} + +bool RackAwarePolicy::PerKeyHostMap::remove_host(const Address& address) { + ScopedWriteLock wl(&rwlock_); + for (Map::iterator i = map_.begin(), end = map_.end(); i != end; ++i) { + if (core::remove_host(i->second, address)) { + return true; + } + } + return false; +} + +const CopyOnWriteHostVec& RackAwarePolicy::PerKeyHostMap::get_hosts(const String& dc) const { + ScopedReadLock rl(&rwlock_); + Map::const_iterator i = map_.find(dc); + if (i == map_.end()) return no_hosts_; + + return i->second; +} + +void RackAwarePolicy::PerKeyHostMap::copy_keys(KeySet* keys) const { + ScopedReadLock rl(&rwlock_); + for (Map::const_iterator i = map_.begin(), end = map_.end(); i != end; ++i) { + keys->insert(i->first); + } +} + +// Helper functions to prevent copy (Notice: "const CopyOnWriteHostVec&") + +static const Host::Ptr& get_next_host(const CopyOnWriteHostVec& hosts, size_t index) { + return (*hosts)[index % hosts->size()]; +} + +static size_t get_hosts_size(const CopyOnWriteHostVec& hosts) { return hosts->size(); } + +RackAwarePolicy::RackAwareQueryPlan::RackAwareQueryPlan(const RackAwarePolicy* policy, CassConsistency cl, + size_t start_index) + : policy_(policy) + , cl_(cl) + , hosts_(policy_->local_rack_live_hosts_) + , local_remaining_(get_hosts_size(hosts_)) + , remote_remaining_(0) + , index_(start_index) {} + +Host::Ptr RackAwarePolicy::RackAwareQueryPlan::compute_next() { + while (local_remaining_ > 0) { + --local_remaining_; + const Host::Ptr& host(get_next_host(hosts_, index_++)); + if (policy_->is_host_up(host->address())) { + return host; + } + } + + if (!remote_racks_) { + remote_racks_.reset(new PerKeyHostMap::KeySet()); + policy_->per_remote_rack_live_hosts_.copy_keys(remote_racks_.get()); + } + + while (true) { + while (remote_remaining_ > 0) { + --remote_remaining_; + const Host::Ptr& host( + get_next_host(hosts_, index_++)); + if (policy_->is_host_up(host->address())) { + return host; + } + } + + if (remote_racks_->empty()) { + break; + } + + PerKeyHostMap::KeySet::iterator i = remote_racks_->begin(); + hosts_ = policy_->per_remote_rack_live_hosts_.get_hosts(*i); + remote_remaining_ = get_hosts_size(hosts_); + remote_racks_->erase(i); + } + + // Skip remote DCs for LOCAL_ consistency levels. + if (is_dc_local(cl_)) { + return Host::Ptr(); + } + + if (!remote_dcs_) { + remote_dcs_.reset(new PerKeyHostMap::KeySet()); + policy_->per_remote_dc_live_hosts_.copy_keys(remote_dcs_.get()); + } + + while (true) { + while (remote_remaining_ > 0) { + --remote_remaining_; + const Host::Ptr& host( + get_next_host(hosts_, index_++)); + if (policy_->is_host_up(host->address())) { + return host; + } + } + + if (remote_dcs_->empty()) { + break; + } + + PerKeyHostMap::KeySet::iterator i = remote_dcs_->begin(); + hosts_ = policy_->per_remote_dc_live_hosts_.get_hosts(*i); + remote_remaining_ = get_hosts_size(hosts_); + remote_dcs_->erase(i); + } + + return Host::Ptr(); +} diff --git a/src/rack_aware_policy.hpp b/src/rack_aware_policy.hpp new file mode 100644 index 000000000..33a8cbbef --- /dev/null +++ b/src/rack_aware_policy.hpp @@ -0,0 +1,128 @@ +/* + Copyright (c) DataStax, Inc. + Copyright (c) 2022 Kiwi.com + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef DATASTAX_INTERNAL_RACK_AWARE_POLICY_HPP +#define DATASTAX_INTERNAL_RACK_AWARE_POLICY_HPP + +#include "host.hpp" +#include "load_balancing.hpp" +#include "map.hpp" +#include "round_robin_policy.hpp" +#include "scoped_lock.hpp" +#include "scoped_ptr.hpp" +#include "set.hpp" + +#include + +namespace datastax { namespace internal { namespace core { + +class RackAwarePolicy : public LoadBalancingPolicy { +public: + RackAwarePolicy(const String& local_dc = "", const String &local_rack = ""); + + ~RackAwarePolicy(); + + virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, + const String& local_dc, const String& local_rack); + + virtual CassHostDistance distance(const Host::Ptr& host) const; + + virtual QueryPlan* new_query_plan(const String& keyspace, RequestHandler* request_handler, + const TokenMap* token_map); + + virtual bool is_host_up(const Address& address) const; + + virtual void on_host_added(const Host::Ptr& host); + virtual void on_host_removed(const Host::Ptr& host); + virtual void on_host_up(const Host::Ptr& host); + virtual void on_host_down(const Address& address); + + virtual const String& local_dc() const; + virtual const String& local_rack() const; + + virtual LoadBalancingPolicy* new_instance() { + return new RackAwarePolicy(local_dc_, local_rack_); + } + +private: + class PerKeyHostMap { + public: + typedef internal::Map Map; + typedef Set KeySet; + + PerKeyHostMap() + : no_hosts_(new HostVec()) { + uv_rwlock_init(&rwlock_); + } + ~PerKeyHostMap() { uv_rwlock_destroy(&rwlock_); } + + void add_host_to_key(const String& key, const Host::Ptr& host); + void remove_host_from_key(const String& key, const Host::Ptr& host); + bool remove_host(const Address& address); + const CopyOnWriteHostVec& get_hosts(const String& key) const; + void copy_keys(KeySet* keys) const; + + private: + Map map_; + mutable uv_rwlock_t rwlock_; + const CopyOnWriteHostVec no_hosts_; + + private: + DISALLOW_COPY_AND_ASSIGN(PerKeyHostMap); + }; + + const CopyOnWriteHostVec& get_local_dc_hosts() const; + void get_remote_dcs(PerKeyHostMap::KeySet* remote_dcs) const; + +public: + class RackAwareQueryPlan : public QueryPlan { + public: + RackAwareQueryPlan(const RackAwarePolicy* policy, CassConsistency cl, size_t start_index); + + virtual Host::Ptr compute_next(); + + private: + const RackAwarePolicy* policy_; + CassConsistency cl_; + CopyOnWriteHostVec hosts_; + ScopedPtr remote_racks_; + ScopedPtr remote_dcs_; + size_t local_remaining_; + size_t remote_remaining_; + size_t index_; + }; + +private: + mutable uv_rwlock_t available_rwlock_; + AddressSet available_; + + String local_dc_; + String local_rack_; + + CopyOnWriteHostVec local_rack_live_hosts_; + // remote rack, local dc + PerKeyHostMap per_remote_rack_live_hosts_; + PerKeyHostMap per_remote_dc_live_hosts_; + size_t index_; + +private: + DISALLOW_COPY_AND_ASSIGN(RackAwarePolicy); +}; + +}}} // namespace datastax::internal::core + +#endif diff --git a/src/request_processor.cpp b/src/request_processor.cpp index d02ad9e7f..bda526df4 100644 --- a/src/request_processor.cpp +++ b/src/request_processor.cpp @@ -170,7 +170,7 @@ RequestProcessor::RequestProcessor(RequestProcessorListener* listener, EventLoop const Host::Ptr& connected_host, const HostMap& hosts, const TokenMap::Ptr& token_map, const RequestProcessorSettings& settings, Random* random, - const String& local_dc) + const String& local_dc, const String& local_rack) : connection_pool_manager_(connection_pool_manager) , listener_(listener ? listener : &nop_request_processor_listener__) , event_loop_(event_loop) @@ -213,7 +213,7 @@ RequestProcessor::RequestProcessor(RequestProcessorListener* listener, EventLoop LoadBalancingPolicy::Vec policies = load_balancing_policies(); for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(); it != policies.end(); ++it) { // Initialize the load balancing policies - (*it)->init(connected_host, hosts, random, local_dc); + (*it)->init(connected_host, hosts, random, local_dc, local_rack); (*it)->register_handles(event_loop_->loop()); } diff --git a/src/request_processor.hpp b/src/request_processor.hpp index 67b0bf47c..253ac4828 100644 --- a/src/request_processor.hpp +++ b/src/request_processor.hpp @@ -166,12 +166,13 @@ class RequestProcessor * @param settings The current settings for the request processor. * @param random A RNG for randomizing hosts in the load balancing policies. * @param local_dc The local datacenter for initializing the load balancing policies. + * @param local_rack The local rack for initializing the load balancing policies. */ RequestProcessor(RequestProcessorListener* listener, EventLoop* event_loop, const ConnectionPoolManager::Ptr& connection_pool_manager, const Host::Ptr& connected_host, const HostMap& hosts, const TokenMap::Ptr& token_map, const RequestProcessorSettings& settings, - Random* random, const String& local_dc); + Random* random, const String& local_dc, const String& local_rack); /** * Close/Terminate the request request processor (thread-safe). diff --git a/src/request_processor_initializer.cpp b/src/request_processor_initializer.cpp index 6705b72e9..24d1a6d48 100644 --- a/src/request_processor_initializer.cpp +++ b/src/request_processor_initializer.cpp @@ -40,7 +40,7 @@ class RunInitializeProcessor : public Task { RequestProcessorInitializer::RequestProcessorInitializer( const Host::Ptr& connected_host, ProtocolVersion protocol_version, const HostMap& hosts, - const TokenMap::Ptr& token_map, const String& local_dc, const Callback& callback) + const TokenMap::Ptr& token_map, const String& local_dc, const String& local_rack, const Callback& callback) : event_loop_(NULL) , listener_(NULL) , metrics_(NULL) @@ -51,6 +51,7 @@ RequestProcessorInitializer::RequestProcessorInitializer( , hosts_(hosts) , token_map_(token_map) , local_dc_(local_dc) + , local_rack_(local_rack) , error_code_(REQUEST_PROCESSOR_OK) , callback_(callback) { uv_mutex_init(&mutex_); @@ -166,7 +167,7 @@ void RequestProcessorInitializer::on_initialize(ConnectionPoolManagerInitializer } else { processor_.reset(new RequestProcessor(listener_, event_loop_, initializer->release_manager(), connected_host_, hosts_, token_map_, settings_, random_, - local_dc_)); + local_dc_, local_rack_)); int rc = processor_->init(RequestProcessor::Protected()); if (rc != 0) { diff --git a/src/request_processor_initializer.hpp b/src/request_processor_initializer.hpp index 8d63380d9..b685e5dd2 100644 --- a/src/request_processor_initializer.hpp +++ b/src/request_processor_initializer.hpp @@ -60,12 +60,13 @@ class RequestProcessorInitializer * @param hosts A mapping of available hosts in the cluster. * @param token_map A token map. * @param local_dc The local datacenter for initializing the load balancing policies. + * @param local_rack The local datacenter for initializing the load balancing policies. * @param callback A callback that is called when the processor is initialized * or if an error occurred. */ RequestProcessorInitializer(const Host::Ptr& connected_host, ProtocolVersion protocol_version, const HostMap& hosts, const TokenMap::Ptr& token_map, - const String& local_dc, const Callback& callback); + const String& local_dc, const String& local_rack, const Callback& callback); ~RequestProcessorInitializer(); /** @@ -176,6 +177,7 @@ class RequestProcessorInitializer HostMap hosts_; const TokenMap::Ptr token_map_; String local_dc_; + String local_rack_; RequestProcessorError error_code_; String error_message_; diff --git a/src/round_robin_policy.cpp b/src/round_robin_policy.cpp index dd7f2ecff..4c8ac62d7 100644 --- a/src/round_robin_policy.cpp +++ b/src/round_robin_policy.cpp @@ -33,7 +33,7 @@ RoundRobinPolicy::RoundRobinPolicy() RoundRobinPolicy::~RoundRobinPolicy() { uv_rwlock_destroy(&available_rwlock_); } void RoundRobinPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) { + const String& local_dc, const String& local_rack) { available_.resize(hosts.size()); std::transform(hosts.begin(), hosts.end(), std::inserter(available_, available_.begin()), GetAddress()); diff --git a/src/round_robin_policy.hpp b/src/round_robin_policy.hpp index f5b4f715d..aebc62deb 100644 --- a/src/round_robin_policy.hpp +++ b/src/round_robin_policy.hpp @@ -31,7 +31,7 @@ class RoundRobinPolicy : public LoadBalancingPolicy { ~RoundRobinPolicy(); virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc); + const String& local_dc, const String& local_rack); virtual CassHostDistance distance(const Host::Ptr& host) const; diff --git a/src/session.cpp b/src/session.cpp index 34de94c13..9d725da46 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -195,13 +195,14 @@ class SessionInitializer : public RefCounted { SessionInitializer() { uv_mutex_destroy(&mutex_); } void initialize(const Host::Ptr& connected_host, ProtocolVersion protocol_version, - const HostMap& hosts, const TokenMap::Ptr& token_map, const String& local_dc) { + const HostMap& hosts, const TokenMap::Ptr& token_map, const String& local_dc, + const String& local_rack) { inc_ref(); const size_t thread_count_io = remaining_ = session_->config().thread_count_io(); for (size_t i = 0; i < thread_count_io; ++i) { RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - connected_host, protocol_version, hosts, token_map, local_dc, + connected_host, protocol_version, hosts, token_map, local_dc, local_rack, bind_callback(&SessionInitializer::on_initialize, this))); RequestProcessorSettings settings(session_->config()); @@ -360,7 +361,7 @@ void Session::join() { void Session::on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version, const HostMap& hosts, const TokenMap::Ptr& token_map, - const String& local_dc) { + const String& local_dc, const String& local_rack) { int rc = 0; if (hosts.empty()) { @@ -394,7 +395,7 @@ void Session::on_connect(const Host::Ptr& connected_host, ProtocolVersion protoc request_processor_count_ = 0; is_closing_ = false; SessionInitializer::Ptr initializer(new SessionInitializer(this)); - initializer->initialize(connected_host, protocol_version, hosts, token_map, local_dc); + initializer->initialize(connected_host, protocol_version, hosts, token_map, local_dc, local_rack); } void Session::on_close() { diff --git a/src/session.hpp b/src/session.hpp index 856833114..45efc5e89 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -54,7 +54,7 @@ class Session virtual void on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version, const HostMap& hosts, const TokenMap::Ptr& token_map, - const String& local_dc); + const String& local_dc, const String& local_rack); virtual void on_close(); diff --git a/src/session_base.cpp b/src/session_base.cpp index 6ccb4f7c5..c1e7b6333 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -160,7 +160,7 @@ void SessionBase::notify_closed() { void SessionBase::on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version, const HostMap& hosts, const TokenMap::Ptr& token_map, - const String& local_dc) { + const String& local_dc, const String& local_rack) { notify_connected(); } @@ -200,7 +200,7 @@ void SessionBase::on_initialize(ClusterConnector* connector) { } on_connect(cluster_->connected_host(), cluster_->protocol_version(), - cluster_->available_hosts(), cluster_->token_map(), cluster_->local_dc()); + cluster_->available_hosts(), cluster_->token_map(), cluster_->local_dc(), cluster_->local_rack()); } else { assert(!connector->is_canceled() && "Cluster connection process canceled"); switch (connector->error_code()) { diff --git a/src/session_base.hpp b/src/session_base.hpp index 1c3e6c68f..b0c3c7c16 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -117,7 +117,7 @@ class SessionBase : public ClusterListener { */ virtual void on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version, const HostMap& hosts, const TokenMap::Ptr& token_map, - const String& local_dc); + const String& local_dc, const String& local_rack); /** * A callback called after the control connection fails to connect. By default diff --git a/src/token_aware_policy.cpp b/src/token_aware_policy.cpp index cc2cd8394..adb7a5f68 100644 --- a/src/token_aware_policy.cpp +++ b/src/token_aware_policy.cpp @@ -37,7 +37,7 @@ static inline bool contains(const CopyOnWriteHostVec& replicas, const Address& a } void TokenAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) { + const String& local_dc, const String& local_rack) { if (random != NULL) { if (shuffle_replicas_) { // Store random so that it can be used to shuffle replicas. @@ -48,7 +48,7 @@ void TokenAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& host index_ = random->next(std::max(static_cast(1), hosts.size())); } } - ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc); + ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc, local_rack); } QueryPlan* TokenAwarePolicy::new_query_plan(const String& keyspace, RequestHandler* request_handler, @@ -87,8 +87,8 @@ QueryPlan* TokenAwarePolicy::new_query_plan(const String& keyspace, RequestHandl } Host::Ptr TokenAwarePolicy::TokenAwareQueryPlan::compute_next() { - while (remaining_ > 0) { - --remaining_; + while (remaining_local_ > 0) { + --remaining_local_; const Host::Ptr& host((*replicas_)[index_++ % replicas_->size()]); if (child_policy_->is_host_up(host->address()) && child_policy_->distance(host) == CASS_HOST_DISTANCE_LOCAL) { @@ -96,10 +96,28 @@ Host::Ptr TokenAwarePolicy::TokenAwareQueryPlan::compute_next() { } } + while (remaining_remote_ > 0) { + --remaining_remote_; + const Host::Ptr& host((*replicas_)[index_++ % replicas_->size()]); + if (child_policy_->is_host_up(host->address()) && + child_policy_->distance(host) == CASS_HOST_DISTANCE_REMOTE) { + return host; + } + } + + while (remaining_remote2_ > 0) { + --remaining_remote2_; + const Host::Ptr& host((*replicas_)[index_++ % replicas_->size()]); + if (child_policy_->is_host_up(host->address()) && + child_policy_->distance(host) == CASS_HOST_DISTANCE_REMOTE2) { + return host; + } + } + Host::Ptr host; while ((host = child_plan_->compute_next())) { if (!contains(replicas_, host->address()) || - child_policy_->distance(host) != CASS_HOST_DISTANCE_LOCAL) { + child_policy_->distance(host) > CASS_HOST_DISTANCE_REMOTE2) { return host; } } diff --git a/src/token_aware_policy.hpp b/src/token_aware_policy.hpp index 5a8cee903..637f041cf 100644 --- a/src/token_aware_policy.hpp +++ b/src/token_aware_policy.hpp @@ -35,7 +35,7 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy { virtual ~TokenAwarePolicy() {} virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc); + const String& local_dc, const String& local_rack); virtual QueryPlan* new_query_plan(const String& keyspace, RequestHandler* request_handler, const TokenMap* token_map); @@ -53,7 +53,9 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy { , child_plan_(child_plan) , replicas_(replicas) , index_(start_index) - , remaining_(replicas->size()) {} + , remaining_local_(replicas->size()) + , remaining_remote_(replicas->size()) + , remaining_remote2_(replicas->size()) {} Host::Ptr compute_next(); @@ -62,7 +64,9 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy { ScopedPtr child_plan_; CopyOnWriteHostVec replicas_; size_t index_; - size_t remaining_; + size_t remaining_local_; + size_t remaining_remote_; + size_t remaining_remote2_; }; Random* random_; diff --git a/tests/src/unit/tests/test_load_balancing.cpp b/tests/src/unit/tests/test_load_balancing.cpp index a5316ce80..c55ea990a 100644 --- a/tests/src/unit/tests/test_load_balancing.cpp +++ b/tests/src/unit/tests/test_load_balancing.cpp @@ -1,5 +1,6 @@ /* Copyright (c) DataStax, Inc. + Copyright (c) 2023 Kiwi.com Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -21,6 +22,7 @@ #include "blacklist_policy.hpp" #include "constants.hpp" #include "dc_aware_policy.hpp" +#include "rack_aware_policy.hpp" #include "event_loop.hpp" #include "latency_aware_policy.hpp" #include "murmur3.hpp" @@ -46,6 +48,9 @@ const String LOCAL_DC = "local"; const String REMOTE_DC = "remote"; const String BACKUP_DC = "backup"; +const String LOCAL_RACK = "local"; +const String REMOTE_RACK = "remote"; + #define VECTOR_FROM(t, a) Vector(a, a + sizeof(a) / sizeof(a[0])) Address addr_for_sequence(size_t i) { @@ -116,6 +121,17 @@ void verify_dcs(const QueryCounts& counts, const HostMap& hosts, const String& e } } +// verify_racks checks that all hosts in counts are from expected_dc and expected_rack. +void verify_racks(const QueryCounts& counts, const HostMap& hosts, const String& expected_dc, + const String& expected_rack) { + for (QueryCounts::const_iterator it = counts.begin(), end = counts.end(); it != end; ++it) { + HostMap::const_iterator host_it = hosts.find(it->first); + ASSERT_NE(host_it, hosts.end()); + EXPECT_EQ(expected_dc, host_it->second->dc()); + EXPECT_EQ(expected_rack, host_it->second->rack()); + } +} + // verify_query_counts checks that each host was used expected_count times. void verify_query_counts(const QueryCounts& counts, int expected_count) { for (QueryCounts::const_iterator it = counts.begin(), end = counts.end(); it != end; ++it) { @@ -184,7 +200,7 @@ void test_dc_aware_policy(size_t local_count, size_t remote_count) { populate_hosts(local_count, "rack", LOCAL_DC, &hosts); populate_hosts(remote_count, "rack", REMOTE_DC, &hosts); DCAwarePolicy policy(LOCAL_DC, remote_count, false); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); const size_t total_hosts = local_count + remote_count; @@ -200,7 +216,7 @@ TEST(RoundRobinLoadBalancingUnitTest, Simple) { populate_hosts(2, "rack", "dc", &hosts); RoundRobinPolicy policy; - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); // start on first elem ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); @@ -222,7 +238,7 @@ TEST(RoundRobinLoadBalancingUnitTest, OnAdd) { populate_hosts(2, "rack", "dc", &hosts); RoundRobinPolicy policy; - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); // baseline ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); @@ -245,7 +261,7 @@ TEST(RoundRobinLoadBalancingUnitTest, OnRemove) { populate_hosts(3, "rack", "dc", &hosts); RoundRobinPolicy policy; - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); SharedRefPtr host = hosts.begin()->second; @@ -266,7 +282,7 @@ TEST(RoundRobinLoadBalancingUnitTest, OnUpAndDown) { populate_hosts(3, "rack", "dc", &hosts); RoundRobinPolicy policy; - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); ScopedPtr qp_before1(policy.new_query_plan("ks", NULL, NULL)); ScopedPtr qp_before2(policy.new_query_plan("ks", NULL, NULL)); @@ -312,7 +328,7 @@ TEST(RoundRobinLoadBalancingUnitTest, VerifyEqualDistribution) { populate_hosts(3, "rack", "dc", &hosts); RoundRobinPolicy policy; - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); { // All nodes QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); @@ -353,7 +369,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, SomeDatacenterLocalUnspecified) { h->set_rack_and_dc("", ""); DCAwarePolicy policy(LOCAL_DC, 1, false); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); @@ -368,7 +384,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, SingleLocalDown) { populate_hosts(1, "rack", REMOTE_DC, &hosts); DCAwarePolicy policy(LOCAL_DC, 1, false); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); ScopedPtr qp_before( policy.new_query_plan("ks", NULL, NULL)); // has down host ptr in plan @@ -395,7 +411,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, AllLocalRemovedReturned) { populate_hosts(1, "rack", REMOTE_DC, &hosts); DCAwarePolicy policy(LOCAL_DC, 1, false); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); ScopedPtr qp_before( policy.new_query_plan("ks", NULL, NULL)); // has down host ptr in plan @@ -427,7 +443,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, RemoteRemovedReturned) { SharedRefPtr target_host = hosts[target_addr]; DCAwarePolicy policy(LOCAL_DC, 1, false); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); ScopedPtr qp_before( policy.new_query_plan("ks", NULL, NULL)); // has down host ptr in plan @@ -458,7 +474,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, UsedHostsPerDatacenter) { for (size_t used_hosts = 0; used_hosts < 4; ++used_hosts) { DCAwarePolicy policy(LOCAL_DC, used_hosts, false); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); Vector seq; @@ -491,7 +507,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, AllowRemoteDatacentersForLocalConsist // Not allowing remote DCs for local CLs bool allow_remote_dcs_for_local_cl = false; DCAwarePolicy policy(LOCAL_DC, 3, !allow_remote_dcs_for_local_cl); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); // Set local CL QueryRequest::Ptr request(new QueryRequest("", 0)); @@ -509,7 +525,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, AllowRemoteDatacentersForLocalConsist // Allowing remote DCs for local CLs bool allow_remote_dcs_for_local_cl = true; DCAwarePolicy policy(LOCAL_DC, 3, !allow_remote_dcs_for_local_cl); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); // Set local CL QueryRequest::Ptr request(new QueryRequest("", 0)); @@ -532,7 +548,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, StartWithEmptyLocalDatacenter) { // Set local DC using connected host { DCAwarePolicy policy("", 0, false); - policy.init(hosts[Address("2.0.0.0", 9042)], hosts, NULL, ""); + policy.init(hosts[Address("2.0.0.0", 9042)], hosts, NULL, "", ""); ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); const size_t seq[] = { 2, 3, 4 }; @@ -542,7 +558,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, StartWithEmptyLocalDatacenter) { // Set local DC using first host with non-empty DC { DCAwarePolicy policy("", 0, false); - policy.init(SharedRefPtr(new Host(Address("0.0.0.0", 9042))), hosts, NULL, ""); + policy.init(SharedRefPtr(new Host(Address("0.0.0.0", 9042))), hosts, NULL, "", ""); ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); const size_t seq[] = { 1 }; @@ -562,7 +578,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalDc) { populate_hosts(3, "rack", REMOTE_DC, &hosts); DCAwarePolicy policy("", 0, false); - policy.init(hosts.begin()->second, hosts, NULL, ""); + policy.init(hosts.begin()->second, hosts, NULL, "", ""); { // All local nodes QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); @@ -605,7 +621,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionRemoteDc) { populate_hosts(3, "rack", REMOTE_DC, &hosts); DCAwarePolicy policy("", 3, false); // Allow all remote DC nodes - policy.init(hosts.begin()->second, hosts, NULL, ""); + policy.init(hosts.begin()->second, hosts, NULL, "", ""); Host::Ptr remote_dc_node1; { // Mark down all local nodes @@ -652,6 +668,355 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionRemoteDc) { } } +// Check that host with unspecified rack and DC is last. +TEST(RackAwareLoadBalancingUnitTest, SomeDatacenterRackLocalUnspecified) { + const size_t total_hosts = 3; + HostMap hosts; + populate_hosts(total_hosts, LOCAL_RACK, LOCAL_DC, &hosts); + Host* h = hosts.begin()->second.get(); + h->set_rack_and_dc("", ""); + + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); + + // Use CL=ONE to allow remote DCs. + QueryRequest::Ptr request(new QueryRequest("", 0)); + request->set_consistency(CASS_CONSISTENCY_ONE); + SharedRefPtr request_handler( + new RequestHandler(request, ResponseFuture::Ptr())); + + ScopedPtr qp(policy.new_query_plan("ks", request_handler.get(), NULL)); + + const size_t seq[] = { 2, 3, 1 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); +} + +// Check that host with unspecified rack is last. +TEST(RackAwareLoadBalancingUnitTest, SomeRackLocalUnspecified) { + const size_t total_hosts = 3; + HostMap hosts; + populate_hosts(total_hosts, LOCAL_RACK, LOCAL_DC, &hosts); + Host* h = hosts.begin()->second.get(); + h->set_rack_and_dc("", LOCAL_DC); + + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); + + // Use CL=ONE to allow remote DCs. + QueryRequest::Ptr request(new QueryRequest("", 0)); + request->set_consistency(CASS_CONSISTENCY_ONE); + SharedRefPtr request_handler( + new RequestHandler(request, ResponseFuture::Ptr())); + + ScopedPtr qp(policy.new_query_plan("ks", request_handler.get(), NULL)); + + const size_t seq[] = { 2, 3, 1 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); +} + +// Check that down host is not returned. +TEST(RackAwareLoadBalancingUnitTest, SingleLocalDown) { + HostMap hosts; + populate_hosts(3, LOCAL_RACK, LOCAL_DC, &hosts); + SharedRefPtr target_host = hosts.begin()->second; + populate_hosts(1, REMOTE_RACK, LOCAL_DC, &hosts); + + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); + + ScopedPtr qp_before( + policy.new_query_plan("ks", NULL, NULL)); // has down host ptr in plan + ScopedPtr qp_after( + policy.new_query_plan("ks", NULL, NULL)); // should not have down host ptr in plan + + policy.on_host_down(target_host->address()); + { + const size_t seq[] = { 2, 3, 4 }; + verify_sequence(qp_before.get(), VECTOR_FROM(size_t, seq)); + } + + policy.on_host_up(target_host); + { + const size_t seq[] = { 2, 3, 1, 4 }; // local dc wrapped before remote offered + verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq)); + } +} + +TEST(RackAwareLoadBalancingUnitTest, AllLocalRemovedReturned) { + HostMap hosts; + populate_hosts(1, LOCAL_RACK, LOCAL_DC, &hosts); + SharedRefPtr target_host = hosts.begin()->second; + populate_hosts(1, REMOTE_RACK, LOCAL_DC, &hosts); + + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); + + ScopedPtr qp_before( + policy.new_query_plan("ks", NULL, NULL)); // has down host ptr in plan + policy.on_host_down(target_host->address()); + ScopedPtr qp_after( + policy.new_query_plan("ks", NULL, NULL)); // should not have down host ptr in plan + + { + const size_t seq[] = { 2 }; + verify_sequence(qp_before.get(), VECTOR_FROM(size_t, seq)); + verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq)); + } + + policy.on_host_up(target_host); + + // make sure we get the local node first after on_up + ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); + { + const size_t seq[] = { 1, 2 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); + } +} + +TEST(RackAwareLoadBalancingUnitTest, RemoteRemovedReturned) { + HostMap hosts; + populate_hosts(1, LOCAL_RACK, LOCAL_DC, &hosts); + populate_hosts(1, REMOTE_RACK, LOCAL_DC, &hosts); + Address target_addr("2.0.0.0", 9042); + SharedRefPtr target_host = hosts[target_addr]; + + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); + + ScopedPtr qp_before( + policy.new_query_plan("ks", NULL, NULL)); // has down host ptr in plan + policy.on_host_down(target_host->address()); + ScopedPtr qp_after( + policy.new_query_plan("ks", NULL, NULL)); // should not have down host ptr in plan + + { + const size_t seq[] = { 1 }; + verify_sequence(qp_before.get(), VECTOR_FROM(size_t, seq)); + verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq)); + } + + policy.on_host_up(target_host); + + // make sure we get both nodes, correct order after + ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); + { + const size_t seq[] = { 1, 2 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); + } +} + +TEST(RackAwareLoadBalancingUnitTest, DoNotAllowRemoteDatacentersForLocalConsistencyLevel) { + HostMap hosts; + populate_hosts(3, "rack", LOCAL_DC, &hosts); + populate_hosts(3, "rack", REMOTE_DC, &hosts); + + // Not allowing remote DCs for local CLs + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); + + // Set local CL + QueryRequest::Ptr request(new QueryRequest("", 0)); + request->set_consistency(CASS_CONSISTENCY_LOCAL_ONE); + SharedRefPtr request_handler( + new RequestHandler(request, ResponseFuture::Ptr())); + + // Check for only local hosts are used + ScopedPtr qp(policy.new_query_plan("ks", request_handler.get(), NULL)); + const size_t seq[] = { 1, 2, 3 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); +} + +TEST(RackAwareLoadBalancingUnitTest, StartWithEmptyLocalRack) { + HostMap hosts; + populate_hosts(1, REMOTE_RACK, LOCAL_DC, &hosts); + populate_hosts(3, LOCAL_RACK, LOCAL_DC, &hosts); + + // Set local rack using connected host + { + RackAwarePolicy policy("", ""); + policy.init(hosts[Address("2.0.0.0", 9042)], hosts, NULL, "", ""); + + ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); + const size_t seq[] = { 2, 3, 4, 1 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); + } + + // Set local rack using first host with non-empty rack + { + RackAwarePolicy policy("", ""); + policy.init(SharedRefPtr(new Host(Address("0.0.0.0", 9042))), hosts, NULL, "", ""); + + ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); + const size_t seq[] = { 1, 3, 4, 2 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); + } +} + +TEST(RackAwareLoadBalancingUnitTest, StartWithEmptyLocalDatacenter) { + HostMap hosts; + populate_hosts(1, "rack", REMOTE_DC, &hosts); + populate_hosts(3, "rack", LOCAL_DC, &hosts); + + // Set local DC using connected host + { + RackAwarePolicy policy("", ""); + policy.init(hosts[Address("2.0.0.0", 9042)], hosts, NULL, "", ""); + + ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); + const size_t seq[] = { 2, 3, 4 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); + } + + // Set local DC using first host with non-empty DC + { + RackAwarePolicy policy("", ""); + policy.init(SharedRefPtr(new Host(Address("0.0.0.0", 9042))), hosts, NULL, "", ""); + + ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); + const size_t seq[] = { 1 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); + } +} + +TEST(RackAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalRack) { + HostMap hosts; + populate_hosts(3, LOCAL_RACK, LOCAL_DC, &hosts); + populate_hosts(3, REMOTE_RACK, LOCAL_DC, &hosts); + + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(hosts.begin()->second, hosts, NULL, "", ""); + + { // All local nodes + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); + verify_racks(counts, hosts, LOCAL_DC, LOCAL_RACK); + ASSERT_EQ(counts.size(), 3u); + verify_query_counts(counts, 4); + } + + policy.on_host_down(hosts.begin()->first); + + { // One local node down + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); + verify_racks(counts, hosts, LOCAL_DC, LOCAL_RACK); + ASSERT_EQ(counts.size(), 2u); + verify_query_counts(counts, 6); + } + + policy.on_host_up(hosts.begin()->second); + + { // All local nodes again + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); + verify_racks(counts, hosts, LOCAL_DC, LOCAL_RACK); + ASSERT_EQ(counts.size(), 3u); + verify_query_counts(counts, 4); + } + + policy.on_host_removed(hosts.begin()->second); + + { // One local node removed + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); + verify_racks(counts, hosts, LOCAL_DC, LOCAL_RACK); + ASSERT_EQ(counts.size(), 2u); + verify_query_counts(counts, 6); + } +} + +TEST(RackAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalDc) { + HostMap hosts; + populate_hosts(3, "rack", LOCAL_DC, &hosts); + populate_hosts(3, "rack", REMOTE_DC, &hosts); + + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(hosts.begin()->second, hosts, NULL, "", ""); + + { // All local nodes + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); + verify_dcs(counts, hosts, LOCAL_DC); + ASSERT_EQ(counts.size(), 3u); + verify_query_counts(counts, 4); + } + + policy.on_host_down(hosts.begin()->first); + + { // One local node down + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); + verify_dcs(counts, hosts, LOCAL_DC); + ASSERT_EQ(counts.size(), 2u); + verify_query_counts(counts, 6); + } + + policy.on_host_up(hosts.begin()->second); + + { // All local nodes again + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); + verify_dcs(counts, hosts, LOCAL_DC); + ASSERT_EQ(counts.size(), 3u); + verify_query_counts(counts, 4); + } + + policy.on_host_removed(hosts.begin()->second); + + { // One local node removed + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); + verify_dcs(counts, hosts, LOCAL_DC); + ASSERT_EQ(counts.size(), 2u); + verify_query_counts(counts, 6); + } +} + +TEST(RackAwareLoadBalancingUnitTest, VerifyEqualDistributionRemoteDc) { + HostMap hosts; + populate_hosts(3, LOCAL_RACK, LOCAL_DC, &hosts); + populate_hosts(3, LOCAL_RACK, REMOTE_DC, &hosts); + + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(hosts.begin()->second, hosts, NULL, "", ""); + + Host::Ptr remote_dc_node1; + { // Mark down all local nodes + HostMap::iterator it = hosts.begin(); + for (int i = 0; i < 3; ++i) { + policy.on_host_down(it->first); + it++; + } + remote_dc_node1 = it->second; + } + + { // All remote nodes + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); + verify_dcs(counts, hosts, REMOTE_DC); + ASSERT_EQ(counts.size(), 3u) << "Should use all hosts from remote DC"; + verify_query_counts(counts, 4); + } + + policy.on_host_down(remote_dc_node1->address()); + + { // One remote node down + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); + verify_dcs(counts, hosts, REMOTE_DC); + ASSERT_EQ(counts.size(), 2u); + verify_query_counts(counts, 6); + } + + policy.on_host_up(remote_dc_node1); + + { // All remote nodes again + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); + verify_dcs(counts, hosts, REMOTE_DC); + ASSERT_EQ(counts.size(), 3u); + verify_query_counts(counts, 4); + } + + policy.on_host_removed(remote_dc_node1); + + { // One remote node removed + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); + verify_dcs(counts, hosts, REMOTE_DC); + ASSERT_EQ(counts.size(), 2u); + verify_query_counts(counts, 6); + } +} + TEST(TokenAwareLoadBalancingUnitTest, Simple) { const int64_t num_hosts = 4; HostMap hosts; @@ -679,7 +1044,7 @@ TEST(TokenAwareLoadBalancingUnitTest, Simple) { token_map->build(); TokenAwarePolicy policy(new RoundRobinPolicy(), false); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); QueryRequest::Ptr request(new QueryRequest("", 1)); const char* value = "kjdfjkldsdjkl"; // hash: 9024137376112061887 @@ -725,12 +1090,12 @@ TEST(TokenAwareLoadBalancingUnitTest, NetworkTopology) { // Tokens // 1.0.0.0 local -6588122883467697006 - // 2.0.0.0 remote -3952873730080618204 - // 3.0.0.0 local -1317624576693539402 - // 4.0.0.0 remote 1317624576693539400 - // 5.0.0.0 local 3952873730080618202 + // 2.0.0.0 remote -3952873730080618204 (replica for -5434086359492102041) + // 3.0.0.0 local -1317624576693539402 (replica for -5434086359492102041) + // 4.0.0.0 remote 1317624576693539400 (replica for -5434086359492102041) + // 5.0.0.0 local 3952873730080618202 (replica for -5434086359492102041) // 6.0.0.0 remote 6588122883467697004 - // 7.0.0.0 local 9223372036854775806 + // 7.0.0.0 local 9223372036854775806 (replica for -5434086359492102041) const uint64_t partition_size = CASS_UINT64_MAX / num_hosts; Murmur3Partitioner::Token token = CASS_INT64_MIN + static_cast(partition_size); @@ -752,7 +1117,7 @@ TEST(TokenAwareLoadBalancingUnitTest, NetworkTopology) { token_map->build(); TokenAwarePolicy policy(new DCAwarePolicy(LOCAL_DC, num_hosts / 2, false), false); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); QueryRequest::Ptr request(new QueryRequest("", 1)); const char* value = "abc"; // hash: -5434086359492102041 @@ -762,7 +1127,7 @@ TEST(TokenAwareLoadBalancingUnitTest, NetworkTopology) { { ScopedPtr qp(policy.new_query_plan("test", request_handler.get(), token_map.get())); - const size_t seq[] = { 3, 5, 7, 1, 4, 6, 2 }; + const size_t seq[] = { 3, 5, 7, 2, 4, 1, 6 }; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } @@ -772,7 +1137,7 @@ TEST(TokenAwareLoadBalancingUnitTest, NetworkTopology) { { ScopedPtr qp(policy.new_query_plan("test", request_handler.get(), token_map.get())); - const size_t seq[] = { 3, 5, 7, 4, 6, 2 }; + const size_t seq[] = { 3, 5, 7, 2, 4, 6 }; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } @@ -784,7 +1149,7 @@ TEST(TokenAwareLoadBalancingUnitTest, NetworkTopology) { { ScopedPtr qp(policy.new_query_plan("test", request_handler.get(), token_map.get())); - const size_t seq[] = { 5, 7, 1, 6, 2, 4 }; + const size_t seq[] = { 5, 7, 2, 4, 1, 6 }; verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); } } @@ -826,7 +1191,7 @@ TEST(TokenAwareLoadBalancingUnitTest, ShuffleReplicas) { HostVec not_shuffled; { TokenAwarePolicy policy(new RoundRobinPolicy(), false); // Not shuffled - policy.init(SharedRefPtr(), hosts, &random, ""); + policy.init(SharedRefPtr(), hosts, &random, "", ""); ScopedPtr qp1(policy.new_query_plan("test", request_handler.get(), token_map.get())); for (int i = 0; i < num_hosts; ++i) { not_shuffled.push_back(qp1->compute_next()); @@ -844,7 +1209,7 @@ TEST(TokenAwareLoadBalancingUnitTest, ShuffleReplicas) { // Verify that the shuffle setting does indeed shuffle the replicas { TokenAwarePolicy shuffle_policy(new RoundRobinPolicy(), true); // Shuffled - shuffle_policy.init(SharedRefPtr(), hosts, &random, ""); + shuffle_policy.init(SharedRefPtr(), hosts, &random, "", ""); HostVec shuffled_previous; ScopedPtr qp( @@ -942,7 +1307,7 @@ TEST(LatencyAwareLoadBalancingUnitTest, Simple) { HostMap hosts; populate_hosts(num_hosts, "rack1", LOCAL_DC, &hosts); LatencyAwarePolicy policy(new RoundRobinPolicy(), settings); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); // Record some latencies with 100 ns being the minimum for (HostMap::iterator i = hosts.begin(); i != hosts.end(); ++i) { @@ -1004,7 +1369,7 @@ TEST(LatencyAwareLoadBalancingUnitTest, MinAverageUnderMinMeasured) { HostMap hosts; populate_hosts(num_hosts, "rack1", LOCAL_DC, &hosts); LatencyAwarePolicy policy(new RoundRobinPolicy(), settings); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); int count = 1; for (HostMap::iterator i = hosts.begin(); i != hosts.end(); ++i) { @@ -1038,7 +1403,7 @@ TEST(WhitelistLoadBalancingUnitTest, Hosts) { whitelist_hosts.push_back("37.0.0.0"); whitelist_hosts.push_back("83.0.0.0"); WhitelistPolicy policy(new RoundRobinPolicy(), whitelist_hosts); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); @@ -1059,7 +1424,7 @@ TEST(WhitelistLoadBalancingUnitTest, Datacenters) { whitelist_dcs.push_back(LOCAL_DC); whitelist_dcs.push_back(REMOTE_DC); WhitelistDCPolicy policy(new RoundRobinPolicy(), whitelist_dcs); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); @@ -1079,7 +1444,7 @@ TEST(BlacklistLoadBalancingUnitTest, Hosts) { blacklist_hosts.push_back("2.0.0.0"); blacklist_hosts.push_back("3.0.0.0"); BlacklistPolicy policy(new RoundRobinPolicy(), blacklist_hosts); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); @@ -1100,7 +1465,7 @@ TEST(BlacklistLoadBalancingUnitTest, Datacenters) { blacklist_dcs.push_back(LOCAL_DC); blacklist_dcs.push_back(REMOTE_DC); BlacklistDCPolicy policy(new RoundRobinPolicy(), blacklist_dcs); - policy.init(SharedRefPtr(), hosts, NULL, ""); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); diff --git a/tests/src/unit/tests/test_request_processor.cpp b/tests/src/unit/tests/test_request_processor.cpp index d1900397d..69b13b423 100644 --- a/tests/src/unit/tests/test_request_processor.cpp +++ b/tests/src/unit/tests/test_request_processor.cpp @@ -37,7 +37,7 @@ class InorderLoadBalancingPolicy : public LoadBalancingPolicy { , hosts_(new HostVec()) {} virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) { + const String& local_dc, const String& local_rack) { hosts_->reserve(hosts.size()); std::transform(hosts.begin(), hosts.end(), std::back_inserter(*hosts_), GetHost()); } @@ -277,7 +277,7 @@ TEST_F(RequestProcessorUnitTest, Simple) { Future::Ptr connect_future(new Future()); RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); initializer->initialize(event_loop()); @@ -296,7 +296,7 @@ TEST_F(RequestProcessorUnitTest, CloseWithRequestsPending) { Future::Ptr connect_future(new Future()); RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); initializer->initialize(event_loop()); @@ -334,7 +334,7 @@ TEST_F(RequestProcessorUnitTest, Auth) { Future::Ptr connect_future(new Future()); RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); RequestProcessorSettings settings; @@ -359,7 +359,7 @@ TEST_F(RequestProcessorUnitTest, Ssl) { Future::Ptr connect_future(new Future()); RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); initializer->with_settings(settings)->initialize(event_loop()); @@ -383,7 +383,7 @@ TEST_F(RequestProcessorUnitTest, NotifyAddRemoveHost) { Future::Ptr up_future(new Future()); Future::Ptr down_future(new Future()); RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); RequestProcessorSettings settings; @@ -415,7 +415,7 @@ TEST_F(RequestProcessorUnitTest, CloseDuringReconnect) { Future::Ptr close_future(new Future()); Future::Ptr connect_future(new Future()); RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); RequestProcessorSettings settings; @@ -450,7 +450,7 @@ TEST_F(RequestProcessorUnitTest, CloseDuringAddNewHost) { Future::Ptr close_future(new Future()); Future::Ptr connect_future(new Future()); RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); CloseListener::Ptr listener(new CloseListener(close_future)); @@ -480,7 +480,7 @@ TEST_F(RequestProcessorUnitTest, PoolDown) { Future::Ptr up_future(new Future()); Future::Ptr down_future(new Future()); RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); UpDownListener::Ptr listener(new UpDownListener(up_future, down_future, target_host)); @@ -510,7 +510,7 @@ TEST_F(RequestProcessorUnitTest, PoolUp) { Future::Ptr up_future(new Future()); Future::Ptr down_future(new Future()); RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); RequestProcessorSettings settings; @@ -538,7 +538,7 @@ TEST_F(RequestProcessorUnitTest, InvalidAuth) { Future::Ptr connect_future(new Future()); RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); RequestProcessorSettings settings; @@ -564,7 +564,7 @@ TEST_F(RequestProcessorUnitTest, InvalidSsl) { Future::Ptr connect_future(new Future()); RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); SslContext::Ptr ssl_context(SslContextFactory::create()); // No trusted cert @@ -600,7 +600,7 @@ TEST_F(RequestProcessorUnitTest, RollingRestart) { HostMap hosts(generate_hosts()); Future::Ptr connect_future(new Future()); RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); RequestProcessorSettings settings; @@ -632,7 +632,7 @@ TEST_F(RequestProcessorUnitTest, NoHostsAvailable) { HostMap hosts(generate_hosts()); Future::Ptr connect_future(new Future()); RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); initializer->with_listener(listener.get())->initialize(event_loop()); @@ -668,7 +668,7 @@ TEST_F(RequestProcessorUnitTest, RequestTimeout) { HostMap hosts(generate_hosts()); Future::Ptr connect_future(new Future()); RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); initializer->with_listener(listener.get())->initialize(event_loop()); @@ -718,7 +718,7 @@ TEST_F(RequestProcessorUnitTest, LowNumberOfStreams) { settings.request_queue_size = 2 * CASS_MAX_STREAMS + 1; // Create a request queue with enough room RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", + hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "", bind_callback(on_connected, connect_future.get()))); initializer->with_settings(settings)->with_listener(listener.get())->initialize(event_loop());