Skip to content

Commit 08336cf

Browse files
readyset-adapter: Add Query Sampler
This commit adds a Query Sampler thread. The sampler thread is responsible for probabilistically sampling queries that were originally sent to Readyset. The intention is to proactively validate the correctness of the Readyset Server. The sampler thread will run queries against the upstream database and compare the normalized results with the results returned by the Readyset Server. Queries are passed from the adapter to the sampler via a bounded channel, which will drop queries if the channel is full.

The sampler thread has two main queues:
- A queue of queries that have been sent from the adapter. Those are randomly sampled.
- A retry queue. In case of a mismatch, we add queries to a second queue to be retried.

Retry logic is implemented by a new struct that keeps track of some metadata about the query, such as the last upstream result hash, the number of retry attempts, and the query itself. The retry queue is a double-ended queue, since we need to be able to add elements at the end of the queue and pop elements from the front. Elements are added in retry order. The retry time is a timestamp calculated from the current time + the retry delay. The retry queue is bounded, and will drop entries if it is full.

We added the following metrics:
- `readyset_query_sampler_queue_len`: The number of queries in the sampler queue.
- `readyset_query_sampler_queries_sampled`: The number of queries sampled.
- `readyset_query_sampler_queries_mismatched`: The number of queries that were sampled but returned different results.
- `readyset_query_sampler_retry_queue_len`: The number of entries in the retry queue.
- `readyset_query_sampler_retry_queue_full`: The number of times the retry queue is full.
- `readyset_query_sampler_max_qps_hit`: The number of times the sampler rate limiter is hit.
- `readyset_query_sampler_reconnects`: The number of times the sampler reconnects to the upstream database.
We also added new configuration options:
- `SAMPLER_SAMPLE_RATE`: The probability of sampling a query.
- `SAMPLER_RETRY_DELAY_MS`: The delay between retries.
- `SAMPLER_QUEUE_CAPACITY`: The maximum number of queries in the sampler queue.
- `SAMPLER_MAX_RETRIES`: The maximum number of retries for a query.
- `SAMPLER_MAX_QUERIES_PER_SECOND`: The maximum number of queries to sample per second to avoid overwhelming the upstream database.

Note: The current implementation is limited to Text Protocol queries. A follow-up CL will add support for Binary Protocol queries.

Fixes REA-5328.

Release-Note-Core: Added a query sampler thread to the Readyset Adapter. This thread will sample queries and compare the results with the results returned by the upstream database and report possible mismatches.
Change-Id: I95f8f8b15be977e30998b59f0f266cb685a9e99b
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/10231
Reviewed-by: Jason Brown <jason.b@readyset.io>
Tested-by: Buildkite CI
1 parent ffe36fe commit 08336cf

File tree

11 files changed

+841
-6
lines changed

11 files changed

+841
-6
lines changed

Cargo.lock

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ resolver = "2"
7777
consulrs = { git = "https://github.com/readysettech/consulrs.git", branch = "allow-disabling-rustls-tls-2" }
7878

7979
# overridden in the patch section above
80-
mysql_async = "0.36.0"
80+
mysql_async = "0.36.1"
8181

8282
# some of these are overridden in the patch section above
8383
postgres = "0.19.9"

database-utils/src/connection.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,15 @@ impl DatabaseConnection {
302302
}
303303
}
304304

