@@ -8,7 +8,7 @@ use futures::FutureExt;
88use logger_core:: log_info;
99use redis:: aio:: ConnectionLike ;
1010use redis:: cluster_async:: ClusterConnection ;
11- use redis:: cluster_routing:: { RoutingInfo , SingleNodeRoutingInfo } ;
11+ use redis:: cluster_routing:: { Routable , RoutingInfo , SingleNodeRoutingInfo } ;
1212use redis:: RedisResult ;
1313use redis:: { Cmd , ErrorKind , Value } ;
1414pub use standalone_client:: StandaloneClient ;
@@ -104,14 +104,57 @@ async fn run_with_timeout<T>(
104104 . and_then ( |res| res)
105105}
106106
107+ const BLOCKING_CMD_TIMEOUT_BUFFER : f64 = 0.5 ; // seconds
108+
109+ enum TimeUnit {
110+ Milliseconds = 1000 ,
111+ Seconds = 1 ,
112+ }
113+
114+ // Attempts to get the timeout duration from the command argument at `timeout_idx`.
115+ // If the argument can be parsed into a duration, it returns the duration in seconds. Otherwise, it returns None.
116+ fn try_get_timeout_from_cmd_arg (
117+ cmd : & Cmd ,
118+ timeout_idx : usize ,
119+ time_unit : TimeUnit ,
120+ ) -> Option < Duration > {
121+ cmd. arg_idx ( timeout_idx) . and_then ( |timeout_bytes| {
122+ std:: str:: from_utf8 ( timeout_bytes)
123+ . ok ( )
124+ . and_then ( |timeout_str| {
125+ timeout_str. parse :: < f64 > ( ) . ok ( ) . map ( |timeout| {
126+ let timeout_secs = timeout / ( ( time_unit as i32 ) as f64 ) ;
127+ // We add a buffer to the request timeout to ensure we receive a response from the server
128+ Duration :: from_secs_f64 ( timeout_secs + BLOCKING_CMD_TIMEOUT_BUFFER )
129+ } )
130+ } )
131+ } )
132+ }
133+
134+ fn get_request_timeout ( cmd : & Cmd , default_timeout : Duration ) -> Duration {
135+ let command = cmd. command ( ) . unwrap_or_default ( ) ;
136+ let blocking_timeout = match command. as_slice ( ) {
137+ b"BLPOP" | b"BRPOP" | b"BLMOVE" | b"BZPOPMAX" | b"BZPOPMIN" | b"BRPOPLPUSH" => {
138+ try_get_timeout_from_cmd_arg ( cmd, cmd. args_iter ( ) . len ( ) - 1 , TimeUnit :: Seconds )
139+ }
140+ b"BLMPOP" | b"BZMPOP" => try_get_timeout_from_cmd_arg ( cmd, 1 , TimeUnit :: Seconds ) ,
141+ b"XREAD" | b"XREADGROUP" => cmd
142+ . position ( b"BLOCK" )
143+ . and_then ( |idx| try_get_timeout_from_cmd_arg ( cmd, idx + 1 , TimeUnit :: Milliseconds ) ) ,
144+ _ => None ,
145+ } ;
146+
147+ blocking_timeout. unwrap_or ( default_timeout)
148+ }
149+
107150impl Client {
108151 pub fn send_command < ' a > (
109152 & ' a mut self ,
110153 cmd : & ' a Cmd ,
111154 routing : Option < RoutingInfo > ,
112155 ) -> redis:: RedisFuture < ' a , Value > {
113156 let expected_type = expected_type_for_cmd ( cmd) ;
114- run_with_timeout ( self . request_timeout , async move {
157+ run_with_timeout ( get_request_timeout ( cmd , self . request_timeout ) , async move {
115158 match self . internal_client {
116159 ClientWrapper :: Standalone ( ref mut client) => client. send_command ( cmd) . await ,
117160
@@ -472,3 +515,95 @@ impl GlideClientForTests for StandaloneClient {
472515 self . send_command ( cmd) . boxed ( )
473516 }
474517}
518+
519+ #[ cfg( test) ]
520+ mod tests {
521+ use std:: time:: Duration ;
522+
523+ use redis:: Cmd ;
524+
525+ use crate :: client:: { get_request_timeout, TimeUnit , BLOCKING_CMD_TIMEOUT_BUFFER } ;
526+
527+ use super :: try_get_timeout_from_cmd_arg;
528+
529+ #[ test]
530+ fn test_get_timeout_from_cmd_returns_correct_duration_int ( ) {
531+ let mut cmd = Cmd :: new ( ) ;
532+ cmd. arg ( "BLPOP" ) . arg ( "key1" ) . arg ( "key2" ) . arg ( "5" ) ;
533+ assert_eq ! (
534+ try_get_timeout_from_cmd_arg( & cmd, cmd. args_iter( ) . len( ) - 1 , TimeUnit :: Seconds ) ,
535+ Some ( Duration :: from_secs_f64( 5.0 + BLOCKING_CMD_TIMEOUT_BUFFER ) )
536+ ) ;
537+ }
538+
539+ #[ test]
540+ fn test_get_timeout_from_cmd_returns_correct_duration_float ( ) {
541+ let mut cmd = Cmd :: new ( ) ;
542+ cmd. arg ( "BLPOP" ) . arg ( "key1" ) . arg ( "key2" ) . arg ( 0.5 ) ;
543+ assert_eq ! (
544+ try_get_timeout_from_cmd_arg( & cmd, cmd. args_iter( ) . len( ) - 1 , TimeUnit :: Seconds ) ,
545+ Some ( Duration :: from_secs_f64( 0.5 + BLOCKING_CMD_TIMEOUT_BUFFER ) )
546+ ) ;
547+ }
548+
549+ #[ test]
550+ fn test_get_timeout_from_cmd_returns_correct_duration_milliseconds ( ) {
551+ let mut cmd = Cmd :: new ( ) ;
552+ cmd. arg ( "XREAD" ) . arg ( "BLOCK" ) . arg ( "500" ) . arg ( "key" ) ;
553+ assert_eq ! (
554+ try_get_timeout_from_cmd_arg( & cmd, 2 , TimeUnit :: Milliseconds ) ,
555+ Some ( Duration :: from_secs_f64( 0.5 + BLOCKING_CMD_TIMEOUT_BUFFER ) )
556+ ) ;
557+ }
558+
559+ #[ test]
560+ fn test_get_timeout_from_cmd_returns_none_when_timeout_isnt_passed ( ) {
561+ let mut cmd = Cmd :: new ( ) ;
562+ cmd. arg ( "BLPOP" ) . arg ( "key1" ) . arg ( "key2" ) . arg ( "key3" ) ;
563+ assert_eq ! (
564+ try_get_timeout_from_cmd_arg( & cmd, cmd. args_iter( ) . len( ) - 1 , TimeUnit :: Seconds ) ,
565+ None ,
566+ ) ;
567+ }
568+
569+ #[ test]
570+ fn test_get_request_timeout_with_blocking_command_returns_cmd_arg_timeout ( ) {
571+ let mut cmd = Cmd :: new ( ) ;
572+ cmd. arg ( "BLPOP" ) . arg ( "key1" ) . arg ( "key2" ) . arg ( "500" ) ;
573+ assert_eq ! (
574+ get_request_timeout( & cmd, Duration :: from_millis( 100 ) ) ,
575+ Duration :: from_secs_f64( 500.0 + BLOCKING_CMD_TIMEOUT_BUFFER )
576+ ) ;
577+
578+ let mut cmd = Cmd :: new ( ) ;
579+ cmd. arg ( "XREADGROUP" ) . arg ( "BLOCK" ) . arg ( "500" ) . arg ( "key" ) ;
580+ assert_eq ! (
581+ get_request_timeout( & cmd, Duration :: from_millis( 100 ) ) ,
582+ Duration :: from_secs_f64( 0.5 + BLOCKING_CMD_TIMEOUT_BUFFER )
583+ ) ;
584+
585+ let mut cmd = Cmd :: new ( ) ;
586+ cmd. arg ( "BLMPOP" ) . arg ( "0.857" ) . arg ( "key" ) ;
587+ assert_eq ! (
588+ get_request_timeout( & cmd, Duration :: from_millis( 100 ) ) ,
589+ Duration :: from_secs_f64( 0.857 + BLOCKING_CMD_TIMEOUT_BUFFER )
590+ ) ;
591+ }
592+
593+ #[ test]
594+ fn test_get_request_timeout_non_blocking_command_returns_default_timeout ( ) {
595+ let mut cmd = Cmd :: new ( ) ;
596+ cmd. arg ( "SET" ) . arg ( "key" ) . arg ( "value" ) . arg ( "PX" ) . arg ( "500" ) ;
597+ assert_eq ! (
598+ get_request_timeout( & cmd, Duration :: from_millis( 100 ) ) ,
599+ Duration :: from_millis( 100 )
600+ ) ;
601+
602+ let mut cmd = Cmd :: new ( ) ;
603+ cmd. arg ( "XREADGROUP" ) . arg ( "key" ) ;
604+ assert_eq ! (
605+ get_request_timeout( & cmd, Duration :: from_millis( 100 ) ) ,
606+ Duration :: from_millis( 100 )
607+ ) ;
608+ }
609+ }
0 commit comments