Skip to content

Commit b86c373

Browse files
committed
adapter: Set upstream version in handshake messages
Upon new client connections, Readyset is setting the default database version string into the connection handshake packets. When we introduced the `LazyUpstream`, we made some functions not go upstream and just return a default value; `version()` is one of those functions. The reasoning behind the lazy connects is that when control plane/operational tools connect to Readyset for health check/diagnostic information, we were inadvertently opening a lot of connections to the upstream postgres itself, and thus naively consuming the limited connections that one can make to PG. Also, if a given client connection will never need to query the upstream anyways, why create that connection? This patch adds a new field to the `UpstreamSystemProperties` struct for `db_version`, and we pass that to the adapter `Backend`, via it's builder. `Backend::version()` will return that value if it is `Some`; else, it'll do it's normal behavior (which may be to return the default). Fixes: #1562 Release-Note-Core: Set the correct version of the upstream database new connection handshake for both mysql and postgres. Change-Id: Ib4cd2e85a4271c645e90eb9fd44befdfeff202df Reviewed-on: https://gerrit.readyset.name/c/readyset/+/10703 Tested-by: Buildkite CI Reviewed-by: Marcelo Altmann <marcelo@readyset.io>
1 parent 71d7bfc commit b86c373

File tree

4 files changed

+32
-10
lines changed

4 files changed

+32
-10
lines changed

readyset-adapter/src/backend.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ pub struct BackendBuilder {
314314
allow_cache_ddl: bool,
315315
sampler_tx:
316316
Option<tokio::sync::mpsc::Sender<(QueryExecutionEvent, String, Vec<SqlIdentifier>)>>,
317+
db_version: Option<String>,
317318
}
318319

319320
impl Default for BackendBuilder {
@@ -337,6 +338,7 @@ impl Default for BackendBuilder {
337338
connections: None,
338339
allow_cache_ddl: true,
339340
sampler_tx: None,
341+
db_version: None,
340342
}
341343
}
342344
}
@@ -422,6 +424,7 @@ impl BackendBuilder {
422424
is_internal_connection: false,
423425
shallow,
424426
shallow_refresh_sender,
427+
db_version: self.db_version,
425428
_query_handler: PhantomData,
426429
}
427430
}
@@ -525,6 +528,11 @@ impl BackendBuilder {
525528
self.sampler_tx = tx;
526529
self
527530
}
531+
532+
pub fn db_version(mut self, db_version: String) -> Self {
533+
self.db_version = Some(db_version);
534+
self
535+
}
528536
}
529537

530538
/// A [`PreparedStatement`] stores the data needed for an immediate execution of a prepared
@@ -652,6 +660,9 @@ where
652660
/// Sender for shallow refresh requests to worker pool
653661
shallow_refresh_sender: Option<async_channel::Sender<ShallowRefreshRequest>>,
654662

663+
/// Memoized upstream database version,
664+
db_version: Option<String>,
665+
655666
_query_handler: PhantomData<Handler>,
656667
}
657668

@@ -905,6 +916,10 @@ where
905916
Handler: 'static + QueryHandler,
906917
{
907918
pub fn version(&self) -> String {
919+
if let Some(version) = &self.db_version {
920+
return version.clone();
921+
}
922+
908923
self.upstream
909924
.as_ref()
910925
.map(|upstream| upstream.version())

readyset-client-test-helpers/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ impl TestBuilder {
383383
.lower_case_table_names()
384384
.await
385385
.unwrap(),
386+
db_version: cdc_upstream.version(),
386387
}
387388
} else {
388389
UpstreamSystemProperties {

readyset-data/src/upstream_system_props.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub struct UpstreamSystemProperties {
1010
pub timezone_name: SqlIdentifier,
1111
pub lower_case_database_names: bool,
1212
pub lower_case_table_names: bool,
13+
pub db_version: String,
1314
}
1415

1516
impl fmt::Display for UpstreamSystemProperties {

readyset/src/lib.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -713,12 +713,14 @@ where
713713
let timezone_name = upstream.timezone_name().await?;
714714
let lower_case_database_names = upstream.lower_case_database_names().await?;
715715
let lower_case_table_names = upstream.lower_case_table_names().await?;
716+
let db_version = upstream.version();
716717

717718
Ok(UpstreamSystemProperties {
718719
search_path,
719720
timezone_name,
720721
lower_case_database_names,
721722
lower_case_table_names,
723+
db_version,
722724
})
723725
};
724726

@@ -1408,16 +1410,19 @@ where
14081410
.instrument(debug_span!("Building noria connector"))
14091411
.await;
14101412

1411-
let backend = backend_builder.clone().build(
1412-
noria,
1413-
upstream,
1414-
Some(upstream_config.clone()),
1415-
query_status_cache,
1416-
adapter_authority.clone(),
1417-
status_reporter_clone,
1418-
adapter_start_time,
1419-
shallow,
1420-
);
1413+
let backend = backend_builder
1414+
.clone()
1415+
.db_version(sys_props.db_version.clone())
1416+
.build(
1417+
noria,
1418+
upstream,
1419+
Some(upstream_config.clone()),
1420+
query_status_cache,
1421+
adapter_authority.clone(),
1422+
status_reporter_clone,
1423+
adapter_start_time,
1424+
shallow,
1425+
);
14211426
connection_handler.process_connection(s, backend).await;
14221427
}
14231428
Err(error) => {

0 commit comments

Comments
 (0)