-
Notifications
You must be signed in to change notification settings - Fork 148
Fixed blocking command to be timed out based on the specified command argument #1283
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,9 +8,9 @@ use futures::FutureExt; | |
| use logger_core::log_info; | ||
| use redis::aio::ConnectionLike; | ||
| use redis::cluster_async::ClusterConnection; | ||
| use redis::cluster_routing::{RoutingInfo, SingleNodeRoutingInfo}; | ||
| use redis::RedisResult; | ||
| use redis::cluster_routing::{Routable, RoutingInfo, SingleNodeRoutingInfo}; | ||
| use redis::{Cmd, ErrorKind, Value}; | ||
| use redis::{RedisError, RedisResult}; | ||
| pub use standalone_client::StandaloneClient; | ||
| use std::io; | ||
| use std::ops::Deref; | ||
|
|
@@ -95,13 +95,122 @@ pub struct Client { | |
| } | ||
|
|
||
| async fn run_with_timeout<T>( | ||
| timeout: Duration, | ||
| timeout: Option<Duration>, | ||
| future: impl futures::Future<Output = RedisResult<T>> + Send, | ||
| ) -> redis::RedisResult<T> { | ||
| tokio::time::timeout(timeout, future) | ||
| .await | ||
| .map_err(|_| io::Error::from(io::ErrorKind::TimedOut).into()) | ||
| .and_then(|res| res) | ||
| match timeout { | ||
| Some(duration) => tokio::time::timeout(duration, future) | ||
| .await | ||
| .map_err(|_| io::Error::from(io::ErrorKind::TimedOut).into()) | ||
| .and_then(|res| res), | ||
| None => future.await, | ||
| } | ||
| } | ||
|
|
||
| /// Extension to the request timeout for blocking commands to ensure we won't return with timeout error before the server responded | ||
| const BLOCKING_CMD_TIMEOUT_EXTENSION: f64 = 0.5; // seconds | ||
|
|
||
| enum TimeUnit { | ||
| Milliseconds = 1000, | ||
| Seconds = 1, | ||
| } | ||
|
|
||
| /// Enumeration representing different request timeout options. | ||
| #[derive(Default, PartialEq, Debug)] | ||
| enum RequestTimeoutOption { | ||
| // Indicates no timeout should be set for the request. | ||
| NoTimeout, | ||
| // Indicates the request timeout should be based on the client's configured timeout. | ||
| #[default] | ||
| ClientConfig, | ||
| // Indicates the request timeout should be based on the timeout specified in the blocking command. | ||
| BlockingCommand(Duration), | ||
| } | ||
|
|
||
| /// Helper function for parsing a timeout argument to f64. | ||
| /// Attempts to parse the argument found at `timeout_idx` from bytes into an f64. | ||
| fn parse_timeout_to_f64(cmd: &Cmd, timeout_idx: usize) -> RedisResult<f64> { | ||
| let create_err = |err_msg| { | ||
| RedisError::from(( | ||
| ErrorKind::ResponseError, | ||
| err_msg, | ||
| format!( | ||
| "Expected to find timeout value at index {:?} for command {:?}. Recieved command = {:?}", | ||
| timeout_idx, | ||
| std::str::from_utf8(&cmd.command().unwrap_or_default()), | ||
| std::str::from_utf8(&cmd.get_packed_command()) | ||
| ), | ||
| )) | ||
| }; | ||
| let timeout_bytes = cmd | ||
| .arg_idx(timeout_idx) | ||
| .ok_or(create_err("Couldn't find timeout index"))?; | ||
| let timeout_str = std::str::from_utf8(timeout_bytes) | ||
| .map_err(|_| create_err("Failed to parse the timeout argument to string"))?; | ||
| timeout_str | ||
| .parse::<f64>() | ||
| .map_err(|_| create_err("Failed to parse the timeout argument to f64")) | ||
| } | ||
|
|
||
| /// Attempts to get the timeout duration from the command argument at `timeout_idx`. | ||
| /// If the argument can be parsed into a duration, it returns the duration in seconds with BlockingCmdTimeout. | ||
| /// If the timeout argument value is zero, NoTimeout will be returned. Otherwise, ClientConfigTimeout is returned. | ||
| fn get_timeout_from_cmd_arg( | ||
| cmd: &Cmd, | ||
| timeout_idx: usize, | ||
| time_unit: TimeUnit, | ||
| ) -> RedisResult<RequestTimeoutOption> { | ||
| let timeout_secs = parse_timeout_to_f64(cmd, timeout_idx)? / ((time_unit as i32) as f64); | ||
| if timeout_secs < 0.0 { | ||
| // Timeout cannot be negative, return the client's configured request timeout | ||
| Err(RedisError::from(( | ||
| ErrorKind::ResponseError, | ||
| "Timeout cannot be negative", | ||
| format!("Recieved timeout={:?}", timeout_secs), | ||
| ))) | ||
| } else if timeout_secs == 0.0 { | ||
| // `0` means we should set no timeout | ||
| Ok(RequestTimeoutOption::NoTimeout) | ||
| } else { | ||
| // We limit the maximum timeout due to restrictions imposed by Redis and the Duration crate | ||
| if timeout_secs > u32::MAX as f64 { | ||
| Err(RedisError::from(( | ||
| ErrorKind::ResponseError, | ||
| "Timeout is out of range, max timeout is 2^32 - 1 (u32::MAX)", | ||
| format!("Recieved timeout={:?}", timeout_secs), | ||
| ))) | ||
| } else { | ||
| // Extend the request timeout to ensure we don't timeout before receiving a response from the server. | ||
| Ok(RequestTimeoutOption::BlockingCommand( | ||
| Duration::from_secs_f64( | ||
| (timeout_secs + BLOCKING_CMD_TIMEOUT_EXTENSION).min(u32::MAX as f64), | ||
| ), | ||
| )) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn get_request_timeout(cmd: &Cmd, default_timeout: Duration) -> RedisResult<Option<Duration>> { | ||
| let command = cmd.command().unwrap_or_default(); | ||
| let timeout = match command.as_slice() { | ||
| b"BLPOP" | b"BRPOP" | b"BLMOVE" | b"BZPOPMAX" | b"BZPOPMIN" | b"BRPOPLPUSH" => { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has potential to miss the future commands having timeouts - we should add reference to this function to the "integrating new-commands" SOP.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ikolomi Can you create an issue for that?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| get_timeout_from_cmd_arg(cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds) | ||
| } | ||
| b"BLMPOP" | b"BZMPOP" => get_timeout_from_cmd_arg(cmd, 1, TimeUnit::Seconds), | ||
| b"XREAD" | b"XREADGROUP" => cmd | ||
| .position(b"BLOCK") | ||
| .map(|idx| get_timeout_from_cmd_arg(cmd, idx + 1, TimeUnit::Milliseconds)) | ||
| .unwrap_or(Ok(RequestTimeoutOption::ClientConfig)), | ||
| _ => Ok(RequestTimeoutOption::ClientConfig), | ||
| }?; | ||
|
|
||
| match timeout { | ||
| RequestTimeoutOption::NoTimeout => Ok(None), | ||
| RequestTimeoutOption::ClientConfig => Ok(Some(default_timeout)), | ||
| RequestTimeoutOption::BlockingCommand(blocking_cmd_duration) => { | ||
| Ok(Some(blocking_cmd_duration)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl Client { | ||
|
|
@@ -111,7 +220,13 @@ impl Client { | |
| routing: Option<RoutingInfo>, | ||
| ) -> redis::RedisFuture<'a, Value> { | ||
| let expected_type = expected_type_for_cmd(cmd); | ||
| run_with_timeout(self.request_timeout, async move { | ||
| let request_timeout = match get_request_timeout(cmd, self.request_timeout) { | ||
| Ok(request_timeout) => request_timeout, | ||
| Err(err) => { | ||
| return async { Err(err) }.boxed(); | ||
| } | ||
| }; | ||
| run_with_timeout(request_timeout, async move { | ||
| match self.internal_client { | ||
| ClientWrapper::Standalone(ref mut client) => client.send_command(cmd).await, | ||
|
|
||
|
|
@@ -189,7 +304,7 @@ impl Client { | |
| ) -> redis::RedisFuture<'a, Value> { | ||
| let command_count = pipeline.cmd_iter().count(); | ||
| let offset = command_count + 1; | ||
| run_with_timeout(self.request_timeout, async move { | ||
| run_with_timeout(Some(self.request_timeout), async move { | ||
| let values = match self.internal_client { | ||
| ClientWrapper::Standalone(ref mut client) => { | ||
| client.send_pipeline(pipeline, offset, 1).await | ||
|
|
@@ -472,3 +587,153 @@ impl GlideClientForTests for StandaloneClient { | |
| self.send_command(cmd).boxed() | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use std::time::Duration; | ||
|
|
||
| use redis::Cmd; | ||
|
|
||
| use crate::client::{ | ||
| get_request_timeout, RequestTimeoutOption, TimeUnit, BLOCKING_CMD_TIMEOUT_EXTENSION, | ||
| }; | ||
|
|
||
| use super::get_timeout_from_cmd_arg; | ||
|
|
||
| #[test] | ||
| fn test_get_timeout_from_cmd_returns_correct_duration_int() { | ||
| let mut cmd = Cmd::new(); | ||
| cmd.arg("BLPOP").arg("key1").arg("key2").arg("5"); | ||
| let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds); | ||
| assert!(result.is_ok()); | ||
| assert_eq!( | ||
| result.unwrap(), | ||
| RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64( | ||
| 5.0 + BLOCKING_CMD_TIMEOUT_EXTENSION | ||
| )) | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_get_timeout_from_cmd_returns_correct_duration_float() { | ||
| let mut cmd = Cmd::new(); | ||
| cmd.arg("BLPOP").arg("key1").arg("key2").arg(0.5); | ||
| let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds); | ||
| assert!(result.is_ok()); | ||
| assert_eq!( | ||
| result.unwrap(), | ||
| RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64( | ||
| 0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION | ||
| )) | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_get_timeout_from_cmd_returns_correct_duration_milliseconds() { | ||
| let mut cmd = Cmd::new(); | ||
| cmd.arg("XREAD").arg("BLOCK").arg("500").arg("key"); | ||
| let result = get_timeout_from_cmd_arg(&cmd, 2, TimeUnit::Milliseconds); | ||
| assert!(result.is_ok()); | ||
| assert_eq!( | ||
| result.unwrap(), | ||
| RequestTimeoutOption::BlockingCommand(Duration::from_secs_f64( | ||
| 0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION | ||
| )) | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_get_timeout_from_cmd_returns_err_when_timeout_isnt_passed() { | ||
| let mut cmd = Cmd::new(); | ||
| cmd.arg("BLPOP").arg("key1").arg("key2").arg("key3"); | ||
| let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds); | ||
| assert!(result.is_err()); | ||
| let err = result.unwrap_err(); | ||
| println!("{:?}", err); | ||
| assert!(err.to_string().to_lowercase().contains("index"), "{err}"); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_get_timeout_from_cmd_returns_err_when_timeout_is_larger_than_u32_max() { | ||
| let mut cmd = Cmd::new(); | ||
| cmd.arg("BLPOP") | ||
| .arg("key1") | ||
| .arg("key2") | ||
| .arg(u32::MAX as u64 + 1); | ||
| let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds); | ||
| assert!(result.is_err()); | ||
| let err = result.unwrap_err(); | ||
| println!("{:?}", err); | ||
| assert!(err.to_string().to_lowercase().contains("u32"), "{err}"); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_get_timeout_from_cmd_returns_err_when_timeout_is_negative() { | ||
| let mut cmd = Cmd::new(); | ||
| cmd.arg("BLPOP").arg("key1").arg("key2").arg(-1); | ||
| let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds); | ||
| assert!(result.is_err()); | ||
| let err = result.unwrap_err(); | ||
| assert!(err.to_string().to_lowercase().contains("negative"), "{err}"); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_get_timeout_from_cmd_returns_no_timeout_when_zero_is_passed() { | ||
| let mut cmd = Cmd::new(); | ||
| cmd.arg("BLPOP").arg("key1").arg("key2").arg(0); | ||
| let result = get_timeout_from_cmd_arg(&cmd, cmd.args_iter().len() - 1, TimeUnit::Seconds); | ||
| assert!(result.is_ok()); | ||
| assert_eq!(result.unwrap(), RequestTimeoutOption::NoTimeout,); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_get_request_timeout_with_blocking_command_returns_cmd_arg_timeout() { | ||
| let mut cmd = Cmd::new(); | ||
| cmd.arg("BLPOP").arg("key1").arg("key2").arg("500"); | ||
| let result = get_request_timeout(&cmd, Duration::from_millis(100)); | ||
| assert!(result.is_ok()); | ||
| assert_eq!( | ||
| result.unwrap(), | ||
| Some(Duration::from_secs_f64( | ||
| 500.0 + BLOCKING_CMD_TIMEOUT_EXTENSION | ||
| )) | ||
| ); | ||
|
|
||
| let mut cmd = Cmd::new(); | ||
| cmd.arg("XREADGROUP").arg("BLOCK").arg("500").arg("key"); | ||
| let result = get_request_timeout(&cmd, Duration::from_millis(100)); | ||
| assert!(result.is_ok()); | ||
| assert_eq!( | ||
| result.unwrap(), | ||
| Some(Duration::from_secs_f64( | ||
| 0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION | ||
| )) | ||
| ); | ||
|
|
||
| let mut cmd = Cmd::new(); | ||
| cmd.arg("BLMPOP").arg("0.857").arg("key"); | ||
| let result = get_request_timeout(&cmd, Duration::from_millis(100)); | ||
| assert!(result.is_ok()); | ||
| assert_eq!( | ||
| result.unwrap(), | ||
| Some(Duration::from_secs_f64( | ||
| 0.857 + BLOCKING_CMD_TIMEOUT_EXTENSION | ||
| )) | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_get_request_timeout_non_blocking_command_returns_default_timeout() { | ||
| let mut cmd = Cmd::new(); | ||
| cmd.arg("SET").arg("key").arg("value").arg("PX").arg("500"); | ||
| let result = get_request_timeout(&cmd, Duration::from_millis(100)); | ||
| assert!(result.is_ok()); | ||
| assert_eq!(result.unwrap(), Some(Duration::from_millis(100))); | ||
|
|
||
| let mut cmd = Cmd::new(); | ||
| cmd.arg("XREADGROUP").arg("key"); | ||
| let result = get_request_timeout(&cmd, Duration::from_millis(100)); | ||
| assert!(result.is_ok()); | ||
| assert_eq!(result.unwrap(), Some(Duration::from_millis(100))); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.