Commit dfbd9f6

cluster: attempt recreate control conn immediately
Until now, if all nodes changed their IPs at once, we would discover them only after the next metadata fetch was issued, which might happen only after 60 seconds (if the previous fetch succeeded). This commit introduces immediate signalling that the control connection has broken, so that ClusterWorker instantly begins its every-1-second reconnect-attempt phase. In manual tests this proved very robust: immediately after the control connection is lost, the reconnect-attempt phase begins. As soon as any node (either one known initially or one learned from recently fetched metadata) becomes reachable, a control connection is opened and metadata is fetched successfully, so the whole cluster becomes discoverable again.
1 parent eca000b commit dfbd9f6
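
The sketch below is a minimal, self-contained illustration of the signalling pattern this commit adds; it is not driver code. A pool-side task sends () on a tokio::sync::broadcast channel when its last connection dies, and a worker loop running tokio::select! reacts at once instead of waiting out the 60-second refresh period. The names worker_loop, repair_tx and repair_rx are invented for the sketch; in the driver the corresponding roles are played by PoolRefiller, MetadataReader and ClusterWorker.

use std::time::Duration;
use tokio::sync::broadcast;

async fn worker_loop(mut repair_rx: broadcast::Receiver<()>) {
    // Stand-in for the periodic metadata refresh (60 s by default in the driver).
    let refresh_period = Duration::from_secs(60);
    loop {
        tokio::select! {
            _ = tokio::time::sleep(refresh_period) => {
                println!("periodic metadata refresh");
            }
            recv_res = repair_rx.recv() => {
                match recv_res {
                    // The control connection broke: refresh immediately instead of
                    // waiting for the rest of the 60-second period.
                    Ok(()) => println!("control connection broken, refreshing now"),
                    // Missing some notifications is harmless: one wake-up is enough.
                    Err(broadcast::error::RecvError::Lagged(_)) => continue,
                    // All senders are gone, so nothing will ever signal again.
                    Err(broadcast::error::RecvError::Closed) => return,
                }
            }
        }
        // A real worker would fetch metadata here and, on failure, retry every
        // second until a control connection is reestablished.
    }
}

#[tokio::main]
async fn main() {
    let (repair_tx, repair_rx) = broadcast::channel(32);
    let worker = tokio::spawn(worker_loop(repair_rx));

    // Simulate the pool refiller noticing that its last connection closed.
    tokio::time::sleep(Duration::from_millis(100)).await;
    let _ = repair_tx.send(());

    // Dropping the only sender closes the channel, letting the worker exit.
    drop(repair_tx);
    let _ = worker.await;
}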

4 files changed: 63 additions & 4 deletions


scylla/src/transport/cluster.rs

Lines changed: 26 additions & 0 deletions
@@ -110,6 +110,9 @@ struct ClusterWorker {
     // Channel used to receive server events
     server_events_channel: tokio::sync::mpsc::Receiver<Event>,
 
+    // Channel used to receive signals that control connection is broken
+    control_connection_repair_channel: tokio::sync::broadcast::Receiver<()>,
+
     // Keyspace send in "USE <keyspace name>" when opening each connection
     used_keyspace: Option<VerifiedKeyspaceName>,
 
@@ -141,9 +144,12 @@ impl Cluster {
         let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32);
         let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32);
         let (server_events_sender, server_events_receiver) = tokio::sync::mpsc::channel(32);
+        let (control_connection_repair_sender, control_connection_repair_receiver) =
+            tokio::sync::broadcast::channel(32);
 
         let mut metadata_reader = MetadataReader::new(
             known_nodes,
+            control_connection_repair_sender,
             initial_peers,
             pool_config.connection_config.clone(),
             pool_config.keepalive_interval,
@@ -174,6 +180,7 @@ impl Cluster {
 
             refresh_channel: refresh_receiver,
             server_events_channel: server_events_receiver,
+            control_connection_repair_channel: control_connection_repair_receiver,
 
             use_keyspace_channel: use_keyspace_receiver,
             used_keyspace: None,
@@ -542,6 +549,25 @@ impl ClusterWorker {
 
                     continue; // Don't go to refreshing, wait for the next event
                 }
+                recv_res = self.control_connection_repair_channel.recv() => {
+                    match recv_res {
+                        Ok(()) => {
+                            // The control connection was broken. Acknowledge that and start attempting to reconnect.
+                            // The first reconnect attempt will be immediate (by attempting metadata refresh below),
+                            // and if it does not succeed, then `control_connection_works` will be set to `false`,
+                            // so subsequent attempts will be issued every second.
+                        }
+                        Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
+                            // This is very unlikely; we would have to have a lot of concurrent
+                            // control connections opened and broken at the same time.
+                            // The best we can do is ignoring this.
+                        }
+                        Err(tokio::sync::broadcast::error::RecvError::Closed) => {
+                            // If control_connection_repair_channel was closed then MetadataReader was dropped,
+                            // we can stop working.
+                        }
+                    }
+                }
             }
 
             // Perform the refresh

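The new select! arm above has to distinguish two broadcast-specific errors. The toy program below (again not driver code, only tokio::sync::broadcast) shows why Lagged can safely be ignored (a lagging receiver has merely missed duplicate wake-ups) and what Closed means (no sender is left, so nothing will ever signal again).

use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    // Small capacity so the receiver can fall behind.
    let (tx, mut rx) = broadcast::channel::<()>(2);
    for _ in 0..5 {
        let _ = tx.send(());
    }

    // The receiver lagged: recv() reports how many signals were dropped.
    // For a "control connection broke" notification this is harmless,
    // because a single wake-up already triggers a metadata refresh.
    match rx.recv().await {
        Err(RecvError::Lagged(n)) => println!("missed {n} signals, one refresh is still enough"),
        other => println!("unexpected: {other:?}"),
    }

    // Dropping the last sender closes the channel. Once the retained signals
    // are drained, Closed tells the receiver that no one can signal anymore.
    drop(tx);
    while rx.recv().await.is_ok() {}
    assert!(matches!(rx.recv().await, Err(RecvError::Closed)));
}
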
scylla/src/transport/connection_pool.rs

Lines changed: 19 additions & 1 deletion
@@ -25,7 +25,8 @@ use std::num::NonZeroUsize;
 use std::pin::Pin;
 use std::sync::{Arc, RwLock, Weak};
 use std::time::Duration;
-use tokio::sync::{mpsc, Notify};
+
+use tokio::sync::{broadcast, mpsc, Notify};
 use tracing::instrument::WithSubscriber;
 use tracing::{debug, trace, warn};
 
@@ -169,6 +170,7 @@ impl NodeConnectionPool {
         endpoint: UntranslatedEndpoint,
         #[allow(unused_mut)] mut pool_config: PoolConfig, // `mut` needed only with "cloud" feature
         current_keyspace: Option<VerifiedKeyspaceName>,
+        refresh_requester: Option<broadcast::Sender<()>>,
     ) -> Self {
         let (use_keyspace_request_sender, use_keyspace_request_receiver) = mpsc::channel(1);
         let pool_updated_notify = Arc::new(Notify::new());
@@ -205,6 +207,7 @@ impl NodeConnectionPool {
             pool_config,
             current_keyspace,
             pool_updated_notify.clone(),
+            refresh_requester,
         );
 
         let conns = refiller.get_shared_connections();
@@ -472,6 +475,12 @@ struct PoolRefiller {
 
     // Signaled when the connection pool is updated
     pool_updated_notify: Arc<Notify>,
+
+    // Enables requesting metadata refresh from ClusterWorker.
+    // This is useful when a control connection breaks: an empty pool of the control connection
+    // immediately triggers reestablishing the control connection. Without this, reestablishment
+    // would occur only after metadata refresh timeout (60s by default) elapses.
+    control_connection_repair_requester: Option<broadcast::Sender<()>>,
 }
 
 #[derive(Debug)]
@@ -486,6 +495,7 @@ impl PoolRefiller {
         pool_config: PoolConfig,
         current_keyspace: Option<VerifiedKeyspaceName>,
         pool_updated_notify: Arc<Notify>,
+        control_connection_repair_requester: Option<broadcast::Sender<()>>,
     ) -> Self {
         // At the beginning, we assume the node does not have any shards
         // and assume that the node is a Cassandra node
@@ -513,6 +523,7 @@ impl PoolRefiller {
             current_keyspace,
 
             pool_updated_notify,
+            control_connection_repair_requester,
         }
     }
 
@@ -1037,6 +1048,13 @@ impl PoolRefiller {
                     self.conns[shard_id].len(),
                     self.active_connection_count(),
                 );
+                if !self.has_connections() {
+                    // The last connection got closed, so if this is a control connection,
+                    // then request a metadata refresh to trigger the control connection reestablishment.
+                    if let Some(requester) = self.control_connection_repair_requester.as_ref() {
+                        let _ = requester.send(());
+                    }
+                }
                 self.update_shared_conns(Some(last_error));
                 return;
             }

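On the pool side, the pattern amounts to holding an optional sender and firing it only when the connection count drops to zero. The following sketch is illustrative only: ToyPool and note_connection_closed are invented names standing in for PoolRefiller and its connection-failure handling, under the assumption that only the control connection's pool is given a sender.

use tokio::sync::broadcast;

struct ToyPool {
    connections: usize,
    // `None` for regular per-node pools; `Some(..)` only for the pool backing
    // the control connection.
    repair_requester: Option<broadcast::Sender<()>>,
}

impl ToyPool {
    fn note_connection_closed(&mut self) {
        self.connections = self.connections.saturating_sub(1);
        if self.connections == 0 {
            if let Some(requester) = self.repair_requester.as_ref() {
                // A send error only means nobody is listening right now,
                // which is not this pool's problem, so it is ignored.
                let _ = requester.send(());
            }
        }
    }
}

fn main() {
    let (tx, mut rx) = broadcast::channel(32);
    let mut pool = ToyPool {
        connections: 1,
        repair_requester: Some(tx),
    };
    pool.note_connection_closed();

    // The signal is buffered in the channel, so a non-blocking check suffices.
    assert!(rx.try_recv().is_ok());
}
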
scylla/src/transport/node.rs

Lines changed: 6 additions & 1 deletion
@@ -104,7 +104,12 @@ impl Node {
         let datacenter = peer.datacenter.clone();
         let rack = peer.rack.clone();
         let pool = enabled.then(|| {
-            NodeConnectionPool::new(UntranslatedEndpoint::Peer(peer), pool_config, keyspace_name)
+            NodeConnectionPool::new(
+                UntranslatedEndpoint::Peer(peer),
+                pool_config,
+                keyspace_name,
+                None,
+            )
         });
 
         Node {

scylla/src/transport/topology.rs

Lines changed: 12 additions & 2 deletions
@@ -27,7 +27,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use strum_macros::EnumString;
-use tokio::sync::mpsc;
+use tokio::sync::{broadcast, mpsc};
 use tracing::{debug, error, trace, warn};
 use uuid::Uuid;
 
@@ -50,6 +50,10 @@ pub(crate) struct MetadataReader {
     // When no known peer is reachable, initial known nodes are resolved once again as a fallback
     // and establishing control connection to them is attempted.
     initial_known_nodes: Vec<KnownNode>,
+
+    // When a control connection breaks, the PoolRefiller of its pool uses the requester
+    // to signal ClusterWorker that an immediate metadata refresh is advisable.
+    control_connection_repair_requester: broadcast::Sender<()>,
 }
 
 /// Describes all metadata retrieved from the cluster
@@ -386,6 +390,7 @@ impl MetadataReader {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         initial_known_nodes: Vec<KnownNode>,
+        control_connection_repair_requester: broadcast::Sender<()>,
         initially_known_peers: Vec<ResolvedContactPoint>,
         mut connection_config: ConnectionConfig,
         keepalive_interval: Option<Duration>,
@@ -410,6 +415,7 @@ impl MetadataReader {
            control_connection_endpoint.clone(),
            connection_config.clone(),
            keepalive_interval,
+           control_connection_repair_requester.clone(),
        );
 
        MetadataReader {
@@ -425,6 +431,7 @@ impl MetadataReader {
            fetch_schema,
            host_filter: host_filter.clone(),
            initial_known_nodes,
+           control_connection_repair_requester,
        }
    }
 
@@ -530,6 +537,7 @@ impl MetadataReader {
             self.control_connection_endpoint.clone(),
             self.connection_config.clone(),
             self.keepalive_interval,
+            self.control_connection_repair_requester.clone(),
         );
 
         debug!(
@@ -629,6 +637,7 @@ impl MetadataReader {
                 self.control_connection_endpoint.clone(),
                 self.connection_config.clone(),
                 self.keepalive_interval,
+                self.control_connection_repair_requester.clone(),
             );
         }
     }
@@ -639,6 +648,7 @@ impl MetadataReader {
         endpoint: UntranslatedEndpoint,
         connection_config: ConnectionConfig,
         keepalive_interval: Option<Duration>,
+        refresh_requester: broadcast::Sender<()>,
     ) -> NodeConnectionPool {
         let pool_config = PoolConfig {
             connection_config,
@@ -652,7 +662,7 @@ impl MetadataReader {
             can_use_shard_aware_port: false,
         };
 
-        NodeConnectionPool::new(endpoint, pool_config, None)
+        NodeConnectionPool::new(endpoint, pool_config, None, Some(refresh_requester))
     }
 }
