Skip to content

Commit c3a67f1

Browse files
committed
Fix blocking commands tests in wrappers
1 parent 802230c commit c3a67f1

File tree

7 files changed

+311
-25
lines changed

7 files changed

+311
-25
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#### Fixes
1212
* Python: Fix typing error "‘type’ object is not subscriptable" ([#1203](https://github.com/aws/glide-for-redis/pull/1203))
13+
* Core: Fixed blocking commands to use the specified timeout from the command argument
1314

1415
## 0.3.3 (2024-03-28)
1516

glide-core/src/client/mod.rs

Lines changed: 181 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use futures::FutureExt;
88
use logger_core::log_info;
99
use redis::aio::ConnectionLike;
1010
use redis::cluster_async::ClusterConnection;
11-
use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo};
11+
use redis::cluster_routing::{Routable, RoutingInfo, SingleNodeRoutingInfo};
1212
use redis::RedisResult;
1313
use redis::{Cmd, ErrorKind, Value};
1414
pub use standalone_client::StandaloneClient;
@@ -98,10 +98,67 @@ async fn run_with_timeout<T>(
9898
timeout: Duration,
9999
future: impl futures::Future<Output = RedisResult<T>> + Send,
100100
) -> redis::RedisResult<T> {
101-
tokio::time::timeout(timeout, future)
102-
.await
103-
.map_err(|_| io::Error::from(io::ErrorKind::TimedOut).into())
104-
.and_then(|res| res)
101+
if timeout == Duration::from_secs(0) {
102+
// run without timeout
103+
future.await
104+
} else {
105+
// run with timeout
106+
tokio::time::timeout(timeout, future)
107+
.await
108+
.map_err(|_| io::Error::from(io::ErrorKind::TimedOut).into())
109+
.and_then(|res| res)
110+
}
111+
}
112+
113+
// Extension to the request timeout for blocking commands to ensure we won't return with timeout error before the server responded
114+
const BLOCKING_CMD_TIMEOUT_EXTENSION: f64 = 0.5; // seconds
115+
116+
enum TimeUnit {
117+
Milliseconds = 1000,
118+
Seconds = 1,
119+
}
120+
121+
// Attempts to get the timeout duration from the command argument at `timeout_idx`.
122+
// If the argument can be parsed into a duration, it returns the duration in seconds. Otherwise, it returns None.
123+
fn try_get_timeout_from_cmd_arg(
124+
cmd: &Cmd,
125+
timeout_idx: usize,
126+
time_unit: TimeUnit,
127+
) -> Option<Duration> {
128+
cmd.arg_idx(timeout_idx).and_then(|timeout_bytes| {
129+
std::str::from_utf8(timeout_bytes)
130+
.ok()
131+
.and_then(|timeout_str| {
132+
timeout_str.parse::<f64>().ok().map(|timeout| {
133+
let mut timeout_secs = timeout / ((time_unit as i32) as f64);
134+
if timeout_secs < 0.0 {
135+
// Timeout cannot be negative, return None so the default request timeout will be used and the server will response with error
136+
return None;
137+
} else if timeout_secs > 0.0 {
138+
// Extend the request timeout to ensure we don't timeout before receiving a response from the server.
139+
timeout_secs += BLOCKING_CMD_TIMEOUT_EXTENSION;
140+
};
141+
Some(Duration::from_secs_f64(timeout_secs))
142+
})
143+
})
144+
.unwrap_or(None)
145+
})
146+
}
147+
148+
fn get_request_timeout(cmd: &Cmd, default_timeout: Duration) -> Duration {
149+
let command = cmd.command().unwrap_or_default();
150+
let blocking_timeout = match command.as_slice() {
151+
b"BLPOP" | b"BRPOP" | b"BLMOVE" | b"BZPOPMAX" | b"BZPOPMIN" | b"BRPOPLPUSH" => {
152+
try_get_timeout_from_cmd_arg(cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds)
153+
}
154+
b"BLMPOP" | b"BZMPOP" => try_get_timeout_from_cmd_arg(cmd, 1, TimeUnit::Seconds),
155+
b"XREAD" | b"XREADGROUP" => cmd
156+
.position(b"BLOCK")
157+
.and_then(|idx| try_get_timeout_from_cmd_arg(cmd, idx + 1, TimeUnit::Milliseconds)),
158+
_ => None,
159+
};
160+
161+
blocking_timeout.unwrap_or(default_timeout)
105162
}
106163

107164
impl Client {
@@ -111,7 +168,7 @@ impl Client {
111168
routing: Option<RoutingInfo>,
112169
) -> redis::RedisFuture<'a, Value> {
113170
let expected_type = expected_type_for_cmd(cmd);
114-
run_with_timeout(self.request_timeout, async move {
171+
run_with_timeout(get_request_timeout(cmd, self.request_timeout), async move {
115172
match self.internal_client {
116173
ClientWrapper::Standalone(ref mut client) => client.send_command(cmd).await,
117174

@@ -472,3 +529,121 @@ impl GlideClientForTests for StandaloneClient {
472529
self.send_command(cmd).boxed()
473530
}
474531
}
532+
533+
#[cfg(test)]
534+
mod tests {
535+
use std::time::Duration;
536+
537+
use redis::Cmd;
538+
539+
use crate::client::{get_request_timeout, TimeUnit, BLOCKING_CMD_TIMEOUT_EXTENSION};
540+
541+
use super::try_get_timeout_from_cmd_arg;
542+
543+
#[test]
544+
fn test_get_timeout_from_cmd_returns_correct_duration_int() {
545+
let mut cmd = Cmd::new();
546+
cmd.arg("BLPOP").arg("key1").arg("key2").arg("5");
547+
assert_eq!(
548+
try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
549+
Some(Duration::from_secs_f64(
550+
5.0 + BLOCKING_CMD_TIMEOUT_EXTENSION
551+
))
552+
);
553+
}
554+
555+
#[test]
556+
fn test_get_timeout_from_cmd_returns_correct_duration_float() {
557+
let mut cmd = Cmd::new();
558+
cmd.arg("BLPOP").arg("key1").arg("key2").arg(0.5);
559+
assert_eq!(
560+
try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
561+
Some(Duration::from_secs_f64(
562+
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
563+
))
564+
);
565+
}
566+
567+
#[test]
568+
fn test_get_timeout_from_cmd_returns_correct_duration_milliseconds() {
569+
let mut cmd = Cmd::new();
570+
cmd.arg("XREAD").arg("BLOCK").arg("500").arg("key");
571+
assert_eq!(
572+
try_get_timeout_from_cmd_arg(&cmd, 2, TimeUnit::Milliseconds),
573+
Some(Duration::from_secs_f64(
574+
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
575+
))
576+
);
577+
}
578+
579+
#[test]
580+
fn test_get_timeout_from_cmd_returns_none_when_timeout_isnt_passed() {
581+
let mut cmd = Cmd::new();
582+
cmd.arg("BLPOP").arg("key1").arg("key2").arg("key3");
583+
assert_eq!(
584+
try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
585+
None,
586+
);
587+
}
588+
589+
#[test]
590+
fn test_get_timeout_from_cmd_returns_none_when_timeout_is_negative() {
591+
let mut cmd = Cmd::new();
592+
cmd.arg("BLPOP").arg("key1").arg("key2").arg(-1);
593+
assert_eq!(
594+
try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
595+
None,
596+
);
597+
}
598+
599+
#[test]
600+
fn test_get_timeout_from_cmd_returns_duration_without_extension_when_zero_is_passed() {
601+
let mut cmd = Cmd::new();
602+
cmd.arg("BLPOP").arg("key1").arg("key2").arg(0);
603+
assert_eq!(
604+
try_get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds),
605+
Some(Duration::from_secs(0)),
606+
);
607+
}
608+
609+
#[test]
610+
fn test_get_request_timeout_with_blocking_command_returns_cmd_arg_timeout() {
611+
let mut cmd = Cmd::new();
612+
cmd.arg("BLPOP").arg("key1").arg("key2").arg("500");
613+
assert_eq!(
614+
get_request_timeout(&cmd, Duration::from_millis(100)),
615+
Duration::from_secs_f64(500.0 + BLOCKING_CMD_TIMEOUT_EXTENSION)
616+
);
617+
618+
let mut cmd = Cmd::new();
619+
cmd.arg("XREADGROUP").arg("BLOCK").arg("500").arg("key");
620+
assert_eq!(
621+
get_request_timeout(&cmd, Duration::from_millis(100)),
622+
Duration::from_secs_f64(0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION)
623+
);
624+
625+
let mut cmd = Cmd::new();
626+
cmd.arg("BLMPOP").arg("0.857").arg("key");
627+
assert_eq!(
628+
get_request_timeout(&cmd, Duration::from_millis(100)),
629+
Duration::from_secs_f64(0.857 + BLOCKING_CMD_TIMEOUT_EXTENSION)
630+
);
631+
}
632+
633+
#[test]
634+
fn test_get_request_timeout_non_blocking_command_returns_default_timeout() {
635+
let mut cmd = Cmd::new();
636+
cmd.arg("SET").arg("key").arg("value").arg("PX").arg("500");
637+
assert_eq!(
638+
get_request_timeout(&cmd, Duration::from_millis(100)),
639+
Duration::from_millis(100)
640+
);
641+
642+
let mut cmd = Cmd::new();
643+
cmd.arg("XREADGROUP").arg("key");
644+
assert_eq!(
645+
get_request_timeout(&cmd, Duration::from_millis(100)),
646+
Duration::from_millis(100)
647+
);
648+
}
649+
}

glide-core/tests/test_client.rs

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ mod utilities;
66
#[cfg(test)]
77
pub(crate) mod shared_client_tests {
88
use super::*;
9-
use glide_core::client::Client;
9+
use glide_core::client::{Client, DEFAULT_RESPONSE_TIMEOUT};
1010
use redis::{
1111
cluster_routing::{MultipleNodeRoutingInfo, RoutingInfo},
1212
FromRedisValue, InfoDict, RedisConnectionInfo, Value,
@@ -316,6 +316,45 @@ pub(crate) mod shared_client_tests {
316316
#[rstest]
317317
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
318318
fn test_request_timeout(#[values(false, true)] use_cluster: bool) {
319+
block_on_all(async {
320+
let mut test_basics = setup_test_basics(
321+
use_cluster,
322+
TestConfiguration {
323+
request_timeout: Some(1),
324+
shared_server: false,
325+
..Default::default()
326+
},
327+
)
328+
.await;
329+
let mut cmd = redis::Cmd::new();
330+
// Create a long running command to ensure we get into timeout
331+
cmd.arg("EVAL")
332+
.arg(
333+
r#"
334+
local i = 0
335+
while (true)
336+
do
337+
redis.call('ping')
338+
i = i + 1
339+
end
340+
"#,
341+
)
342+
.arg("0");
343+
let result = test_basics.client.send_command(&cmd, None).await;
344+
assert!(result.is_err());
345+
let err = result.unwrap_err();
346+
assert!(err.is_timeout(), "{err}");
347+
});
348+
}
349+
350+
#[rstest]
351+
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
352+
fn test_blocking_command_doesnt_raise_timeout_error(#[values(false, true)] use_cluster: bool) {
353+
// We test that the request timeout is based on the value specified in the blocking command argument,
354+
// and not on the one set in the client configuration. To achieve this, we execute a command designed to
355+
// be blocked until it reaches the specified command timeout. We set the client's request timeout to
356+
// a shorter duration than the blocking command's timeout. Subsequently, we confirm that we receive
357+
// a response from the server instead of encountering a timeout error.
319358
block_on_all(async {
320359
let mut test_basics = setup_test_basics(
321360
use_cluster,
@@ -328,11 +367,65 @@ pub(crate) mod shared_client_tests {
328367
.await;
329368

330369
let mut cmd = redis::Cmd::new();
331-
cmd.arg("BLPOP").arg("foo").arg(0); // 0 timeout blocks indefinitely
370+
cmd.arg("BLPOP").arg("foo").arg(0.3); // server should return null after 300 millisecond
371+
let result = test_basics.client.send_command(&cmd, None).await;
372+
assert!(result.is_ok());
373+
assert_eq!(result.unwrap(), Value::Nil);
374+
});
375+
}
376+
377+
#[rstest]
378+
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
379+
fn test_blocking_command_with_zero_timeout_blocks_indefinitely(
380+
#[values(false, true)] use_cluster: bool,
381+
) {
382+
// We test that when blocking command is passed with 0 as the timeout duration, the command will be blocked indefinitely
383+
block_on_all(async {
384+
let mut test_basics = setup_test_basics(
385+
use_cluster,
386+
TestConfiguration {
387+
request_timeout: Some(1),
388+
shared_server: true,
389+
..Default::default()
390+
},
391+
)
392+
.await;
393+
let mut cmd = redis::Cmd::new();
394+
cmd.arg("BLPOP").arg("foo").arg(-1);
332395
let result = test_basics.client.send_command(&cmd, None).await;
333396
assert!(result.is_err());
334397
let err = result.unwrap_err();
335-
assert!(err.is_timeout(), "{err}");
398+
println!("{:?}", err);
399+
assert_eq!(err.kind(), redis::ErrorKind::ResponseError);
400+
assert!(err.to_string().contains("timeout is negative"));
401+
});
402+
}
403+
404+
#[rstest]
405+
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
406+
fn test_blocking_command_with_negative_timeout_returns_error(
407+
#[values(false, true)] use_cluster: bool,
408+
) {
409+
// We test that when blocking command is passed with a negative timeout the command will return with an error
410+
block_on_all(async {
411+
let mut test_basics = setup_test_basics(
412+
use_cluster,
413+
TestConfiguration {
414+
request_timeout: Some(1),
415+
shared_server: true,
416+
..Default::default()
417+
},
418+
)
419+
.await;
420+
let future = async move {
421+
let mut cmd = redis::Cmd::new();
422+
cmd.arg("BLPOP").arg("foo").arg(0); // `0` should block indefinitely
423+
test_basics.client.send_command(&cmd, None).await
424+
};
425+
// We execute the command with Tokio's timeout wrapper to prevent the test from hanging indefinitely.
426+
let tokio_timeout_result =
427+
tokio::time::timeout(DEFAULT_RESPONSE_TIMEOUT * 2, future).await;
428+
assert!(tokio_timeout_result.is_err());
336429
});
337430
}
338431

java/integTest/src/test/java/glide/SharedCommandTests.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1813,15 +1813,12 @@ public void brpop(BaseClient client) {
18131813

18141814
// nothing popped out
18151815
assertNull(
1816-
client
1817-
.brpop(new String[] {listKey2}, REDIS_VERSION.isLowerThan("7.0.0") ? 1. : 0.001)
1818-
.get());
1816+
client.brpop(new String[] {listKey2}, REDIS_VERSION.isLowerThan("7.0.0") ? 1. : 0.5).get());
18191817

18201818
// Key exists, but it is not a list
18211819
assertEquals(OK, client.set("foo", "bar").get());
18221820
ExecutionException executionException =
1823-
assertThrows(
1824-
ExecutionException.class, () -> client.brpop(new String[] {"foo"}, .0001).get());
1821+
assertThrows(ExecutionException.class, () -> client.brpop(new String[] {"foo"}, 0.5).get());
18251822
assertTrue(executionException.getCause() instanceof RequestException);
18261823
}
18271824

@@ -1866,15 +1863,12 @@ public void blpop(BaseClient client) {
18661863

18671864
// nothing popped out
18681865
assertNull(
1869-
client
1870-
.blpop(new String[] {listKey2}, REDIS_VERSION.isLowerThan("7.0.0") ? 1. : 0.001)
1871-
.get());
1866+
client.blpop(new String[] {listKey2}, REDIS_VERSION.isLowerThan("7.0.0") ? 1. : 0.5).get());
18721867

18731868
// Key exists, but it is not a list
18741869
assertEquals(OK, client.set("foo", "bar").get());
18751870
ExecutionException executionException =
1876-
assertThrows(
1877-
ExecutionException.class, () -> client.blpop(new String[] {"foo"}, .0001).get());
1871+
assertThrows(ExecutionException.class, () -> client.blpop(new String[] {"foo"}, 0.5).get());
18781872
assertTrue(executionException.getCause() instanceof RequestException);
18791873
}
18801874

node/tests/SharedTests.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2041,14 +2041,14 @@ export function runBaseTests<Context>(config: {
20412041
await client.rpush("brpop-test", ["foo", "bar", "baz"]),
20422042
).toEqual(3);
20432043
// Test basic usage
2044-
expect(await client.brpop(["brpop-test"], 0.1)).toEqual([
2044+
expect(await client.brpop(["brpop-test"], 0.5)).toEqual([
20452045
"brpop-test",
20462046
"baz",
20472047
]);
20482048
// Delete all values from list
20492049
expect(await client.del(["brpop-test"])).toEqual(1);
20502050
// Test null return when key doesn't exist
2051-
expect(await client.brpop(["brpop-test"], 0.1)).toEqual(null);
2051+
expect(await client.brpop(["brpop-test"], 0.5)).toEqual(null);
20522052
}, protocol);
20532053
},
20542054
config.timeout,

0 commit comments

Comments
 (0)