Skip to content

Commit 748d53f

Browse files
authored
Merge pull request #281 from muzarski/core-connections-per-host-shard
Connection pool size configuration
2 parents 71297f9 + c538885 commit 748d53f

File tree

6 files changed

+126
-9
lines changed

6 files changed

+126
-9
lines changed

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
2727
:UseKeyspaceCaseSensitiveTests.*\
2828
:MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\
2929
:MetricsTests.Integration_Cassandra_Requests\
30+
:MetricsTests.Integration_Cassandra_StatsShardConnections\
3031
:-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\
3132
:HeartbeatTests.Integration_Cassandra_HeartbeatFailed\
3233
:ControlConnectionTests.Integration_Cassandra_TopologyChange\
@@ -68,6 +69,7 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
6869
:UseKeyspaceCaseSensitiveTests.*\
6970
:MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\
7071
:MetricsTests.Integration_Cassandra_Requests\
72+
:MetricsTests.Integration_Cassandra_StatsShardConnections\
7173
:-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\
7274
:PreparedTests.Integration_Cassandra_FailFastWhenPreparedIDChangesDuringReprepare\
7375
:HeartbeatTests.Integration_Cassandra_HeartbeatFailed\

include/cassandra.h

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1780,21 +1780,46 @@ cass_cluster_set_queue_size_event(CassCluster* cluster,
17801780
unsigned queue_size));
17811781

17821782
/**
1783-
* Sets the number of connections made to each server in each
1784-
* IO thread.
1783+
* Sets the number of connections opened by the driver to each host.
17851784
*
1786-
* <b>Default:</b> 1
1785+
* Notice that this overrides the number of connections per shard
1786+
* set by `cass_cluster_set_core_connections_per_shard()`.
1787+
*
1788+
* <b>Default:</b> 1 per shard (i.e. `cass_cluster_set_core_connections_per_shard(cluster, 1)`)
17871789
*
17881790
* @public @memberof CassCluster
17891791
*
17901792
* @param[in] cluster
17911793
* @param[in] num_connections
17921794
* @return CASS_OK if successful, otherwise an error occurred.
1795+
*
1796+
* @see cass_cluster_set_core_connections_per_shard()
17931797
*/
17941798
CASS_EXPORT CassError
17951799
cass_cluster_set_core_connections_per_host(CassCluster* cluster,
17961800
unsigned num_connections);
17971801

