diff --git a/include/cassandra.h b/include/cassandra.h
index 49666cf20..664a11d56 100644
--- a/include/cassandra.h
+++ b/include/cassandra.h
@@ -2213,6 +2213,52 @@ cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster,
                                          unsigned used_hosts_per_remote_dc,
                                          cass_bool_t allow_remote_dcs_for_local_cl);
 
+/**
+ * Configures the cluster to use Rack-aware load balancing.
+ * For each query, all live nodes in the primary 'local' rack are tried first,
+ * followed by nodes from the local DC, and then by nodes from other DCs.
+ *
+ * With empty local_rack and local_dc, the default local_dc and local_rack
+ * are chosen from the first connected contact point,
+ * and no remote hosts are considered in query plans.
+ * If relying on this mechanism, be sure to use only contact
+ * points from the local rack.
+ *
+ * @public @memberof CassCluster
+ *
+ * @param[in] cluster
+ * @param[in] local_dc The primary data center to try first
+ * @param[in] local_rack The primary rack to try first
+ * @return CASS_OK if successful, otherwise an error occurred
+ */
+CASS_EXPORT CassError
+cass_cluster_set_load_balance_rack_aware(CassCluster* cluster,
+                                         const char* local_dc,
+                                         const char* local_rack);
+
+
+/**
+ * Same as cass_cluster_set_load_balance_rack_aware(), but with lengths for string
+ * parameters.
+ *
+ * @public @memberof CassCluster
+ *
+ * @param[in] cluster
+ * @param[in] local_dc
+ * @param[in] local_dc_length
+ * @param[in] local_rack
+ * @param[in] local_rack_length
+ * @return same as cass_cluster_set_load_balance_rack_aware()
+ *
+ * @see cass_cluster_set_load_balance_rack_aware()
+ */
+CASS_EXPORT CassError
+cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster,
+                                           const char* local_dc,
+                                           size_t local_dc_length,
+                                           const char* local_rack,
+                                           size_t local_rack_length);
+
 /**
  * Configures the cluster to use token-aware request routing or not.
  *
diff --git a/src/cluster.cpp b/src/cluster.cpp
index a46f3ffc1..21ab3eafb 100644
--- a/src/cluster.cpp
+++ b/src/cluster.cpp
@@ -18,6 +18,7 @@
 
 #include "constants.hpp"
 #include "dc_aware_policy.hpp"
+#include "rack_aware_policy.hpp"
 #include "external.hpp"
 #include "logger.hpp"
 #include "resolver.hpp"
@@ -240,6 +241,7 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list
                  const ControlConnectionSchema& schema,
                  const LoadBalancingPolicy::Ptr& load_balancing_policy,
                  const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
+                 const String& local_rack,
                  const StringMultimap& supported_options, const ClusterSettings& settings)
     : connection_(connection)
     , listener_(listener ? listener : &nop_cluster_listener__)
@@ -251,6 +253,7 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list
     , connected_host_(connected_host)
     , hosts_(hosts)
     , local_dc_(local_dc)
+    , local_rack_(local_rack)
     , supported_options_(supported_options)
     , is_recording_events_(settings.disable_events_on_startup) {
   static const auto optimized_msg = "===== Using optimized driver!!! =====\n";
diff --git a/src/cluster.hpp b/src/cluster.hpp
index 4fe36ff5d..dec6043b6 100644
--- a/src/cluster.hpp
+++ b/src/cluster.hpp
@@ -257,6 +257,7 @@ class Cluster
    * determining the next control connection host.
    * @param load_balancing_policies
    * @param local_dc The local datacenter determined by the metadata service for initializing the
    * load balancing policies.
+   * @param local_rack The local rack determined by the metadata service for initializing the load balancing policies.
    * @param supported_options Supported options discovered during control connection.
    * @param settings The control connection settings to use for reconnecting the
@@ -267,6 +268,7 @@ class Cluster
           const ControlConnectionSchema& schema,
           const LoadBalancingPolicy::Ptr& load_balancing_policy,
           const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
+          const String& local_rack,
           const StringMultimap& supported_options, const ClusterSettings& settings);
 
   /**
@@ -361,6 +363,7 @@ class Cluster
   const Host::Ptr& connected_host() const { return connected_host_; }
   const TokenMap::Ptr& token_map() const { return token_map_; }
   const String& local_dc() const { return local_dc_; }
+  const String& local_rack() const { return local_rack_; }
   const VersionNumber& dse_server_version() const { return connection_->dse_server_version(); }
   const StringMultimap& supported_options() const { return supported_options_; }
   const ShardPortCalculator* shard_port_calculator() const { return shard_port_calculator_.get(); }
@@ -449,6 +452,7 @@ class Cluster
   PreparedMetadata prepared_metadata_;
   TokenMap::Ptr token_map_;
   String local_dc_;
+  String local_rack_;
   StringMultimap supported_options_;
   Timer timer_;
   bool is_recording_events_;
diff --git a/src/cluster_config.cpp b/src/cluster_config.cpp
index ab357ed52..e536515b4 100644
--- a/src/cluster_config.cpp
+++ b/src/cluster_config.cpp
@@ -301,6 +301,27 @@ CassError cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster, const c
   return CASS_OK;
 }
 
+CassError cass_cluster_set_load_balance_rack_aware(CassCluster* cluster, const char* local_dc,
+                                                   const char* local_rack) {
+  if (local_dc == NULL || local_rack == NULL) {
+    return CASS_ERROR_LIB_BAD_PARAMS;
+  }
+  return cass_cluster_set_load_balance_rack_aware_n(cluster, local_dc, SAFE_STRLEN(local_dc),
+                                                    local_rack, SAFE_STRLEN(local_rack));
+}
+
+CassError cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster, const char* local_dc,
+                                                     size_t local_dc_length,
+                                                     const char* local_rack,
+                                                     size_t local_rack_length) {
+  if (local_dc == NULL || local_dc_length == 0 || local_rack == NULL || local_rack_length == 0) {
+    return CASS_ERROR_LIB_BAD_PARAMS;
+  }
+  cluster->config().set_load_balancing_policy(new RackAwarePolicy(
+      String(local_dc, local_dc_length), String(local_rack, local_rack_length)));
+  return CASS_OK;
+}
+
 void cass_cluster_set_token_aware_routing(CassCluster* cluster, cass_bool_t enabled) {
   cluster->config().set_token_aware_routing(enabled == cass_true);
 }
diff --git a/src/cluster_connector.cpp b/src/cluster_connector.cpp
index e0415e151..eb959e17f 100644
--- a/src/cluster_connector.cpp
+++ b/src/cluster_connector.cpp
@@ -16,6 +16,7 @@
 
 #include "cluster_connector.hpp"
 #include "dc_aware_policy.hpp"
+#include "rack_aware_policy.hpp"
 #include "protocol.hpp"
 #include "random.hpp"
 #include "round_robin_policy.hpp"
@@ -177,6 +178,7 @@ void ClusterConnector::on_resolve(ClusterMetadataResolver* resolver) {
   }
 
   local_dc_ = resolver->local_dc();
+  local_rack_ = resolver->local_rack();
   remaining_connector_count_ = resolved_contact_points.size();
   for (AddressVec::const_iterator it = resolved_contact_points.begin(),
                                   end = resolved_contact_points.end();
@@ -231,7 +233,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
     for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(), end = policies.end();
          it != end; ++it) {
       LoadBalancingPolicy::Ptr policy(*it);
-      policy->init(connected_host, hosts, random_, local_dc_);
+      policy->init(connected_host, hosts, random_, local_dc_, local_rack_);
       policy->register_handles(event_loop_->loop());
     }
@@ -248,6 +250,11 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
       message = "No hosts available for the control connection using the "
                 "DC-aware load balancing policy. "
                 "Check to see if the configured local datacenter is valid";
+    } else if (dynamic_cast<RackAwarePolicy::RackAwareQueryPlan*>(query_plan.get()) !=
+               NULL) { // Check if Rack-aware
+      message = "No hosts available for the control connection using the "
+                "Rack-aware load balancing policy. "
+                "Check to see if the configured local datacenter and rack are valid";
     } else {
       message = "No hosts available for the control connection using the "
                 "configured load balancing policy";
@@ -258,7 +265,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
 
   cluster_.reset(new Cluster(connector->release_connection(), listener_, event_loop_,
                              connected_host, hosts, connector->schema(), default_policy, policies,
-                             local_dc_, connector->supported_options(), settings_));
+                             local_dc_, local_rack_, connector->supported_options(), settings_));
 
   // Clear any connection errors and set the final negotiated protocol version.
   error_code_ = CLUSTER_OK;
diff --git a/src/cluster_connector.hpp b/src/cluster_connector.hpp
index e960fa058..8c8572322 100644
--- a/src/cluster_connector.hpp
+++ b/src/cluster_connector.hpp
@@ -169,6 +169,7 @@ class ClusterConnector : public RefCounted<ClusterConnector> {
   Random* random_;
   Metrics* metrics_;
   String local_dc_;
+  String local_rack_;
   ClusterSettings settings_;
   Callback callback_;
diff --git a/src/cluster_metadata_resolver.hpp b/src/cluster_metadata_resolver.hpp
index 90e91acbd..bcca03bf3 100644
--- a/src/cluster_metadata_resolver.hpp
+++ b/src/cluster_metadata_resolver.hpp
@@ -48,6 +48,7 @@ class ClusterMetadataResolver : public RefCounted<ClusterMetadataResolver> {
 
   const AddressVec& resolved_contact_points() const { return resolved_contact_points_; }
   const String& local_dc() const { return local_dc_; }
+  const String& local_rack() const { return local_rack_; }
 
 protected:
   virtual void internal_resolve(uv_loop_t* loop, const AddressVec& contact_points) = 0;
@@ -57,6 +58,7 @@ class ClusterMetadataResolver : public RefCounted<ClusterMetadataResolver> {
 protected:
   AddressVec resolved_contact_points_;
   String local_dc_;
+  String local_rack_;
   Callback callback_;
 };
diff --git a/src/dc_aware_policy.cpp b/src/dc_aware_policy.cpp
index d68bec1a2..5c1550227 100644
--- a/src/dc_aware_policy.cpp
+++ b/src/dc_aware_policy.cpp
@@ -43,7 +43,7 @@ DCAwarePolicy::DCAwarePolicy(const String& local_dc, size_t used_hosts_per_remot
 DCAwarePolicy::~DCAwarePolicy() { uv_rwlock_destroy(&available_rwlock_); }
 
 void DCAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                         const String& local_dc) {
+                         const String& local_dc, const String& local_rack) {
   if (local_dc_.empty()) { // Only override if no local DC was specified.
     local_dc_ = local_dc;
   }
diff --git a/src/dc_aware_policy.hpp b/src/dc_aware_policy.hpp
index f76b7307b..526338c29 100644
--- a/src/dc_aware_policy.hpp
+++ b/src/dc_aware_policy.hpp
@@ -37,7 +37,7 @@ class DCAwarePolicy : public LoadBalancingPolicy {
   ~DCAwarePolicy();
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc);
+                    const String& local_dc, const String& local_rack);
 
   virtual CassHostDistance distance(const Host::Ptr& host) const;
diff --git a/src/execution_profile.hpp b/src/execution_profile.hpp
index 2b5645149..cb4d34f61 100644
--- a/src/execution_profile.hpp
+++ b/src/execution_profile.hpp
@@ -23,6 +23,7 @@
 #include "cassandra.h"
 #include "constants.hpp"
 #include "dc_aware_policy.hpp"
+#include "rack_aware_policy.hpp"
 #include "dense_hash_map.hpp"
 #include "latency_aware_policy.hpp"
 #include "speculative_execution.hpp"
diff --git a/src/latency_aware_policy.cpp b/src/latency_aware_policy.cpp
index 9f77a384f..29541bbb8 100644
--- a/src/latency_aware_policy.cpp
+++ b/src/latency_aware_policy.cpp
@@ -27,13 +27,13 @@ using namespace datastax::internal;
 using namespace datastax::internal::core;
 
 void LatencyAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                              const String& local_dc) {
+                              const String& local_dc, const String& local_rack) {
   hosts_->reserve(hosts.size());
   std::transform(hosts.begin(), hosts.end(), std::back_inserter(*hosts_), GetHost());
   for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) {
     i->second->enable_latency_tracking(settings_.scale_ns, settings_.min_measured);
   }
-  ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc);
+  ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc, local_rack);
 }
 
 void LatencyAwarePolicy::register_handles(uv_loop_t* loop) { start_timer(loop); }
diff --git a/src/latency_aware_policy.hpp b/src/latency_aware_policy.hpp
index 178752a4a..c04430c1a 100644
--- a/src/latency_aware_policy.hpp
+++ b/src/latency_aware_policy.hpp
@@ -51,7 +51,7 @@ class LatencyAwarePolicy : public ChainedLoadBalancingPolicy {
   virtual ~LatencyAwarePolicy() {}
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc);
+                    const String& local_dc, const String& local_rack);
 
   virtual void register_handles(uv_loop_t* loop);
   virtual void close_handles();
diff --git a/src/list_policy.cpp b/src/list_policy.cpp
index 7dc9357d4..fa38c838e 100644
--- a/src/list_policy.cpp
+++ b/src/list_policy.cpp
@@ -23,7 +23,7 @@ using namespace datastax::internal;
 using namespace datastax::internal::core;
 
 void ListPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                      const String& local_dc) {
+                      const String& local_dc, const String& local_rack) {
   HostMap valid_hosts;
   for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) {
     const Host::Ptr& host = i->second;
@@ -36,7 +36,7 @@ void ListPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Ran
     LOG_ERROR("No valid hosts available for list policy");
   }
 
-  ChainedLoadBalancingPolicy::init(connected_host, valid_hosts, random, local_dc);
+  ChainedLoadBalancingPolicy::init(connected_host, valid_hosts, random, local_dc, local_rack);
 }
 
 CassHostDistance ListPolicy::distance(const Host::Ptr& host) const {
diff --git a/src/list_policy.hpp b/src/list_policy.hpp
index bda75f5f5..99b13eb22 100644
--- a/src/list_policy.hpp
+++ b/src/list_policy.hpp
@@ -31,7 +31,7 @@ class ListPolicy : public ChainedLoadBalancingPolicy {
   virtual ~ListPolicy() {}
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc);
+                    const String& local_dc, const String& local_rack);
 
   virtual CassHostDistance distance(const Host::Ptr& host) const;
diff --git a/src/load_balancing.hpp b/src/load_balancing.hpp
index ba60928a9..367860126 100644
--- a/src/load_balancing.hpp
+++ b/src/load_balancing.hpp
@@ -40,9 +40,21 @@ typedef enum CassBalancingState_ {
   CASS_BALANCING_NEW_QUERY_PLAN
 } CassBalancingState;
 
+// CassHostDistance specifies how far a host is from the client.
+// The meaning of the distance depends on the load balancing policy.
+// Policies should assign distances starting from the lowest value
+// without skipping values, i.e. they should start with LOCAL.
+// For example:
+// - DCAwarePolicy uses LOCAL for the same DC and REMOTE for a different DC.
+// - RackAwarePolicy uses LOCAL for the same rack,
+//   REMOTE for a different rack in the same DC, and
+//   REMOTE2 for a different DC.
+// - RoundRobinPolicy distinguishes only one distance level and
+//   always uses LOCAL for all nodes.
 typedef enum CassHostDistance_ {
   CASS_HOST_DISTANCE_LOCAL,
   CASS_HOST_DISTANCE_REMOTE,
+  CASS_HOST_DISTANCE_REMOTE2,
   CASS_HOST_DISTANCE_IGNORE
 } CassHostDistance;
 
@@ -87,7 +99,7 @@ class LoadBalancingPolicy : public RefCounted<LoadBalancingPolicy> {
   virtual ~LoadBalancingPolicy() {}
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc) = 0;
+                    const String& local_dc, const String& local_rack) = 0;
 
   virtual void register_handles(uv_loop_t* loop) {}
   virtual void close_handles() {}
@@ -124,8 +136,8 @@ class ChainedLoadBalancingPolicy : public LoadBalancingPolicy {
   virtual ~ChainedLoadBalancingPolicy() {}
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc) {
-    return child_policy_->init(connected_host, hosts, random, local_dc);
+                    const String& local_dc, const String& local_rack) {
+    return child_policy_->init(connected_host, hosts, random, local_dc, local_rack);
   }
 
   virtual const LoadBalancingPolicy::Ptr& child_policy() const { return child_policy_; }
diff --git a/src/rack_aware_policy.cpp b/src/rack_aware_policy.cpp
new file mode 100644
index 000000000..394af5b19
--- /dev/null
+++ b/src/rack_aware_policy.cpp
@@ -0,0 +1,310 @@
+/*
+  Copyright (c) DataStax, Inc.
+  Copyright (c) 2022 Kiwi.com
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#include "rack_aware_policy.hpp"
+
+#include "logger.hpp"
+#include "request_handler.hpp"
+#include "scoped_lock.hpp"
+
+#include <algorithm>
+
+using namespace datastax;
+using namespace datastax::internal;
+using namespace datastax::internal::core;
+
+RackAwarePolicy::RackAwarePolicy(const String& local_dc, const String& local_rack)
+    : local_dc_(local_dc)
+    , local_rack_(local_rack)
+    , local_rack_live_hosts_(new HostVec())
+    , index_(0) {
+  uv_rwlock_init(&available_rwlock_);
+}
+
+RackAwarePolicy::~RackAwarePolicy() { uv_rwlock_destroy(&available_rwlock_); }
+
+void RackAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
+                           const String& local_dc, const String& local_rack) {
+  if (local_dc_.empty()) { // Only override if no local DC was specified.
+    local_dc_ = local_dc;
+  }
+
+  if (local_dc_.empty() && connected_host && !connected_host->dc().empty()) {
+    LOG_INFO("Using '%s' for the local data center "
+             "(if this is incorrect, please provide the correct data center)",
+             connected_host->dc().c_str());
+    local_dc_ = connected_host->dc();
+  }
+
+  if (local_rack_.empty()) { // Only override if no local rack was specified.
+    local_rack_ = local_rack;
+  }
+
+  if (local_rack_.empty() && connected_host && !connected_host->rack().empty()) {
+    LOG_INFO("Using '%s' for the local rack "
+             "(if this is incorrect, please provide the correct rack)",
+             connected_host->rack().c_str());
+    local_rack_ = connected_host->rack();
+  }
+
+  available_.resize(hosts.size());
+  std::transform(hosts.begin(), hosts.end(), std::inserter(available_, available_.begin()),
+                 GetAddress());
+
+  for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) {
+    on_host_added(i->second);
+  }
+  if (random != NULL) {
+    index_ = random->next(std::max(static_cast<size_t>(1), hosts.size()));
+  }
+}
+
+CassHostDistance RackAwarePolicy::distance(const Host::Ptr& host) const {
+  if (local_dc_.empty() || local_rack_.empty() || (host->dc() == local_dc_ && host->rack() == local_rack_)) {
+    return CASS_HOST_DISTANCE_LOCAL;
+  }
+
+  if (host->dc() == local_dc_) {
+    const CopyOnWriteHostVec& hosts = per_remote_rack_live_hosts_.get_hosts(host->rack());
+    size_t num_hosts = hosts->size();
+    for (size_t i = 0; i < num_hosts; ++i) {
+      if ((*hosts)[i]->address() == host->address()) {
+        return CASS_HOST_DISTANCE_REMOTE;
+      }
+    }
+  }
+
+  const CopyOnWriteHostVec& hosts = per_remote_dc_live_hosts_.get_hosts(host->dc());
+  size_t num_hosts = hosts->size();
+  for (size_t i = 0; i < num_hosts; ++i) {
+    if ((*hosts)[i]->address() == host->address()) {
+      return CASS_HOST_DISTANCE_REMOTE2;
+    }
+  }
+
+  return CASS_HOST_DISTANCE_IGNORE;
+}
+
+QueryPlan* RackAwarePolicy::new_query_plan(const String& keyspace, RequestHandler* request_handler,
+                                           const TokenMap* token_map) {
+  CassConsistency cl =
+      request_handler != NULL ? request_handler->consistency() : CASS_DEFAULT_CONSISTENCY;
+  return new RackAwareQueryPlan(this, cl, index_++);
+}
+
+bool RackAwarePolicy::is_host_up(const Address& address) const {
+  ScopedReadLock rl(&available_rwlock_);
+  return available_.count(address) > 0;
+}
+
+void RackAwarePolicy::on_host_added(const Host::Ptr& host) {
+  const String& dc = host->dc();
+  const String& rack = host->rack();
+  if (local_dc_.empty() && !dc.empty()) {
+    LOG_INFO("Using '%s' for the local data center "
+             "(if this is incorrect, please provide the correct data center)",
+             host->dc().c_str());
+    local_dc_ = dc;
+  }
+  if (local_rack_.empty() && !rack.empty()) {
+    LOG_INFO("Using '%s' for the local rack "
+             "(if this is incorrect, please provide the correct rack)",
+             host->rack().c_str());
+    local_rack_ = rack;
+  }
+
+  if (dc == local_dc_ && rack == local_rack_) {
+    add_host(local_rack_live_hosts_, host);
+  } else if (dc == local_dc_) {
+    per_remote_rack_live_hosts_.add_host_to_key(rack, host);
+  } else {
+    per_remote_dc_live_hosts_.add_host_to_key(dc, host);
+  }
+}
+
+void RackAwarePolicy::on_host_removed(const Host::Ptr& host) {
+  const String& dc = host->dc();
+  const String& rack = host->rack();
+  if (dc == local_dc_ && rack == local_rack_) {
+    remove_host(local_rack_live_hosts_, host);
+  } else if (dc == local_dc_) {
+    per_remote_rack_live_hosts_.remove_host_from_key(host->rack(), host);
+  } else {
+    per_remote_dc_live_hosts_.remove_host_from_key(host->dc(), host);
+  }
+
+  ScopedWriteLock wl(&available_rwlock_);
+  available_.erase(host->address());
+}
+
+void RackAwarePolicy::on_host_up(const Host::Ptr& host) {
+  on_host_added(host);
+
+  ScopedWriteLock wl(&available_rwlock_);
+  available_.insert(host->address());
+}
+
+void RackAwarePolicy::on_host_down(const Address& address) {
+  if (!remove_host(local_rack_live_hosts_, address) &&
+      !per_remote_rack_live_hosts_.remove_host(address) &&
+      !per_remote_dc_live_hosts_.remove_host(address)) {
+    LOG_DEBUG("Attempted to mark host %s as DOWN, but it doesn't exist",
+              address.to_string().c_str());
+  }
+
+  ScopedWriteLock wl(&available_rwlock_);
+  available_.erase(address);
+}
+
+const String& RackAwarePolicy::local_dc() const {
+  ScopedReadLock rl(&available_rwlock_);
+  return local_dc_;
+}
+
+const String& RackAwarePolicy::local_rack() const {
+  ScopedReadLock rl(&available_rwlock_);
+  return local_rack_;
+}
+
+void RackAwarePolicy::PerKeyHostMap::add_host_to_key(const String& key, const Host::Ptr& host) {
+  ScopedWriteLock wl(&rwlock_);
+  Map::iterator i = map_.find(key);
+  if (i == map_.end()) {
+    CopyOnWriteHostVec hosts(new HostVec());
+    hosts->push_back(host);
+    map_.insert(Map::value_type(key, hosts));
+  } else {
+    add_host(i->second, host);
+  }
+}
+
+void RackAwarePolicy::PerKeyHostMap::remove_host_from_key(const String& key, const Host::Ptr& host) {
+  ScopedWriteLock wl(&rwlock_);
+  Map::iterator i = map_.find(key);
+  if (i != map_.end()) {
+    core::remove_host(i->second, host);
+  }
+}
+
+bool RackAwarePolicy::PerKeyHostMap::remove_host(const Address& address) {
+  ScopedWriteLock wl(&rwlock_);
+  for (Map::iterator i = map_.begin(), end = map_.end(); i != end; ++i) {
+    if (core::remove_host(i->second, address)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+const CopyOnWriteHostVec& RackAwarePolicy::PerKeyHostMap::get_hosts(const String& dc) const {
+  ScopedReadLock rl(&rwlock_);
+  Map::const_iterator i = map_.find(dc);
+  if (i == map_.end()) return no_hosts_;
+
+  return i->second;
+}
+
+void RackAwarePolicy::PerKeyHostMap::copy_keys(KeySet* keys) const {
+  ScopedReadLock rl(&rwlock_);
+  for (Map::const_iterator i = map_.begin(), end = map_.end(); i != end; ++i) {
+    keys->insert(i->first);
+  }
+}
+
+// Helper functions to prevent copies (Notice: "const CopyOnWriteHostVec&")
+
+static const Host::Ptr& get_next_host(const CopyOnWriteHostVec& hosts, size_t index) {
+  return (*hosts)[index % hosts->size()];
+}
+
+static size_t get_hosts_size(const CopyOnWriteHostVec& hosts) { return hosts->size(); }
+
+RackAwarePolicy::RackAwareQueryPlan::RackAwareQueryPlan(const RackAwarePolicy* policy, CassConsistency cl,
+                                                        size_t start_index)
+    : policy_(policy)
+    , cl_(cl)
+    , hosts_(policy_->local_rack_live_hosts_)
+    , local_remaining_(get_hosts_size(hosts_))
+    , remote_remaining_(0)
+    , index_(start_index) {}
+
+Host::Ptr RackAwarePolicy::RackAwareQueryPlan::compute_next() {
+  while (local_remaining_ > 0) {
+    --local_remaining_;
+    const Host::Ptr& host(get_next_host(hosts_, index_++));
+    if (policy_->is_host_up(host->address())) {
+      return host;
+    }
+  }
+
+  if (!remote_racks_) {
+    remote_racks_.reset(new PerKeyHostMap::KeySet());
+    policy_->per_remote_rack_live_hosts_.copy_keys(remote_racks_.get());
+  }
+
+  while (true) {
+    while (remote_remaining_ > 0) {
+      --remote_remaining_;
+      const Host::Ptr& host(
+          get_next_host(hosts_, index_++));
+      if (policy_->is_host_up(host->address())) {
+        return host;
+      }
+    }
+
+    if (remote_racks_->empty()) {
+      break;
+    }
+
+    PerKeyHostMap::KeySet::iterator i = remote_racks_->begin();
+    hosts_ = policy_->per_remote_rack_live_hosts_.get_hosts(*i);
+    remote_remaining_ = get_hosts_size(hosts_);
+    remote_racks_->erase(i);
+  }
+
+  // Skip remote DCs for LOCAL_ consistency levels.
+  if (is_dc_local(cl_)) {
+    return Host::Ptr();
+  }
+
+  if (!remote_dcs_) {
+    remote_dcs_.reset(new PerKeyHostMap::KeySet());
+    policy_->per_remote_dc_live_hosts_.copy_keys(remote_dcs_.get());
+  }
+
+  while (true) {
+    while (remote_remaining_ > 0) {
+      --remote_remaining_;
+      const Host::Ptr& host(
+          get_next_host(hosts_, index_++));
+      if (policy_->is_host_up(host->address())) {
+        return host;
+      }
+    }
+
+    if (remote_dcs_->empty()) {
+      break;
+    }
+
+    PerKeyHostMap::KeySet::iterator i = remote_dcs_->begin();
+    hosts_ = policy_->per_remote_dc_live_hosts_.get_hosts(*i);
+    remote_remaining_ = get_hosts_size(hosts_);
+    remote_dcs_->erase(i);
+  }
+
+  return Host::Ptr();
+}
diff --git a/src/rack_aware_policy.hpp b/src/rack_aware_policy.hpp
new file mode 100644
index 000000000..33a8cbbef
--- /dev/null
+++ b/src/rack_aware_policy.hpp
@@ -0,0 +1,128 @@
+/*
+  Copyright (c) DataStax, Inc.
+  Copyright (c) 2022 Kiwi.com
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#ifndef DATASTAX_INTERNAL_RACK_AWARE_POLICY_HPP
+#define DATASTAX_INTERNAL_RACK_AWARE_POLICY_HPP
+
+#include "host.hpp"
+#include "load_balancing.hpp"
+#include "map.hpp"
+#include "round_robin_policy.hpp"
+#include "scoped_lock.hpp"
+#include "scoped_ptr.hpp"
+#include "set.hpp"
+
+#include <uv.h>
+
+namespace datastax { namespace internal { namespace core {
+
+class RackAwarePolicy : public LoadBalancingPolicy {
+public:
+  RackAwarePolicy(const String& local_dc = "", const String& local_rack = "");
+
+  ~RackAwarePolicy();
+
+  virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
+                    const String& local_dc, const String& local_rack);
+
+  virtual CassHostDistance distance(const Host::Ptr& host) const;
+
+  virtual QueryPlan* new_query_plan(const String& keyspace, RequestHandler* request_handler,
+                                    const TokenMap* token_map);
+
+  virtual bool is_host_up(const Address& address) const;
+
+  virtual void on_host_added(const Host::Ptr& host);
+  virtual void on_host_removed(const Host::Ptr& host);
+  virtual void on_host_up(const Host::Ptr& host);
+  virtual void on_host_down(const Address& address);
+
+  virtual const String& local_dc() const;
+  virtual const String& local_rack() const;
+
+  virtual LoadBalancingPolicy* new_instance() {
+    return new RackAwarePolicy(local_dc_, local_rack_);
+  }
+
+private:
+  class PerKeyHostMap {
+  public:
+    typedef internal::Map<String, CopyOnWriteHostVec> Map;
+    typedef Set<String> KeySet;
+
+    PerKeyHostMap()
+        : no_hosts_(new HostVec()) {
+      uv_rwlock_init(&rwlock_);
+    }
+    ~PerKeyHostMap() { uv_rwlock_destroy(&rwlock_); }
+
+    void add_host_to_key(const String& key, const Host::Ptr& host);
+    void remove_host_from_key(const String& key, const Host::Ptr& host);
+    bool remove_host(const Address& address);
+    const CopyOnWriteHostVec& get_hosts(const String& key) const;
+    void copy_keys(KeySet* keys) const;
+
+  private:
+    Map map_;
+    mutable uv_rwlock_t rwlock_;
+    const CopyOnWriteHostVec no_hosts_;
+
+  private:
+    DISALLOW_COPY_AND_ASSIGN(PerKeyHostMap);
+  };
+
+  const CopyOnWriteHostVec& get_local_dc_hosts() const;
+  void get_remote_dcs(PerKeyHostMap::KeySet* remote_dcs) const;
+
+public:
+  class RackAwareQueryPlan : public QueryPlan {
+  public:
+    RackAwareQueryPlan(const RackAwarePolicy* policy, CassConsistency cl, size_t start_index);
+
+    virtual Host::Ptr compute_next();
+
+  private:
+    const RackAwarePolicy* policy_;
+    CassConsistency cl_;
+    CopyOnWriteHostVec hosts_;
+    ScopedPtr<PerKeyHostMap::KeySet> remote_racks_;
+    ScopedPtr<PerKeyHostMap::KeySet> remote_dcs_;
+    size_t local_remaining_;
+    size_t remote_remaining_;
+    size_t index_;
+  };
+
+private:
+  mutable uv_rwlock_t available_rwlock_;
+  AddressSet available_;
+
+  String local_dc_;
+  String local_rack_;
+
+  CopyOnWriteHostVec local_rack_live_hosts_;
+  // remote rack, local dc
+  PerKeyHostMap per_remote_rack_live_hosts_;
+  PerKeyHostMap per_remote_dc_live_hosts_;
+  size_t index_;
+
+private:
+  DISALLOW_COPY_AND_ASSIGN(RackAwarePolicy);
+};
+
+}}} // namespace datastax::internal::core
+
+#endif
diff --git a/src/request_processor.cpp b/src/request_processor.cpp
index d02ad9e7f..bda526df4 100644
--- a/src/request_processor.cpp
+++ b/src/request_processor.cpp
@@ -170,7 +170,7 @@ RequestProcessor::RequestProcessor(RequestProcessorListener* listener, EventLoop
                                    const Host::Ptr& connected_host, const HostMap& hosts,
                                    const TokenMap::Ptr& token_map,
                                    const RequestProcessorSettings& settings, Random* random,
-                                   const String& local_dc)
+                                   const String& local_dc, const String& local_rack)
     : connection_pool_manager_(connection_pool_manager)
     , listener_(listener ? listener : &nop_request_processor_listener__)
     , event_loop_(event_loop)
@@ -213,7 +213,7 @@ RequestProcessor::RequestProcessor(RequestProcessorListener* listener, EventLoop
   LoadBalancingPolicy::Vec policies = load_balancing_policies();
   for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(); it != policies.end(); ++it) {
     // Initialize the load balancing policies
-    (*it)->init(connected_host, hosts, random, local_dc);
+    (*it)->init(connected_host, hosts, random, local_dc, local_rack);
     (*it)->register_handles(event_loop_->loop());
   }
diff --git a/src/request_processor.hpp b/src/request_processor.hpp
index 67b0bf47c..253ac4828 100644
--- a/src/request_processor.hpp
+++ b/src/request_processor.hpp
@@ -166,12 +166,13 @@ class RequestProcessor
    * @param settings The current settings for the request processor.
    * @param random A RNG for randomizing hosts in the load balancing policies.
    * @param local_dc The local datacenter for initializing the load balancing policies.
+   * @param local_rack The local rack for initializing the load balancing policies.
    */
   RequestProcessor(RequestProcessorListener* listener, EventLoop* event_loop,
                    const ConnectionPoolManager::Ptr& connection_pool_manager,
                    const Host::Ptr& connected_host, const HostMap& hosts,
                    const TokenMap::Ptr& token_map, const RequestProcessorSettings& settings,
-                   Random* random, const String& local_dc);
+                   Random* random, const String& local_dc, const String& local_rack);
 
   /**
    * Close/Terminate the request request processor (thread-safe).
diff --git a/src/request_processor_initializer.cpp b/src/request_processor_initializer.cpp
index 6705b72e9..24d1a6d48 100644
--- a/src/request_processor_initializer.cpp
+++ b/src/request_processor_initializer.cpp
@@ -40,7 +40,7 @@ class RunInitializeProcessor : public Task {
 
 RequestProcessorInitializer::RequestProcessorInitializer(
     const Host::Ptr& connected_host, ProtocolVersion protocol_version, const HostMap& hosts,
-    const TokenMap::Ptr& token_map, const String& local_dc, const Callback& callback)
+    const TokenMap::Ptr& token_map, const String& local_dc, const String& local_rack, const Callback& callback)
     : event_loop_(NULL)
     , listener_(NULL)
    , metrics_(NULL)
@@ -51,6 +51,7 @@ RequestProcessorInitializer::RequestProcessorInitializer(
     , hosts_(hosts)
     , token_map_(token_map)
     , local_dc_(local_dc)
+    , local_rack_(local_rack)
     , error_code_(REQUEST_PROCESSOR_OK)
     , callback_(callback) {
   uv_mutex_init(&mutex_);
@@ -166,7 +167,7 @@ void RequestProcessorInitializer::on_initialize(ConnectionPoolManagerInitializer
   } else {
     processor_.reset(new RequestProcessor(listener_, event_loop_, initializer->release_manager(),
                                           connected_host_, hosts_, token_map_, settings_, random_,
-                                          local_dc_));
+                                          local_dc_, local_rack_));
 
     int rc = processor_->init(RequestProcessor::Protected());
     if (rc != 0) {
diff --git a/src/request_processor_initializer.hpp b/src/request_processor_initializer.hpp
index 8d63380d9..b685e5dd2 100644
--- a/src/request_processor_initializer.hpp
+++ b/src/request_processor_initializer.hpp
@@ -60,12 +60,13 @@ class RequestProcessorInitializer
    * @param hosts A mapping of available hosts in the cluster.
    * @param token_map A token map.
    * @param local_dc The local datacenter for initializing the load balancing policies.
+   * @param local_rack The local rack for initializing the load balancing policies.
    * @param callback A callback that is called when the processor is initialized
    * or if an error occurred.
    */
   RequestProcessorInitializer(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
                               const HostMap& hosts, const TokenMap::Ptr& token_map,
-                              const String& local_dc, const Callback& callback);
+                              const String& local_dc, const String& local_rack, const Callback& callback);
 
   ~RequestProcessorInitializer();
 
   /**
@@ -176,6 +177,7 @@ class RequestProcessorInitializer
   HostMap hosts_;
   const TokenMap::Ptr token_map_;
   String local_dc_;
+  String local_rack_;
   RequestProcessorError error_code_;
   String error_message_;
diff --git a/src/round_robin_policy.cpp b/src/round_robin_policy.cpp
index dd7f2ecff..4c8ac62d7 100644
--- a/src/round_robin_policy.cpp
+++ b/src/round_robin_policy.cpp
@@ -33,7 +33,7 @@ RoundRobinPolicy::RoundRobinPolicy()
 RoundRobinPolicy::~RoundRobinPolicy() { uv_rwlock_destroy(&available_rwlock_); }
 
 void RoundRobinPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                            const String& local_dc) {
+                            const String& local_dc, const String& local_rack) {
   available_.resize(hosts.size());
   std::transform(hosts.begin(), hosts.end(), std::inserter(available_, available_.begin()),
                  GetAddress());
diff --git a/src/round_robin_policy.hpp b/src/round_robin_policy.hpp
index f5b4f715d..aebc62deb 100644
--- a/src/round_robin_policy.hpp
+++ b/src/round_robin_policy.hpp
@@ -31,7 +31,7 @@ class RoundRobinPolicy : public LoadBalancingPolicy {
   ~RoundRobinPolicy();
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc);
+                    const String& local_dc, const String& local_rack);
 
   virtual CassHostDistance distance(const Host::Ptr& host) const;
diff --git a/src/session.cpp b/src/session.cpp
index 34de94c13..9d725da46 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -195,13 +195,14 @@ class SessionInitializer : public RefCounted<SessionInitializer> {
   ~SessionInitializer() { uv_mutex_destroy(&mutex_); }
 
   void initialize(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
-                  const HostMap& hosts, const TokenMap::Ptr& token_map, const String& local_dc) {
+                  const HostMap& hosts, const TokenMap::Ptr& token_map, const String& local_dc,
+                  const String& local_rack) {
     inc_ref();
     const size_t thread_count_io = remaining_ = session_->config().thread_count_io();
     for (size_t i = 0; i < thread_count_io; ++i) {
       RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-          connected_host, protocol_version, hosts, token_map, local_dc,
+          connected_host, protocol_version, hosts, token_map, local_dc, local_rack,
           bind_callback(&SessionInitializer::on_initialize, this)));
 
       RequestProcessorSettings settings(session_->config());
@@ -360,7 +361,7 @@ void Session::join() {
 
 void Session::on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
                          const HostMap& hosts, const TokenMap::Ptr& token_map,
-                         const String& local_dc) {
+                         const String& local_dc, const String& local_rack) {
   int rc = 0;
 
   if (hosts.empty()) {
@@ -394,7 +395,7 @@ void Session::on_connect(const Host::Ptr& connected_host, ProtocolVersion protoc
   request_processor_count_ = 0;
   is_closing_ = false;
   SessionInitializer::Ptr initializer(new SessionInitializer(this));
-  initializer->initialize(connected_host, protocol_version, hosts, token_map, local_dc);
+  initializer->initialize(connected_host, protocol_version, hosts, token_map, local_dc, local_rack);
 }
 
 void Session::on_close() {
diff --git a/src/session.hpp b/src/session.hpp
index 856833114..45efc5e89 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -54,7 +54,7 @@ class Session
 
   virtual void on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
                           const HostMap& hosts, const TokenMap::Ptr& token_map,
-                          const String& local_dc);
+                          const String& local_dc, const String& local_rack);
 
   virtual void on_close();
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 6ccb4f7c5..c1e7b6333 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -160,7 +160,7 @@ void SessionBase::notify_closed() {
 
 void SessionBase::on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
                              const HostMap& hosts, const TokenMap::Ptr& token_map,
-                             const String& local_dc) {
+                             const String& local_dc, const String& local_rack) {
   notify_connected();
 }
 
@@ -200,7 +200,7 @@ void SessionBase::on_initialize(ClusterConnector* connector) {
     }
 
     on_connect(cluster_->connected_host(), cluster_->protocol_version(),
-               cluster_->available_hosts(), cluster_->token_map(), cluster_->local_dc());
+               cluster_->available_hosts(), cluster_->token_map(), cluster_->local_dc(), cluster_->local_rack());
   } else {
     assert(!connector->is_canceled() && "Cluster connection process canceled");
     switch (connector->error_code()) {
diff --git a/src/session_base.hpp b/src/session_base.hpp
index 1c3e6c68f..b0c3c7c16 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -117,7 +117,7 @@ class SessionBase : public ClusterListener {
    */
   virtual void on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version,
                           const HostMap& hosts, const TokenMap::Ptr& token_map,
-                          const String& local_dc);
+                          const String& local_dc, const String& local_rack);
 
   /**
    * A callback called after the control connection fails to connect. By default
diff --git a/src/token_aware_policy.cpp b/src/token_aware_policy.cpp
index cc2cd8394..adb7a5f68 100644
--- a/src/token_aware_policy.cpp
+++ b/src/token_aware_policy.cpp
@@ -37,7 +37,7 @@ static inline bool contains(const CopyOnWriteHostVec& replicas, const Address& a
 }
 
 void TokenAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                            const String& local_dc) {
+                            const String& local_dc, const String& local_rack) {
   if (random != NULL) {
     if (shuffle_replicas_) {
       // Store random so that it can be used to shuffle replicas.
@@ -48,7 +48,7 @@ void TokenAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& host
       index_ = random->next(std::max(static_cast<size_t>(1), hosts.size()));
     }
   }
-  ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc);
+  ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc, local_rack);
 }
 
 QueryPlan* TokenAwarePolicy::new_query_plan(const String& keyspace, RequestHandler* request_handler,
@@ -87,8 +87,8 @@ QueryPlan* TokenAwarePolicy::new_query_plan(const String& keyspace, RequestHandl
 }
 
 Host::Ptr TokenAwarePolicy::TokenAwareQueryPlan::compute_next() {
-  while (remaining_ > 0) {
-    --remaining_;
+  while (remaining_local_ > 0) {
+    --remaining_local_;
     const Host::Ptr& host((*replicas_)[index_++ % replicas_->size()]);
     if (child_policy_->is_host_up(host->address()) &&
         child_policy_->distance(host) == CASS_HOST_DISTANCE_LOCAL) {
@@ -96,10 +96,28 @@ Host::Ptr TokenAwarePolicy::TokenAwareQueryPlan::compute_next() {
     }
   }
 
+  while (remaining_remote_ > 0) {
+    --remaining_remote_;
+    const Host::Ptr& host((*replicas_)[index_++ % replicas_->size()]);
+    if (child_policy_->is_host_up(host->address()) &&
+        child_policy_->distance(host) == CASS_HOST_DISTANCE_REMOTE) {
+      return host;
+    }
+  }
+
+  while (remaining_remote2_ > 0) {
+    --remaining_remote2_;
+    const Host::Ptr& host((*replicas_)[index_++ % replicas_->size()]);
+    if (child_policy_->is_host_up(host->address()) &&
+        child_policy_->distance(host) == CASS_HOST_DISTANCE_REMOTE2) {
+      return host;
+    }
+  }
+
   Host::Ptr host;
   while ((host = child_plan_->compute_next())) {
     if (!contains(replicas_, host->address()) ||
-        child_policy_->distance(host) != CASS_HOST_DISTANCE_LOCAL) {
+        child_policy_->distance(host) > CASS_HOST_DISTANCE_REMOTE2) {
       return host;
     }
   }
diff --git a/src/token_aware_policy.hpp b/src/token_aware_policy.hpp
index 5a8cee903..637f041cf 100644
--- a/src/token_aware_policy.hpp
+++ b/src/token_aware_policy.hpp
@@ -35,7 +35,7 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy {
   virtual ~TokenAwarePolicy() {}
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc);
+                    const String& local_dc, const String& local_rack);
 
   virtual QueryPlan* new_query_plan(const String& keyspace, RequestHandler* request_handler,
                                     const TokenMap* token_map);
@@ -53,7 +53,9 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy {
         , child_plan_(child_plan)
         , replicas_(replicas)
         , index_(start_index)
-        , remaining_(replicas->size()) {}
+        , remaining_local_(replicas->size())
+        , remaining_remote_(replicas->size())
+        , remaining_remote2_(replicas->size()) {}
 
     Host::Ptr compute_next();
 
@@ -62,7 +64,9 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy {
     ScopedPtr<QueryPlan> child_plan_;
     CopyOnWriteHostVec replicas_;
     size_t index_;
-    size_t remaining_;
+    size_t remaining_local_;
+    size_t remaining_remote_;
+    size_t remaining_remote2_;
   };
 
   Random* random_;
diff --git a/tests/src/unit/tests/test_load_balancing.cpp b/tests/src/unit/tests/test_load_balancing.cpp
index 0cb10da7f..c55ea990a 100644
--- a/tests/src/unit/tests/test_load_balancing.cpp
+++ b/tests/src/unit/tests/test_load_balancing.cpp
@@ -1,5 +1,6 @@
 /*
   Copyright (c) DataStax, Inc.
+  Copyright (c) 2023 Kiwi.com
 
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
@@ -21,6 +22,7 @@
 #include "blacklist_policy.hpp"
 #include "constants.hpp"
 #include "dc_aware_policy.hpp"
+#include "rack_aware_policy.hpp"
 #include "event_loop.hpp"
 #include "latency_aware_policy.hpp"
 #include "murmur3.hpp"
@@ -46,6 +48,9 @@ const String LOCAL_DC = "local";
 const String REMOTE_DC = "remote";
 const String BACKUP_DC = "backup";
 
+const String LOCAL_RACK = "local";
+const String REMOTE_RACK = "remote";
+
 #define VECTOR_FROM(t, a) Vector<t>(a, a + sizeof(a) / sizeof(a[0]))
 
 Address addr_for_sequence(size_t i) {
@@ -72,20 +77,33 @@ void populate_hosts(size_t count, const String& rack, const String& dc, HostMap*
 }
 
 void verify_sequence(QueryPlan* qp, const Vector<size_t>& sequence) {
+  Vector<Address> actual;
   Address received;
+  size_t limit = 100;
+  while (qp->compute_next(&received) && limit > 0) {
+    limit--;
+    actual.emplace_back(received);
+  }
+  if (limit == 0) {
+    FAIL() << "Iteration limit exceeded";
+  }
+  Vector<Address> expected;
   for (Vector<size_t>::const_iterator it = sequence.begin(); it != sequence.end(); ++it) {
-    ASSERT_TRUE(qp->compute_next(&received));
-    EXPECT_EQ(addr_for_sequence(*it), received);
+    expected.emplace_back(addr_for_sequence(*it));
   }
-  EXPECT_FALSE(qp->compute_next(&received));
+  EXPECT_EQ(expected, actual);
 }
 
 typedef Map<Address, int> QueryCounts;
 
-QueryCounts run_policy(LoadBalancingPolicy& policy, int count) {
+QueryCounts run_policy(LoadBalancingPolicy& policy, int count, CassConsistency consistency) {
   QueryCounts counts;
-  for (int i = 0; i < 12; ++i) {
-    ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
+  for (int i = 0; i < count; ++i) {
+    QueryRequest::Ptr request(new QueryRequest("", 0));
+    request->set_consistency(consistency);
+    SharedRefPtr<RequestHandler> request_handler(
+        new RequestHandler(request, ResponseFuture::Ptr()));
+    ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", request_handler.get(), NULL));
     Host::Ptr host(qp->compute_next());
     if (host) {
       counts[host->address()] += 1;
@@ -94,6 +112,7 @@ QueryCounts run_policy(LoadBalancingPolicy& policy, int count) {
   return counts;
 }
 
+// verify_dcs checks that all hosts in counts are from expected_dc.
 void verify_dcs(const QueryCounts& counts, const HostMap& hosts, const String& expected_dc) {
   for (QueryCounts::const_iterator it = counts.begin(), end = counts.end(); it != end; ++it) {
     HostMap::const_iterator host_it = hosts.find(it->first);
@@ -102,6 +121,18 @@ void verify_dcs(const QueryCounts& counts, const HostMap& hosts, const String& e
   }
 }
 
+// verify_racks checks that all hosts in counts are from expected_dc and expected_rack.
+void verify_racks(const QueryCounts& counts, const HostMap& hosts, const String& expected_dc,
+                  const String& expected_rack) {
+  for (QueryCounts::const_iterator it = counts.begin(), end = counts.end(); it != end; ++it) {
+    HostMap::const_iterator host_it = hosts.find(it->first);
+    ASSERT_NE(host_it, hosts.end());
+    EXPECT_EQ(expected_dc, host_it->second->dc());
+    EXPECT_EQ(expected_rack, host_it->second->rack());
+  }
+}
+
+// verify_query_counts checks that each host was used expected_count times.
 void verify_query_counts(const QueryCounts& counts, int expected_count) {
   for (QueryCounts::const_iterator it = counts.begin(), end = counts.end(); it != end; ++it) {
     EXPECT_EQ(expected_count, it->second);
   }
@@ -169,7 +200,7 @@ void test_dc_aware_policy(size_t local_count, size_t remote_count) {
   populate_hosts(local_count, "rack", LOCAL_DC, &hosts);
   populate_hosts(remote_count, "rack", REMOTE_DC, &hosts);
   DCAwarePolicy policy(LOCAL_DC, remote_count, false);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   const size_t total_hosts = local_count + remote_count;
 
@@ -185,7 +216,7 @@ TEST(RoundRobinLoadBalancingUnitTest, Simple) {
   populate_hosts(2, "rack", "dc", &hosts);
 
   RoundRobinPolicy policy;
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   // start on first elem
   ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
@@ -207,7 +238,7 @@ TEST(RoundRobinLoadBalancingUnitTest, OnAdd) {
   populate_hosts(2, "rack", "dc", &hosts);
 
   RoundRobinPolicy policy;
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   // baseline
   ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
@@ -230,7 +261,7 @@ TEST(RoundRobinLoadBalancingUnitTest, OnRemove) {
   populate_hosts(3, "rack", "dc", &hosts);
 
   RoundRobinPolicy policy;
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
   SharedRefPtr<Host> host = hosts.begin()->second;
@@ -251,7 +282,7 @@ TEST(RoundRobinLoadBalancingUnitTest, OnUpAndDown) {
   populate_hosts(3, "rack", "dc", &hosts);
 
   RoundRobinPolicy policy;
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   ScopedPtr<QueryPlan> qp_before1(policy.new_query_plan("ks", NULL, NULL));
   ScopedPtr<QueryPlan> qp_before2(policy.new_query_plan("ks", NULL, NULL));
@@ -297,10 +328,10 @@ TEST(RoundRobinLoadBalancingUnitTest, VerifyEqualDistribution) {
   populate_hosts(3, "rack", "dc", &hosts);
 
   RoundRobinPolicy policy;
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   { // All nodes
-    QueryCounts counts(run_policy(policy, 12));
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
     ASSERT_EQ(counts.size(), 3u);
     verify_query_counts(counts, 4);
   }
@@ -308,7 +339,7 @@ TEST(RoundRobinLoadBalancingUnitTest, VerifyEqualDistribution) {
   policy.on_host_down(hosts.begin()->first);
 
   { // One node down
-    QueryCounts counts(run_policy(policy, 12));
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
     ASSERT_EQ(counts.size(), 2u);
     verify_query_counts(counts, 6);
   }
@@ -316,7 +347,7 @@ TEST(RoundRobinLoadBalancingUnitTest, VerifyEqualDistribution) {
   policy.on_host_up(hosts.begin()->second);
 
   { // All nodes again
-    QueryCounts counts(run_policy(policy, 12));
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
    ASSERT_EQ(counts.size(), 3u);
     verify_query_counts(counts, 4);
   }
@@ -324,7 +355,7 @@ TEST(RoundRobinLoadBalancingUnitTest, VerifyEqualDistribution) {
   policy.on_host_removed(hosts.begin()->second);
 
   { // One node removed
-    QueryCounts counts(run_policy(policy, 12));
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
     ASSERT_EQ(counts.size(), 2u);
     verify_query_counts(counts, 6);
   }
@@ -338,7 +369,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, SomeDatacenterLocalUnspecified) {
   h->set_rack_and_dc("", "");
 
   DCAwarePolicy policy(LOCAL_DC, 1, false);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
 
@@ -353,7 +384,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, SingleLocalDown) {
   populate_hosts(1, "rack", REMOTE_DC, &hosts);
 
   DCAwarePolicy policy(LOCAL_DC, 1, false);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   ScopedPtr<QueryPlan> qp_before(
       policy.new_query_plan("ks", NULL, NULL)); // has down host ptr in plan
@@ -380,7 +411,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, AllLocalRemovedReturned) {
   populate_hosts(1, "rack", REMOTE_DC, &hosts);
 
   DCAwarePolicy policy(LOCAL_DC, 1, false);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   ScopedPtr<QueryPlan> qp_before(
       policy.new_query_plan("ks", NULL, NULL)); // has down host ptr in plan
@@ -412,7 +443,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, RemoteRemovedReturned) {
   SharedRefPtr<Host> target_host = hosts[target_addr];
 
   DCAwarePolicy policy(LOCAL_DC, 1, false);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   ScopedPtr<QueryPlan> qp_before(
       policy.new_query_plan("ks", NULL, NULL)); // has down host ptr in plan
@@ -443,7 +474,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, UsedHostsPerDatacenter) {
 
   for (size_t used_hosts = 0; used_hosts < 4; ++used_hosts) {
     DCAwarePolicy policy(LOCAL_DC, used_hosts, false);
-    policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+    policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
     ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
     Vector<size_t> seq;
@@ -476,7 +507,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, AllowRemoteDatacentersForLocalConsist
   // Not allowing remote DCs for local CLs
   bool allow_remote_dcs_for_local_cl = false;
   DCAwarePolicy policy(LOCAL_DC, 3, !allow_remote_dcs_for_local_cl);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   // Set local CL
   QueryRequest::Ptr request(new QueryRequest("", 0));
@@ -494,7 +525,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, AllowRemoteDatacentersForLocalConsist
   // Allowing remote DCs for local CLs
   bool allow_remote_dcs_for_local_cl = true;
   DCAwarePolicy policy(LOCAL_DC, 3, !allow_remote_dcs_for_local_cl);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   // Set local CL
   QueryRequest::Ptr request(new QueryRequest("", 0));
@@ -517,7 +548,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, StartWithEmptyLocalDatacenter) {
   // Set local DC using connected host
   {
     DCAwarePolicy policy("", 0, false);
-    policy.init(hosts[Address("2.0.0.0", 9042)], hosts, NULL, "");
+    policy.init(hosts[Address("2.0.0.0", 9042)], hosts, NULL, "", "");
 
     ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
     const size_t seq[] = { 2, 3, 4 };
@@ -527,7 +558,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, StartWithEmptyLocalDatacenter) {
   // Set local DC using first host with non-empty DC
   {
     DCAwarePolicy policy("", 0, false);
-    policy.init(SharedRefPtr<Host>(new Host(Address("0.0.0.0", 9042))), hosts, NULL, "");
+    policy.init(SharedRefPtr<Host>(new Host(Address("0.0.0.0", 9042))), hosts, NULL, "", "");
 
     ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
     const size_t seq[] = { 1 };
@@ -547,10 +578,10 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalDc) {
   populate_hosts(3, "rack", REMOTE_DC, &hosts);
 
   DCAwarePolicy policy("", 0, false);
-  policy.init(hosts.begin()->second, hosts, NULL, "");
+  policy.init(hosts.begin()->second, hosts, NULL, "", "");
 
   { // All local nodes
-    QueryCounts counts(run_policy(policy, 12));
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
     verify_dcs(counts, hosts, LOCAL_DC);
     ASSERT_EQ(counts.size(), 3u);
     verify_query_counts(counts, 4);
@@ -559,7 +590,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalDc) {
   policy.on_host_down(hosts.begin()->first);
 
   { // One local node down
-    QueryCounts counts(run_policy(policy, 12));
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
     verify_dcs(counts, hosts, LOCAL_DC);
     ASSERT_EQ(counts.size(), 2u);
     verify_query_counts(counts, 6);
@@ -568,7 +599,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalDc) {
   policy.on_host_up(hosts.begin()->second);
 
   { // All local nodes again
-    QueryCounts counts(run_policy(policy, 12));
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
     verify_dcs(counts, hosts, LOCAL_DC);
     ASSERT_EQ(counts.size(), 3u);
     verify_query_counts(counts, 4);
@@ -577,7 +608,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalDc) {
   policy.on_host_removed(hosts.begin()->second);
 
   { // One local node removed
-    QueryCounts counts(run_policy(policy, 12));
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
     verify_dcs(counts, hosts, LOCAL_DC);
     ASSERT_EQ(counts.size(), 2u);
     verify_query_counts(counts, 6);
@@ -590,7 +621,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionRemoteDc) {
   populate_hosts(3, "rack", REMOTE_DC, &hosts);
 
   DCAwarePolicy policy("", 3, false); // Allow all remote DC nodes
-  policy.init(hosts.begin()->second, hosts, NULL, "");
+  policy.init(hosts.begin()->second, hosts, NULL, "", "");
 
   Host::Ptr remote_dc_node1;
   { // Mark down all local nodes
@@ -603,16 +634,365 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionRemoteDc) {
   }
 
   { // All remote nodes
-    QueryCounts counts(run_policy(policy, 12));
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
+    verify_dcs(counts, hosts, REMOTE_DC);
+    ASSERT_EQ(counts.size(), 3u);
+    verify_query_counts(counts, 4);
+  }
+
+  policy.on_host_down(remote_dc_node1->address());
+
+  { // One remote node down
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
+    verify_dcs(counts, hosts, REMOTE_DC);
+    ASSERT_EQ(counts.size(), 2u);
+    verify_query_counts(counts, 6);
+  }
+
+  policy.on_host_up(remote_dc_node1);
+
+  { // All remote nodes again
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
     verify_dcs(counts, hosts, REMOTE_DC);
     ASSERT_EQ(counts.size(), 3u);
     verify_query_counts(counts, 4);
   }
 
+  policy.on_host_removed(remote_dc_node1);
+
+  { // One remote node removed
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
+    verify_dcs(counts, hosts, REMOTE_DC);
+    ASSERT_EQ(counts.size(), 2u);
+    verify_query_counts(counts, 6);
+  }
+}
+
+// Check that host with unspecified rack and DC is last.
+TEST(RackAwareLoadBalancingUnitTest, SomeDatacenterRackLocalUnspecified) {
+  const size_t total_hosts = 3;
+  HostMap hosts;
+  populate_hosts(total_hosts, LOCAL_RACK, LOCAL_DC, &hosts);
+  Host* h = hosts.begin()->second.get();
+  h->set_rack_and_dc("", "");
+
+  RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK);
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
+
+  // Use CL=ONE to allow remote DCs.
+ QueryRequest::Ptr request(new QueryRequest("", 0)); + request->set_consistency(CASS_CONSISTENCY_ONE); + SharedRefPtr request_handler( + new RequestHandler(request, ResponseFuture::Ptr())); + + ScopedPtr qp(policy.new_query_plan("ks", request_handler.get(), NULL)); + + const size_t seq[] = { 2, 3, 1 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); +} + +// Check that host with unspecified rack is last. +TEST(RackAwareLoadBalancingUnitTest, SomeRackLocalUnspecified) { + const size_t total_hosts = 3; + HostMap hosts; + populate_hosts(total_hosts, LOCAL_RACK, LOCAL_DC, &hosts); + Host* h = hosts.begin()->second.get(); + h->set_rack_and_dc("", LOCAL_DC); + + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); + + // Use CL=ONE to allow remote DCs. + QueryRequest::Ptr request(new QueryRequest("", 0)); + request->set_consistency(CASS_CONSISTENCY_ONE); + SharedRefPtr request_handler( + new RequestHandler(request, ResponseFuture::Ptr())); + + ScopedPtr qp(policy.new_query_plan("ks", request_handler.get(), NULL)); + + const size_t seq[] = { 2, 3, 1 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); +} + +// Check that down host is not returned. +TEST(RackAwareLoadBalancingUnitTest, SingleLocalDown) { + HostMap hosts; + populate_hosts(3, LOCAL_RACK, LOCAL_DC, &hosts); + SharedRefPtr target_host = hosts.begin()->second; + populate_hosts(1, REMOTE_RACK, LOCAL_DC, &hosts); + + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); + + ScopedPtr qp_before( + policy.new_query_plan("ks", NULL, NULL)); // has down host ptr in plan + ScopedPtr qp_after( + policy.new_query_plan("ks", NULL, NULL)); // should not have down host ptr in plan + + policy.on_host_down(target_host->address()); + { + const size_t seq[] = { 2, 3, 4 }; + verify_sequence(qp_before.get(), VECTOR_FROM(size_t, seq)); + } + + policy.on_host_up(target_host); + { + const size_t seq[] = { 2, 3, 1, 4 }; // local dc wrapped before remote offered + verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq)); + } +} + +TEST(RackAwareLoadBalancingUnitTest, AllLocalRemovedReturned) { + HostMap hosts; + populate_hosts(1, LOCAL_RACK, LOCAL_DC, &hosts); + SharedRefPtr target_host = hosts.begin()->second; + populate_hosts(1, REMOTE_RACK, LOCAL_DC, &hosts); + + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); + + ScopedPtr qp_before( + policy.new_query_plan("ks", NULL, NULL)); // has down host ptr in plan + policy.on_host_down(target_host->address()); + ScopedPtr qp_after( + policy.new_query_plan("ks", NULL, NULL)); // should not have down host ptr in plan + + { + const size_t seq[] = { 2 }; + verify_sequence(qp_before.get(), VECTOR_FROM(size_t, seq)); + verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq)); + } + + policy.on_host_up(target_host); + + // make sure we get the local node first after on_up + ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); + { + const size_t seq[] = { 1, 2 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); + } +} + +TEST(RackAwareLoadBalancingUnitTest, RemoteRemovedReturned) { + HostMap hosts; + populate_hosts(1, LOCAL_RACK, LOCAL_DC, &hosts); + populate_hosts(1, REMOTE_RACK, LOCAL_DC, &hosts); + Address target_addr("2.0.0.0", 9042); + SharedRefPtr target_host = hosts[target_addr]; + + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); + + ScopedPtr qp_before( + policy.new_query_plan("ks", NULL, NULL)); // has 
down host ptr in plan + policy.on_host_down(target_host->address()); + ScopedPtr qp_after( + policy.new_query_plan("ks", NULL, NULL)); // should not have down host ptr in plan + + { + const size_t seq[] = { 1 }; + verify_sequence(qp_before.get(), VECTOR_FROM(size_t, seq)); + verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq)); + } + + policy.on_host_up(target_host); + + // make sure we get both nodes, correct order after + ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); + { + const size_t seq[] = { 1, 2 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); + } +} + +TEST(RackAwareLoadBalancingUnitTest, DoNotAllowRemoteDatacentersForLocalConsistencyLevel) { + HostMap hosts; + populate_hosts(3, "rack", LOCAL_DC, &hosts); + populate_hosts(3, "rack", REMOTE_DC, &hosts); + + // Not allowing remote DCs for local CLs + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(SharedRefPtr(), hosts, NULL, "", ""); + + // Set local CL + QueryRequest::Ptr request(new QueryRequest("", 0)); + request->set_consistency(CASS_CONSISTENCY_LOCAL_ONE); + SharedRefPtr request_handler( + new RequestHandler(request, ResponseFuture::Ptr())); + + // Check for only local hosts are used + ScopedPtr qp(policy.new_query_plan("ks", request_handler.get(), NULL)); + const size_t seq[] = { 1, 2, 3 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); +} + +TEST(RackAwareLoadBalancingUnitTest, StartWithEmptyLocalRack) { + HostMap hosts; + populate_hosts(1, REMOTE_RACK, LOCAL_DC, &hosts); + populate_hosts(3, LOCAL_RACK, LOCAL_DC, &hosts); + + // Set local rack using connected host + { + RackAwarePolicy policy("", ""); + policy.init(hosts[Address("2.0.0.0", 9042)], hosts, NULL, "", ""); + + ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); + const size_t seq[] = { 2, 3, 4, 1 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); + } + + // Set local rack using first host with non-empty rack + { + RackAwarePolicy policy("", ""); + policy.init(SharedRefPtr(new Host(Address("0.0.0.0", 9042))), hosts, NULL, "", ""); + + ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); + const size_t seq[] = { 1, 3, 4, 2 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); + } +} + +TEST(RackAwareLoadBalancingUnitTest, StartWithEmptyLocalDatacenter) { + HostMap hosts; + populate_hosts(1, "rack", REMOTE_DC, &hosts); + populate_hosts(3, "rack", LOCAL_DC, &hosts); + + // Set local DC using connected host + { + RackAwarePolicy policy("", ""); + policy.init(hosts[Address("2.0.0.0", 9042)], hosts, NULL, "", ""); + + ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); + const size_t seq[] = { 2, 3, 4 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); + } + + // Set local DC using first host with non-empty DC + { + RackAwarePolicy policy("", ""); + policy.init(SharedRefPtr(new Host(Address("0.0.0.0", 9042))), hosts, NULL, "", ""); + + ScopedPtr qp(policy.new_query_plan("ks", NULL, NULL)); + const size_t seq[] = { 1 }; + verify_sequence(qp.get(), VECTOR_FROM(size_t, seq)); + } +} + +TEST(RackAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalRack) { + HostMap hosts; + populate_hosts(3, LOCAL_RACK, LOCAL_DC, &hosts); + populate_hosts(3, REMOTE_RACK, LOCAL_DC, &hosts); + + RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK); + policy.init(hosts.begin()->second, hosts, NULL, "", ""); + + { // All local nodes + QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE)); + verify_racks(counts, hosts, LOCAL_DC, LOCAL_RACK); + ASSERT_EQ(counts.size(), 3u); + verify_query_counts(counts, 4); + } + + 
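+  // run_policy issues 12 queries: with all 3 local-rack hosts up, a balanced
+  // plan serves 12 / 3 = 4 queries per host; with one host down or removed,
+  // the two remaining hosts serve 12 / 2 = 6 apiece.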
+  policy.on_host_down(hosts.begin()->first);
+
+  { // One local node down
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
+    verify_racks(counts, hosts, LOCAL_DC, LOCAL_RACK);
+    ASSERT_EQ(counts.size(), 2u);
+    verify_query_counts(counts, 6);
+  }
+
+  policy.on_host_up(hosts.begin()->second);
+
+  { // All local nodes again
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
+    verify_racks(counts, hosts, LOCAL_DC, LOCAL_RACK);
+    ASSERT_EQ(counts.size(), 3u);
+    verify_query_counts(counts, 4);
+  }
+
+  policy.on_host_removed(hosts.begin()->second);
+
+  { // One local node removed
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
+    verify_racks(counts, hosts, LOCAL_DC, LOCAL_RACK);
+    ASSERT_EQ(counts.size(), 2u);
+    verify_query_counts(counts, 6);
+  }
+}
+
+TEST(RackAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalDc) {
+  HostMap hosts;
+  populate_hosts(3, "rack", LOCAL_DC, &hosts);
+  populate_hosts(3, "rack", REMOTE_DC, &hosts);
+
+  RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK);
+  policy.init(hosts.begin()->second, hosts, NULL, "", "");
+
+  { // All local nodes
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
+    verify_dcs(counts, hosts, LOCAL_DC);
+    ASSERT_EQ(counts.size(), 3u);
+    verify_query_counts(counts, 4);
+  }
+
+  policy.on_host_down(hosts.begin()->first);
+
+  { // One local node down
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
+    verify_dcs(counts, hosts, LOCAL_DC);
+    ASSERT_EQ(counts.size(), 2u);
+    verify_query_counts(counts, 6);
+  }
+
+  policy.on_host_up(hosts.begin()->second);
+
+  { // All local nodes again
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
+    verify_dcs(counts, hosts, LOCAL_DC);
+    ASSERT_EQ(counts.size(), 3u);
+    verify_query_counts(counts, 4);
+  }
+
+  policy.on_host_removed(hosts.begin()->second);
+
+  { // One local node removed
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
+    verify_dcs(counts, hosts, LOCAL_DC);
+    ASSERT_EQ(counts.size(), 2u);
+    verify_query_counts(counts, 6);
+  }
+}
+
+TEST(RackAwareLoadBalancingUnitTest, VerifyEqualDistributionRemoteDc) {
+  HostMap hosts;
+  populate_hosts(3, LOCAL_RACK, LOCAL_DC, &hosts);
+  populate_hosts(3, LOCAL_RACK, REMOTE_DC, &hosts);
+
+  RackAwarePolicy policy(LOCAL_DC, LOCAL_RACK);
+  policy.init(hosts.begin()->second, hosts, NULL, "", "");
+
+  Host::Ptr remote_dc_node1;
+  { // Mark down all local nodes
+    HostMap::iterator it = hosts.begin();
+    for (int i = 0; i < 3; ++i) {
+      policy.on_host_down(it->first);
+      it++;
+    }
+    remote_dc_node1 = it->second;
+  }
+
+  { // All remote nodes
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
+    verify_dcs(counts, hosts, REMOTE_DC);
+    ASSERT_EQ(counts.size(), 3u) << "Should use all hosts from remote DC";
+    verify_query_counts(counts, 4);
+  }
+
   policy.on_host_down(remote_dc_node1->address());
   { // One remote node down
-    QueryCounts counts(run_policy(policy, 12));
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
     verify_dcs(counts, hosts, REMOTE_DC);
     ASSERT_EQ(counts.size(), 2u);
     verify_query_counts(counts, 6);
@@ -621,7 +1001,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionRemoteDc) {
   policy.on_host_up(remote_dc_node1);
 
   { // All remote nodes again
-    QueryCounts counts(run_policy(policy, 12));
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
     verify_dcs(counts, hosts, REMOTE_DC);
     ASSERT_EQ(counts.size(), 3u);
     verify_query_counts(counts, 4);
@@ -630,7 +1010,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionRemoteDc) {
   policy.on_host_removed(remote_dc_node1);
 
   { // One remote node removed
-    QueryCounts counts(run_policy(policy, 12));
+    QueryCounts counts(run_policy(policy, 12, CASS_CONSISTENCY_ONE));
     verify_dcs(counts, hosts, REMOTE_DC);
     ASSERT_EQ(counts.size(), 2u);
     verify_query_counts(counts, 6);
@@ -664,7 +1044,7 @@ TEST(TokenAwareLoadBalancingUnitTest, Simple) {
   token_map->build();
 
   TokenAwarePolicy policy(new RoundRobinPolicy(), false);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   QueryRequest::Ptr request(new QueryRequest("", 1));
   const char* value = "kjdfjkldsdjkl"; // hash: 9024137376112061887
@@ -710,12 +1090,12 @@ TEST(TokenAwareLoadBalancingUnitTest, NetworkTopology) {
 
   // Tokens
   // 1.0.0.0 local  -6588122883467697006
-  // 2.0.0.0 remote -3952873730080618204
-  // 3.0.0.0 local  -1317624576693539402
-  // 4.0.0.0 remote  1317624576693539400
-  // 5.0.0.0 local   3952873730080618202
+  // 2.0.0.0 remote -3952873730080618204 (replica for -5434086359492102041)
+  // 3.0.0.0 local  -1317624576693539402 (replica for -5434086359492102041)
+  // 4.0.0.0 remote  1317624576693539400 (replica for -5434086359492102041)
+  // 5.0.0.0 local   3952873730080618202 (replica for -5434086359492102041)
   // 6.0.0.0 remote  6588122883467697004
-  // 7.0.0.0 local   9223372036854775806
+  // 7.0.0.0 local   9223372036854775806 (replica for -5434086359492102041)
 
   const uint64_t partition_size = CASS_UINT64_MAX / num_hosts;
   Murmur3Partitioner::Token token = CASS_INT64_MIN + static_cast<int64_t>(partition_size);
@@ -737,7 +1117,7 @@ TEST(TokenAwareLoadBalancingUnitTest, NetworkTopology) {
   token_map->build();
 
   TokenAwarePolicy policy(new DCAwarePolicy(LOCAL_DC, num_hosts / 2, false), false);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   QueryRequest::Ptr request(new QueryRequest("", 1));
   const char* value = "abc"; // hash: -5434086359492102041
@@ -747,7 +1127,7 @@
 
   {
     ScopedPtr<QueryPlan> qp(policy.new_query_plan("test", request_handler.get(), token_map.get()));
-    const size_t seq[] = { 3, 5, 7, 1, 4, 6, 2 };
+    const size_t seq[] = { 3, 5, 7, 2, 4, 1, 6 };
     verify_sequence(qp.get(), VECTOR_FROM(size_t, seq));
   }
@@ -757,7 +1137,7 @@
 
   {
     ScopedPtr<QueryPlan> qp(policy.new_query_plan("test", request_handler.get(), token_map.get()));
-    const size_t seq[] = { 3, 5, 7, 4, 6, 2 };
+    const size_t seq[] = { 3, 5, 7, 2, 4, 6 };
     verify_sequence(qp.get(), VECTOR_FROM(size_t, seq));
   }
@@ -769,7 +1149,7 @@
 
   {
     ScopedPtr<QueryPlan> qp(policy.new_query_plan("test", request_handler.get(), token_map.get()));
-    const size_t seq[] = { 5, 7, 1, 6, 2, 4 };
+    const size_t seq[] = { 5, 7, 2, 4, 1, 6 };
     verify_sequence(qp.get(), VECTOR_FROM(size_t, seq));
   }
 }
@@ -811,7 +1191,7 @@ TEST(TokenAwareLoadBalancingUnitTest, ShuffleReplicas) {
   HostVec not_shuffled;
   {
     TokenAwarePolicy policy(new RoundRobinPolicy(), false); // Not shuffled
-    policy.init(SharedRefPtr<Host>(), hosts, &random, "");
+    policy.init(SharedRefPtr<Host>(), hosts, &random, "", "");
     ScopedPtr<QueryPlan> qp1(policy.new_query_plan("test", request_handler.get(), token_map.get()));
     for (int i = 0; i < num_hosts; ++i) {
       not_shuffled.push_back(qp1->compute_next());
@@ -829,7 +1209,7 @@
   // Verify that the shuffle setting does indeed shuffle the replicas
   {
     TokenAwarePolicy shuffle_policy(new RoundRobinPolicy(), true); // Shuffled
-    shuffle_policy.init(SharedRefPtr<Host>(), hosts, &random, "");
+    shuffle_policy.init(SharedRefPtr<Host>(), hosts, &random, "", "");
 
     HostVec shuffled_previous;
     ScopedPtr<QueryPlan> qp(
@@ -927,7 +1307,7 @@ TEST(LatencyAwareLoadBalancingUnitTest, Simple) {
   HostMap hosts;
   populate_hosts(num_hosts, "rack1", LOCAL_DC, &hosts);
   LatencyAwarePolicy policy(new RoundRobinPolicy(), settings);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   // Record some latencies with 100 ns being the minimum
   for (HostMap::iterator i = hosts.begin(); i != hosts.end(); ++i) {
@@ -989,7 +1369,7 @@ TEST(LatencyAwareLoadBalancingUnitTest, MinAverageUnderMinMeasured) {
   HostMap hosts;
   populate_hosts(num_hosts, "rack1", LOCAL_DC, &hosts);
   LatencyAwarePolicy policy(new RoundRobinPolicy(), settings);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   int count = 1;
   for (HostMap::iterator i = hosts.begin(); i != hosts.end(); ++i) {
@@ -1023,7 +1403,7 @@ TEST(WhitelistLoadBalancingUnitTest, Hosts) {
   whitelist_hosts.push_back("37.0.0.0");
   whitelist_hosts.push_back("83.0.0.0");
   WhitelistPolicy policy(new RoundRobinPolicy(), whitelist_hosts);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
 
@@ -1044,7 +1424,7 @@ TEST(WhitelistLoadBalancingUnitTest, Datacenters) {
   whitelist_dcs.push_back(LOCAL_DC);
   whitelist_dcs.push_back(REMOTE_DC);
   WhitelistDCPolicy policy(new RoundRobinPolicy(), whitelist_dcs);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
 
@@ -1064,7 +1444,7 @@ TEST(BlacklistLoadBalancingUnitTest, Hosts) {
   blacklist_hosts.push_back("2.0.0.0");
   blacklist_hosts.push_back("3.0.0.0");
   BlacklistPolicy policy(new RoundRobinPolicy(), blacklist_hosts);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
 
@@ -1085,7 +1465,7 @@ TEST(BlacklistLoadBalancingUnitTest, Datacenters) {
   blacklist_dcs.push_back(LOCAL_DC);
   blacklist_dcs.push_back(REMOTE_DC);
   BlacklistDCPolicy policy(new RoundRobinPolicy(), blacklist_dcs);
-  policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
+  policy.init(SharedRefPtr<Host>(), hosts, NULL, "", "");
 
   ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
diff --git a/tests/src/unit/tests/test_request_processor.cpp b/tests/src/unit/tests/test_request_processor.cpp
index d1900397d..69b13b423 100644
--- a/tests/src/unit/tests/test_request_processor.cpp
+++ b/tests/src/unit/tests/test_request_processor.cpp
@@ -37,7 +37,7 @@ class InorderLoadBalancingPolicy : public LoadBalancingPolicy {
     , hosts_(new HostVec()) {}
 
   virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
-                    const String& local_dc) {
+                    const String& local_dc, const String& local_rack) {
     hosts_->reserve(hosts.size());
     std::transform(hosts.begin(), hosts.end(), std::back_inserter(*hosts_), GetHost());
   }
@@ -277,7 +277,7 @@ TEST_F(RequestProcessorUnitTest, Simple) {
 
   Future::Ptr connect_future(new Future());
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
       bind_callback(on_connected, connect_future.get())));
 
   initializer->initialize(event_loop());
@@ -296,7 +296,7 @@ TEST_F(RequestProcessorUnitTest, CloseWithRequestsPending) {
 
   Future::Ptr connect_future(new Future());
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
       bind_callback(on_connected, connect_future.get())));
 
   initializer->initialize(event_loop());
@@ -334,7 +334,7 @@ TEST_F(RequestProcessorUnitTest, Auth) {
 
   Future::Ptr connect_future(new Future());
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
       bind_callback(on_connected, connect_future.get())));
 
   RequestProcessorSettings settings;
@@ -359,7 +359,7 @@ TEST_F(RequestProcessorUnitTest, Ssl) {
 
   Future::Ptr connect_future(new Future());
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
       bind_callback(on_connected, connect_future.get())));
 
   initializer->with_settings(settings)->initialize(event_loop());
@@ -383,7 +383,7 @@ TEST_F(RequestProcessorUnitTest, NotifyAddRemoveHost) {
   Future::Ptr up_future(new Future());
   Future::Ptr down_future(new Future());
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
       bind_callback(on_connected, connect_future.get())));
 
   RequestProcessorSettings settings;
@@ -415,7 +415,7 @@ TEST_F(RequestProcessorUnitTest, CloseDuringReconnect) {
   Future::Ptr close_future(new Future());
   Future::Ptr connect_future(new Future());
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
       bind_callback(on_connected, connect_future.get())));
 
   RequestProcessorSettings settings;
@@ -450,7 +450,7 @@ TEST_F(RequestProcessorUnitTest, CloseDuringAddNewHost) {
   Future::Ptr close_future(new Future());
   Future::Ptr connect_future(new Future());
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
       bind_callback(on_connected, connect_future.get())));
 
   CloseListener::Ptr listener(new CloseListener(close_future));
@@ -480,7 +480,7 @@ TEST_F(RequestProcessorUnitTest, PoolDown) {
   Future::Ptr up_future(new Future());
   Future::Ptr down_future(new Future());
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
       bind_callback(on_connected, connect_future.get())));
 
   UpDownListener::Ptr listener(new UpDownListener(up_future, down_future, target_host));
@@ -510,7 +510,7 @@ TEST_F(RequestProcessorUnitTest, PoolUp) {
   Future::Ptr up_future(new Future());
   Future::Ptr down_future(new Future());
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
       bind_callback(on_connected, connect_future.get())));
 
   RequestProcessorSettings settings;
@@ -538,7 +538,7 @@ TEST_F(RequestProcessorUnitTest, InvalidAuth) {
 
   Future::Ptr connect_future(new Future());
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
       bind_callback(on_connected, connect_future.get())));
 
   RequestProcessorSettings settings;
@@ -564,7 +564,7 @@ TEST_F(RequestProcessorUnitTest, InvalidSsl) {
 
   Future::Ptr connect_future(new Future());
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
       bind_callback(on_connected, connect_future.get())));
 
   SslContext::Ptr ssl_context(SslContextFactory::create()); // No trusted cert
@@ -600,7 +600,7 @@ TEST_F(RequestProcessorUnitTest, RollingRestart) {
   HostMap hosts(generate_hosts());
   Future::Ptr connect_future(new Future());
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
      bind_callback(on_connected, connect_future.get())));
 
   RequestProcessorSettings settings;
@@ -632,7 +632,7 @@ TEST_F(RequestProcessorUnitTest, NoHostsAvailable) {
   HostMap hosts(generate_hosts());
   Future::Ptr connect_future(new Future());
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
       bind_callback(on_connected, connect_future.get())));
 
   initializer->with_listener(listener.get())->initialize(event_loop());
@@ -668,7 +668,7 @@ TEST_F(RequestProcessorUnitTest, RequestTimeout) {
   HostMap hosts(generate_hosts());
   Future::Ptr connect_future(new Future());
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
       bind_callback(on_connected, connect_future.get())));
 
   initializer->with_listener(listener.get())->initialize(event_loop());
@@ -718,7 +718,7 @@ TEST_F(RequestProcessorUnitTest, LowNumberOfStreams) {
   settings.request_queue_size = 2 * CASS_MAX_STREAMS + 1; // Create a request queue with enough room
 
   RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
-      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
+      hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "", "",
       bind_callback(on_connected, connect_future.get())));
 
   initializer->with_settings(settings)->with_listener(listener.get())->initialize(event_loop());