Skip to content

Commit 19e3c50

Browse files
Sampler: Don't consider query timeout as a mismatch
This commit fixed a few mirror issues: - If a timeout is detected, the query should not be considered as a mismatch. - Add a config option to specify the timeout for sampler queries. - Register query sampler with proper connection attributes. Closes: REA-6228 Closes: REA-6244 Fixes: #1576 Fixes: #1573 Release-Note-Core: Allow configuring the timeout for sampler queries. Change-Id: I8d47d40f56ae0802e0c906da589135771172b7ab Reviewed-on: https://gerrit.readyset.name/c/readyset/+/11132 Reviewed-by: Jason Brown <jason.b@readyset.io> Tested-by: Buildkite CI
1 parent 078df35 commit 19e3c50

File tree

2 files changed

+29
-6
lines changed

2 files changed

+29
-6
lines changed

readyset-adapter/src/sampler.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ async fn connect_rs(config: &UpstreamConfig, rs_addr: SocketAddr) -> Option<Data
180180

181181
async fn connect_upstream(config: &UpstreamConfig) -> Option<DatabaseConnection> {
182182
debug!("Connecting to upstream");
183-
let url: DatabaseURL = if let Some(url) = &config.upstream_db_url {
183+
let mut url: DatabaseURL = if let Some(url) = &config.upstream_db_url {
184184
match url.parse() {
185185
Ok(url) => url,
186186
Err(error) => {
@@ -191,6 +191,7 @@ async fn connect_upstream(config: &UpstreamConfig) -> Option<DatabaseConnection>
191191
} else {
192192
return None;
193193
};
194+
url.set_program_or_application_name(READYSET_QUERY_SAMPLER.to_string());
194195
let verification = match ServerCertVerification::from(config).await {
195196
Ok(verification) => verification,
196197
Err(error) => {
@@ -299,6 +300,7 @@ impl Sampler {
299300
timeout: Duration,
300301
conn: &mut Option<DatabaseConnection>,
301302
q: &str,
303+
name: String,
302304
) -> Result<QueryResults, DatabaseError> {
303305
let fut = async {
304306
match conn.as_mut() {
@@ -310,7 +312,7 @@ impl Sampler {
310312
Ok(res) => res,
311313
Err(_) => {
312314
rate_limit(true, SAMPLER_LOG_SAMPLER, || {
313-
warn!("Sampler upstream read timeout");
315+
debug!(?name, "Sampler upstream read timeout");
314316
});
315317
Err(DatabaseError::UpstreamQueryTimeout)
316318
}
@@ -348,9 +350,13 @@ impl Sampler {
348350
);
349351

350352
// Readyset execution (materialize rows for optional tracing on mismatch)
351-
let rs_res =
352-
Self::query_with_timeout(self.config.upstream_timeout, &mut self.rs_conn, &entry.q)
353-
.await;
353+
let rs_res = Self::query_with_timeout(
354+
self.config.upstream_timeout,
355+
&mut self.rs_conn,
356+
&entry.q,
357+
get_cache_name(&entry.event),
358+
)
359+
.await;
354360
let (rs_hash_opt, rs_rows_opt) = match rs_res {
355361
Ok(rs_rows) => {
356362
let mut rows_vec: Vec<Vec<DfValue>> = match <Vec<Vec<DfValue>>>::try_from(rs_rows) {
@@ -363,6 +369,12 @@ impl Sampler {
363369
let hash = Some(normalize_and_hash(&mut rows_vec, has_order_by));
364370
(hash, Some(rows_vec))
365371
}
372+
Err(DatabaseError::UpstreamQueryTimeout) => {
373+
rate_limit(true, SAMPLER_LOG_SAMPLER, || {
374+
debug!("Sampler readyset query timed out; discarding sample");
375+
});
376+
return;
377+
}
366378
Err(e) => {
367379
debug!(error = %e, "Sampler readyset error");
368380
self.reconnect_rs_if_closed().await;
@@ -384,6 +396,7 @@ impl Sampler {
384396
self.config.upstream_timeout,
385397
&mut self.upstream_conn,
386398
&entry.q,
399+
get_cache_name(&entry.event),
387400
)
388401
.await;
389402
let (up_hash_opt, up_rows_opt) = match up_res {
@@ -398,6 +411,12 @@ impl Sampler {
398411
let hash = Some(normalize_and_hash(&mut rows_vec, has_order_by));
399412
(hash, Some(rows_vec))
400413
}
414+
Err(DatabaseError::UpstreamQueryTimeout) => {
415+
rate_limit(true, SAMPLER_LOG_SAMPLER, || {
416+
debug!("Sampler upstream query timed out; discarding sample");
417+
});
418+
return;
419+
}
401420
Err(e) => {
402421
debug!(error = %e, "Sampler upstream error");
403422
self.reconnect_upstream_if_closed().await;

readyset/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,10 @@ pub struct Options {
458458
#[arg(long, env = "SAMPLER_MAX_QUERIES_PER_SECOND", default_value = "100")]
459459
sampler_max_queries_per_second: u64,
460460

461+
/// Specifies the timeout for sampler queries.
462+
#[arg(long, env = "SAMPLER_QUERY_TIMEOUT_MS", default_value = "1000")]
463+
sampler_query_timeout_ms: u64,
464+
461465
/// If set, Readyset will only verify if the current configuration is valid and then exit.
462466
///
463467
/// On success, the exit code will be zero with a message printed to stdout. On failure, the
@@ -1301,7 +1305,7 @@ where
13011305
readyset_adapter::sampler::SamplerConfig {
13021306
sample_rate: options.sampler_sample_rate,
13031307
queue_capacity: options.sampler_queue_capacity,
1304-
upstream_timeout: Duration::from_secs(1),
1308+
upstream_timeout: Duration::from_millis(options.sampler_query_timeout_ms),
13051309
max_retry_attempts: options.sampler_max_retries,
13061310
retry_delay: Duration::from_millis(options.sampler_retry_delay_ms),
13071311
max_qps: options.sampler_max_queries_per_second,

0 commit comments

Comments
 (0)