1802+
/**
1803+
* Sets the number of connections opened by the driver to each shard.
1804+
*
1805+
* Cassandra nodes are treated as if they have one shard.
1806+
*
1807+
* This will override the `cass_cluster_set_core_connections_per_host`, if set.
1808+
*
1809+
* <b>Default:</b> 1
1810+
*
1811+
* @public @memberof CassCluster
1812+
*
1813+
* @param[in] cluster
1814+
* @param[in] num_connections
1815+
* @return CASS_OK if successful, otherwise an error occurred.
1816+
*
1817+
* @see cass_cluster_set_core_connections_per_host()
1818+
*/
1819+
CASS_EXPORT CassError
1820+
cass_cluster_set_core_connections_per_shard(CassCluster* cluster,
1821+
unsigned num_connections);
1822+
17981823
/**
17991824
* Sets the maximum number of connections made to each server in each
18001825
* IO thread.

scylla-rust-wrapper/src/cluster.rs

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use openssl::ssl::SslContextBuilder;
1212
use openssl_sys::SSL_CTX_up_ref;
1313
use scylla::client::execution_profile::ExecutionProfileBuilder;
1414
use scylla::client::session_builder::SessionBuilder;
15-
use scylla::client::{SelfIdentity, WriteCoalescingDelay};
15+
use scylla::client::{PoolSize, SelfIdentity, WriteCoalescingDelay};
1616
use scylla::frame::Compression;
1717
use scylla::policies::load_balancing::{
1818
DefaultPolicyBuilder, LatencyAwarenessBuilder, LoadBalancingPolicy,
@@ -25,7 +25,7 @@ use std::collections::HashMap;
2525
use std::convert::TryInto;
2626
use std::future::Future;
2727
use std::net::IpAddr;
28-
use std::num::NonZero;
28+
use std::num::{NonZero, NonZeroUsize};
2929
use std::os::raw::{c_char, c_int, c_uint};
3030
use std::str::FromStr;
3131
use std::sync::Arc;
@@ -47,6 +47,8 @@ const DEFAULT_MAX_SCHEMA_WAIT_TIME: Duration = Duration::from_millis(10000);
4747
const DEFAULT_SCHEMA_AGREEMENT_INTERVAL: Duration = Duration::from_millis(200);
4848
// - setting TCP_NODELAY is true
4949
const DEFAULT_SET_TCP_NO_DELAY: bool = true;
50+
// - connection pool size is 1 per shard
51+
const DEFAULT_CONNECTION_POOL_SIZE: PoolSize = PoolSize::PerShard(NonZeroUsize::new(1).unwrap());
5052
// - enabling write coalescing
5153
const DEFAULT_ENABLE_WRITE_COALESCING: bool = true;
5254
// - write coalescing delay
@@ -234,6 +236,7 @@ pub unsafe extern "C" fn cass_cluster_new() -> CassOwnedExclusivePtr<CassCluster
234236
.schema_agreement_interval(DEFAULT_SCHEMA_AGREEMENT_INTERVAL)
235237
.tcp_nodelay(DEFAULT_SET_TCP_NO_DELAY)
236238
.connection_timeout(DEFAULT_CONNECT_TIMEOUT)
239+
.pool_size(DEFAULT_CONNECTION_POOL_SIZE)
237240
.write_coalescing(DEFAULT_ENABLE_WRITE_COALESCING)
238241
.write_coalescing_delay(DEFAULT_WRITE_COALESCING_DELAY)
239242
.keepalive_interval(DEFAULT_KEEPALIVE_INTERVAL)
@@ -454,6 +457,59 @@ pub unsafe extern "C" fn cass_cluster_set_connect_timeout(
454457
cluster.session_builder.config.connect_timeout = Duration::from_millis(timeout_ms.into());
455458
}
456459

460+
#[unsafe(no_mangle)]
461+
pub unsafe extern "C" fn cass_cluster_set_core_connections_per_host(
462+
cluster_raw: CassBorrowedExclusivePtr<CassCluster, CMut>,
463+
num_connections: c_uint,
464+
) -> CassError {
465+
let Some(cluster) = BoxFFI::as_mut_ref(cluster_raw) else {
466+
tracing::error!(
467+
"Provided null cluster pointer to cass_cluster_set_core_connections_per_host!"
468+
);
469+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
470+
};
471+
472+
match NonZeroUsize::new(num_connections as usize) {
473+
Some(non_zero_conns) => {
474+
cluster.session_builder.config.connection_pool_size = PoolSize::PerHost(non_zero_conns);
475+
CassError::CASS_OK
476+
}
477+
None => {
478+
tracing::error!(
479+
"Provided zero connections to cass_cluster_set_core_connections_per_host!"
480+
);
481+
CassError::CASS_ERROR_LIB_BAD_PARAMS
482+
}
483+
}
484+
}
485+
486+
#[unsafe(no_mangle)]
487+
pub unsafe extern "C" fn cass_cluster_set_core_connections_per_shard(
488+
cluster_raw: CassBorrowedExclusivePtr<CassCluster, CMut>,
489+
num_connections: c_uint,
490+
) -> CassError {
491+
let Some(cluster) = BoxFFI::as_mut_ref(cluster_raw) else {
492+
tracing::error!(
493+
"Provided null cluster pointer to cass_cluster_set_core_connections_per_shard!"
494+
);
495+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
496+
};
497+
498+
match NonZeroUsize::new(num_connections as usize) {
499+
Some(non_zero_conns) => {
500+
cluster.session_builder.config.connection_pool_size =
501+
PoolSize::PerShard(non_zero_conns);
502+
CassError::CASS_OK
503+
}
504+
None => {
505+
tracing::error!(
506+
"Provided zero connections to cass_cluster_set_core_connections_per_shard!"
507+
);
508+
CassError::CASS_ERROR_LIB_BAD_PARAMS
509+
}
510+
}
511+
}
512+
457513
#[unsafe(no_mangle)]
458514
pub unsafe extern "C" fn cass_cluster_set_coalesce_delay(
459515
cluster_raw: CassBorrowedExclusivePtr<CassCluster, CMut>,

src/testing_unimplemented.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,6 @@ CASS_EXPORT CassError cass_cluster_set_cloud_secure_connection_bundle_no_ssl_lib
6565
CASS_EXPORT void cass_cluster_set_constant_reconnect(CassCluster* cluster, cass_uint64_t delay_ms) {
6666
throw std::runtime_error("UNIMPLEMENTED cass_cluster_set_constant_reconnect\n");
6767
}
68-
CASS_EXPORT CassError cass_cluster_set_core_connections_per_host(CassCluster* cluster,
69-
unsigned num_connections) {
70-
throw std::runtime_error("UNIMPLEMENTED cass_cluster_set_core_connections_per_host\n");
71-
}
7268
CASS_EXPORT CassError cass_cluster_set_host_listener_callback(CassCluster* cluster,
7369
CassHostListenerCallback callback,
7470
void* data) {

tests/src/integration/objects/cluster.hpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,19 @@ class Cluster : public Object<CassCluster, cass_cluster_free> {
170170
return *this;
171171
}
172172

173+
/**
174+
* Assign the number of connections made to each shard
175+
*
176+
* NOTE: One extra connection is established (the control connection)
177+
*
178+
* @param connections Number of connection per shard (default: 1)
179+
* @return Cluster object
180+
*/
181+
Cluster& with_core_connections_per_shard(unsigned int connections = 1u) {
182+
EXPECT_EQ(CASS_OK, cass_cluster_set_core_connections_per_shard(get(), connections));
183+
return *this;
184+
}
185+
173186
/**
174187
* Sets credentials for plain text authentication
175188
*

tests/src/integration/tests/test_metrics.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,31 @@ CASSANDRA_INTEGRATION_TEST_F(MetricsTests, ErrorsConnectionTimeouts) {
7676
EXPECT_GE(2u, metrics.errors.connection_timeouts);
7777
}
7878

79+
/**
80+
* This test ensures that the driver is reporting the number of connections
81+
* when connection pool size is configured per shard.
82+
*/
83+
CASSANDRA_INTEGRATION_TEST_F(MetricsTests, StatsShardConnections) {
84+
CHECK_FAILURE;
85+
86+
const unsigned int CONNS_PER_SHARD = 2;
87+
88+
Session session =
89+
default_cluster().with_core_connections_per_shard(CONNS_PER_SHARD).connect();
90+
91+
size_t nr_hosts = explode(contact_points_, ',').size();
92+
size_t nr_shards = Options::is_scylla() ? Options::smp() : 1;
93+
size_t expected_connection_count = nr_hosts * nr_shards * CONNS_PER_SHARD;
94+
95+
CassMetrics metrics = session.metrics();
96+
for (int i = 0; i < 100 && metrics.stats.total_connections < expected_connection_count; ++i) {
97+
metrics = session.metrics();
98+
msleep(100);
99+
}
100+
101+
EXPECT_GE(metrics.stats.total_connections, expected_connection_count);
102+
}
103+
79104
/**
80105
* This test ensures that the driver is reporting the proper timeouts for requests
81106
*

0 commit comments

Comments
 (0)