305+
/// Returns true if the connection is closed
306+
pub fn is_closed(&self) -> bool {
307+
match self {
308+
Self::PostgreSQL(client, _) => client.is_closed(),
309+
Self::PostgreSQLPool(client) => client.is_closed(),
310+
Self::MySQL(conn) => conn.is_disconnected(),
311+
}
312+
}
313+
305314
/// Returns the SQL dialect associated with the underlying connection type.
306315
pub fn dialect(&self) -> Dialect {
307316
match self {

database-utils/src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ pub enum DatabaseError {
3333

3434
#[error("Error interpolating parameters into query: {0}")]
3535
Interpolation(Box<dyn std::error::Error + Send + Sync>),
36+
37+
#[error("Upstream connection is None")]
38+
UpstreamConnectionNone,
39+
40+
#[error("Upstream query timeout")]
41+
UpstreamQueryTimeout,
3642
}
3743

3844
impl DatabaseError {

readyset-adapter/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ lru = { workspace = true }
3737
crossbeam-skiplist = { workspace = true }
3838
slab = { workspace = true }
3939
xxhash-rust = { workspace = true }
40+
rand = { workspace = true }
41+
streaming-iterator = { workspace = true }
4042

4143
readyset-adapter-types = { path = "../readyset-adapter-types/" }
4244
readyset-alloc = { path = "../readyset-alloc/" }
@@ -46,6 +48,8 @@ readyset-data = { path = "../readyset-data/" }
4648
readyset-server = { path = "../readyset-server" }
4749
dataflow-expression = { path = "../dataflow-expression" }
4850
readyset-tracing = { path = "../readyset-tracing" }
51+
readyset-dataflow = { path = "../readyset-dataflow" }
52+
4953
readyset-client-metrics = { path = "../readyset-client-metrics" }
5054
readyset-telemetry-reporter = { path = "../readyset-telemetry-reporter" }
5155
readyset-sql = { path = "../readyset-sql" }

readyset-adapter/src/backend.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ pub struct BackendBuilder {
294294
metrics_handle: Option<MetricsHandle>,
295295
connections: Option<Arc<SkipSet<SocketAddr>>>,
296296
allow_cache_ddl: bool,
297+
sampler_tx: Option<tokio::sync::mpsc::Sender<(QueryExecutionEvent, String)>>,
297298
}
298299

299300
impl Default for BackendBuilder {
@@ -316,6 +317,7 @@ impl Default for BackendBuilder {
316317
metrics_handle: None,
317318
connections: None,
318319
allow_cache_ddl: true,
320+
sampler_tx: None,
319321
}
320322
}
321323
}
@@ -380,6 +382,7 @@ impl BackendBuilder {
380382
status_reporter,
381383
allow_cache_ddl: self.allow_cache_ddl,
382384
adapter_start_time,
385+
sampler_tx: self.sampler_tx,
383386
_query_handler: PhantomData,
384387
}
385388
}
@@ -474,6 +477,15 @@ impl BackendBuilder {
474477
self.metrics_handle = metrics_handle;
475478
self
476479
}
480+
481+
/// Set the sender used to enqueue original queries for background sampling/verification
482+
pub fn sampler_tx(
483+
mut self,
484+
tx: Option<tokio::sync::mpsc::Sender<(QueryExecutionEvent, String)>>,
485+
) -> Self {
486+
self.sampler_tx = tx;
487+
self
488+
}
477489
}
478490

479491
/// A [`PreparedStatement`] stores the data needed for an immediate execution of a prepared
@@ -588,6 +600,9 @@ where
588600
/// The time at which the adapter started.
589601
adapter_start_time: SystemTime,
590602

603+
/// Optional sender to enqueue original queries for background sampling/verification
604+
sampler_tx: Option<tokio::sync::mpsc::Sender<(QueryExecutionEvent, String)>>,
605+
591606
_query_handler: PhantomData<Handler>,
592607
}
593608

@@ -3124,7 +3139,7 @@ where
31243139
self.noria_should_try_select(&mut view_request);
31253140

31263141
if noria_should_try {
3127-
Self::query_adhoc_select(
3142+
let result = Self::query_adhoc_select(
31283143
&mut self.noria,
31293144
self.upstream.as_mut(),
31303145
&self.settings,
@@ -3135,7 +3150,16 @@ where
31353150
&mut event,
31363151
processed_query_params.unwrap(),
31373152
)
3138-
.await
3153+
.await;
3154+
3155+
// Enqueue the original query for background sampling if enabled. We clone here
3156+
// for simplicity and to avoid borrow issues in the hot path; bounded channel
3157+
// will drop on overflow.
3158+
if let Some(tx) = &self.sampler_tx {
3159+
let _ = tx.try_send((event.clone(), query.to_string()));
3160+
}
3161+
3162+
result
31393163
} else {
31403164
Self::query_fallback(self.upstream.as_mut(), query, &mut event).await
31413165
}

readyset-adapter/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub mod migration_handler;
66
pub mod proxied_queries_reporter;
77
mod query_handler;
88
pub mod query_status_cache;
9+
pub mod sampler;
910
mod status_reporter;
1011
pub mod upstream_database;
1112
mod utils;

0 commit comments

Comments
 (0)