Skip to content

Commit 2a935eb

Browse files
dvora-hchayim
andauthored
RESP3 response callbacks (#2798)
* start cleaning * clean sone callbacks * response callbacks * revert redismod-url change * fix async tests * linters * async cluster --------- Co-authored-by: Chayim <[email protected]>
1 parent e13b239 commit 2a935eb

File tree

7 files changed

+143
-144
lines changed

7 files changed

+143
-144
lines changed

redis/asyncio/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,8 @@ def __init__(
257257

258258
if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
259259
self.response_callbacks.update(self.__class__.RESP3_RESPONSE_CALLBACKS)
260+
else:
261+
self.response_callbacks.update(self.__class__.RESP2_RESPONSE_CALLBACKS)
260262

261263
# If using a single connection client, we need to lock creation-of and use-of
262264
# the client in order to avoid race conditions such as using asyncio.gather

redis/asyncio/cluster.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,8 @@ def __init__(
321321
kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy()
322322
if kwargs.get("protocol") in ["3", 3]:
323323
kwargs["response_callbacks"].update(self.__class__.RESP3_RESPONSE_CALLBACKS)
324+
else:
325+
kwargs["response_callbacks"].update(self.__class__.RESP2_RESPONSE_CALLBACKS)
324326
self.connection_kwargs = kwargs
325327

326328
if startup_nodes:

redis/asyncio/connection.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -355,10 +355,9 @@ async def on_connect(self) -> None:
355355
auth_args = ["default", auth_args[0]]
356356
await self.send_command("HELLO", self.protocol, "AUTH", *auth_args)
357357
response = await self.read_response()
358-
if response.get(b"proto") not in [2, "2"] and response.get("proto") not in [
359-
2,
360-
"2",
361-
]:
358+
if response.get(b"proto") != int(self.protocol) and response.get(
359+
"proto"
360+
) != int(self.protocol):
362361
raise ConnectionError("Invalid RESP version")
363362
# avoid checking health here -- PING will fail if we try
364363
# to check the health prior to the AUTH
@@ -379,7 +378,7 @@ async def on_connect(self) -> None:
379378
raise AuthenticationError("Invalid Username or Password")
380379

381380
# if resp version is specified, switch to it
382-
elif self.protocol != 2:
381+
elif self.protocol not in [2, "2"]:
383382
if isinstance(self._parser, _AsyncRESP2Parser):
384383
self.set_parser(_AsyncRESP3Parser)
385384
# update cluster exception classes

redis/client.py

Lines changed: 119 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -726,162 +726,157 @@ def parse_set_result(response, **options):
726726

727727
class AbstractRedis:
728728
RESPONSE_CALLBACKS = {
729-
**string_keys_to_dict(
730-
"AUTH COPY EXPIRE EXPIREAT PEXPIRE PEXPIREAT "
731-
"HEXISTS HMSET MOVE MSETNX PERSIST "
732-
"PSETEX RENAMENX SISMEMBER SMOVE SETEX SETNX",
733-
bool,
734-
),
735-
**string_keys_to_dict(
736-
"BITCOUNT BITPOS DECRBY DEL EXISTS GEOADD GETBIT HDEL HLEN "
737-
"HSTRLEN INCRBY LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD "
738-
"SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE SREM STRLEN "
739-
"SUNIONSTORE UNLINK XACK XDEL XLEN XTRIM ZCARD ZLEXCOUNT ZREM "
740-
"ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE",
741-
int,
742-
),
729+
**string_keys_to_dict("EXPIRE EXPIREAT PEXPIRE PEXPIREAT AUTH", bool),
730+
**string_keys_to_dict("EXISTS", int),
743731
**string_keys_to_dict("INCRBYFLOAT HINCRBYFLOAT", float),
744-
**string_keys_to_dict(
745-
# these return OK, or int if redis-server is >=1.3.4
746-
"LPUSH RPUSH",
747-
lambda r: isinstance(r, int) and r or str_if_bytes(r) == "OK",
748-
),
749-
**string_keys_to_dict("SORT", sort_return_tuples),
750-
**string_keys_to_dict("ZSCORE ZINCRBY GEODIST", float_or_none),
751-
**string_keys_to_dict(
752-
"FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE ASKING READONLY READWRITE "
753-
"RENAME SAVE SELECT SHUTDOWN SLAVEOF SWAPDB WATCH UNWATCH ",
754-
bool_ok,
755-
),
756-
**string_keys_to_dict("BLPOP BRPOP", lambda r: r and tuple(r) or None),
757-
**string_keys_to_dict(
758-
"SDIFF SINTER SMEMBERS SUNION", lambda r: r and set(r) or set()
759-
),
760-
**string_keys_to_dict(
761-
"ZPOPMAX ZPOPMIN ZINTER ZDIFF ZUNION ZRANGE ZRANGEBYSCORE "
762-
"ZREVRANGE ZREVRANGEBYSCORE",
763-
zset_score_pairs,
764-
),
765-
**string_keys_to_dict(
766-
"BZPOPMIN BZPOPMAX", lambda r: r and (r[0], r[1], float(r[2])) or None
767-
),
768-
**string_keys_to_dict("ZRANK ZREVRANK", int_or_none),
769-
**string_keys_to_dict("XREVRANGE XRANGE", parse_stream_list),
770-
**string_keys_to_dict("XREAD XREADGROUP", parse_xread),
771-
**string_keys_to_dict("BGREWRITEAOF BGSAVE", lambda r: True),
772-
"ACL CAT": lambda r: list(map(str_if_bytes, r)),
773-
"ACL DELUSER": int,
774-
"ACL GENPASS": str_if_bytes,
775-
"ACL GETUSER": parse_acl_getuser,
776-
"ACL HELP": lambda r: list(map(str_if_bytes, r)),
777-
"ACL LIST": lambda r: list(map(str_if_bytes, r)),
778-
"ACL LOAD": bool_ok,
779-
"ACL LOG": parse_acl_log,
780-
"ACL SAVE": bool_ok,
781-
"ACL SETUSER": bool_ok,
782-
"ACL USERS": lambda r: list(map(str_if_bytes, r)),
783-
"ACL WHOAMI": str_if_bytes,
784-
"CLIENT GETNAME": str_if_bytes,
732+
**string_keys_to_dict("READONLY", bool_ok),
733+
"CLUSTER DELSLOTS": bool_ok,
734+
"CLUSTER ADDSLOTS": bool_ok,
735+
"COMMAND": parse_command,
736+
"INFO": parse_info,
737+
"SET": parse_set_result,
785738
"CLIENT ID": int,
786739
"CLIENT KILL": parse_client_kill,
787740
"CLIENT LIST": parse_client_list,
788741
"CLIENT INFO": parse_client_info,
789742
"CLIENT SETNAME": bool_ok,
790-
"CLIENT UNBLOCK": lambda r: r and int(r) == 1 or False,
791-
"CLIENT PAUSE": bool_ok,
792-
"CLIENT GETREDIR": int,
793743
"CLIENT TRACKINGINFO": lambda r: list(map(str_if_bytes, r)),
794-
"CLUSTER ADDSLOTS": bool_ok,
795-
"CLUSTER ADDSLOTSRANGE": bool_ok,
744+
"LASTSAVE": timestamp_to_datetime,
745+
"RESET": str_if_bytes,
746+
"SLOWLOG GET": parse_slowlog_get,
747+
"TIME": lambda x: (int(x[0]), int(x[1])),
748+
**string_keys_to_dict("BLPOP BRPOP", lambda r: r and tuple(r) or None),
749+
"SCAN": parse_scan,
750+
"CLIENT GETNAME": str_if_bytes,
751+
"SSCAN": parse_scan,
752+
"ACL LOG": parse_acl_log,
753+
"ACL WHOAMI": str_if_bytes,
754+
"ACL GENPASS": str_if_bytes,
755+
"ACL CAT": lambda r: list(map(str_if_bytes, r)),
756+
"HSCAN": parse_hscan,
757+
"ZSCAN": parse_zscan,
758+
**string_keys_to_dict(
759+
"BZPOPMIN BZPOPMAX", lambda r: r and (r[0], r[1], float(r[2])) or None
760+
),
796761
"CLUSTER COUNT-FAILURE-REPORTS": lambda x: int(x),
797762
"CLUSTER COUNTKEYSINSLOT": lambda x: int(x),
798-
"CLUSTER DELSLOTS": bool_ok,
799-
"CLUSTER DELSLOTSRANGE": bool_ok,
800763
"CLUSTER FAILOVER": bool_ok,
801764
"CLUSTER FORGET": bool_ok,
802-
"CLUSTER GETKEYSINSLOT": lambda r: list(map(str_if_bytes, r)),
803765
"CLUSTER INFO": parse_cluster_info,
804766
"CLUSTER KEYSLOT": lambda x: int(x),
805767
"CLUSTER MEET": bool_ok,
806768
"CLUSTER NODES": parse_cluster_nodes,
807-
"CLUSTER REPLICAS": parse_cluster_nodes,
808769
"CLUSTER REPLICATE": bool_ok,
809770
"CLUSTER RESET": bool_ok,
810771
"CLUSTER SAVECONFIG": bool_ok,
811-
"CLUSTER SET-CONFIG-EPOCH": bool_ok,
812772
"CLUSTER SETSLOT": bool_ok,
813773
"CLUSTER SLAVES": parse_cluster_nodes,
814-
"COMMAND": parse_command,
815-
"COMMAND COUNT": int,
816-
"COMMAND GETKEYS": lambda r: list(map(str_if_bytes, r)),
817-
"CONFIG GET": parse_config_get,
818-
"CONFIG RESETSTAT": bool_ok,
819-
"CONFIG SET": bool_ok,
820-
"DEBUG OBJECT": parse_debug_object,
821-
"FUNCTION DELETE": bool_ok,
822-
"FUNCTION FLUSH": bool_ok,
823-
"FUNCTION RESTORE": bool_ok,
774+
**string_keys_to_dict("GEODIST", float_or_none),
824775
"GEOHASH": lambda r: list(map(str_if_bytes, r)),
825776
"GEOPOS": lambda r: list(
826777
map(lambda ll: (float(ll[0]), float(ll[1])) if ll is not None else None, r)
827778
),
828779
"GEOSEARCH": parse_geosearch_generic,
829780
"GEORADIUS": parse_geosearch_generic,
830781
"GEORADIUSBYMEMBER": parse_geosearch_generic,
831-
"HGETALL": lambda r: r and pairs_to_dict(r) or {},
832-
"HSCAN": parse_hscan,
833-
"INFO": parse_info,
834-
"LASTSAVE": timestamp_to_datetime,
835-
"MEMORY PURGE": bool_ok,
836-
"MEMORY STATS": parse_memory_stats,
837-
"MEMORY USAGE": int_or_none,
838-
"MODULE LOAD": parse_module_result,
839-
"MODULE UNLOAD": parse_module_result,
840-
"MODULE LIST": lambda r: [pairs_to_dict(m) for m in r],
841-
"OBJECT": parse_object,
782+
"XAUTOCLAIM": parse_xautoclaim,
783+
"XINFO STREAM": parse_xinfo_stream,
784+
"XPENDING": parse_xpending,
785+
**string_keys_to_dict("XREAD XREADGROUP", parse_xread),
786+
"COMMAND GETKEYS": lambda r: list(map(str_if_bytes, r)),
787+
**string_keys_to_dict("SORT", sort_return_tuples),
842788
"PING": lambda r: str_if_bytes(r) == "PONG",
843-
"QUIT": bool_ok,
844-
"STRALGO": parse_stralgo,
789+
"ACL SETUSER": bool_ok,
845790
"PUBSUB NUMSUB": parse_pubsub_numsub,
846-
"PUBSUB SHARDNUMSUB": parse_pubsub_numsub,
847-
"RANDOMKEY": lambda r: r and r or None,
848-
"RESET": str_if_bytes,
849-
"SCAN": parse_scan,
850-
"SCRIPT EXISTS": lambda r: list(map(bool, r)),
851791
"SCRIPT FLUSH": bool_ok,
852-
"SCRIPT KILL": bool_ok,
853792
"SCRIPT LOAD": str_if_bytes,
854-
"SENTINEL CKQUORUM": bool_ok,
855-
"SENTINEL FAILOVER": bool_ok,
856-
"SENTINEL FLUSHCONFIG": bool_ok,
857-
"SENTINEL GET-MASTER-ADDR-BY-NAME": parse_sentinel_get_master,
858-
"SENTINEL MASTER": parse_sentinel_master,
859-
"SENTINEL MASTERS": parse_sentinel_masters,
860-
"SENTINEL MONITOR": bool_ok,
861-
"SENTINEL RESET": bool_ok,
862-
"SENTINEL REMOVE": bool_ok,
863-
"SENTINEL SENTINELS": parse_sentinel_slaves_and_sentinels,
864-
"SENTINEL SET": bool_ok,
865-
"SENTINEL SLAVES": parse_sentinel_slaves_and_sentinels,
866-
"SET": parse_set_result,
867-
"SLOWLOG GET": parse_slowlog_get,
868-
"SLOWLOG LEN": int,
869-
"SLOWLOG RESET": bool_ok,
870-
"SSCAN": parse_scan,
871-
"TIME": lambda x: (int(x[0]), int(x[1])),
793+
"ACL GETUSER": parse_acl_getuser,
794+
"CONFIG SET": bool_ok,
795+
**string_keys_to_dict("XREVRANGE XRANGE", parse_stream_list),
872796
"XCLAIM": parse_xclaim,
873-
"XAUTOCLAIM": parse_xautoclaim,
874-
"XGROUP CREATE": bool_ok,
875-
"XGROUP DELCONSUMER": int,
876-
"XGROUP DESTROY": bool,
877-
"XGROUP SETID": bool_ok,
878-
"XINFO CONSUMERS": parse_list_of_dicts,
879-
"XINFO GROUPS": parse_list_of_dicts,
880-
"XINFO STREAM": parse_xinfo_stream,
881-
"XPENDING": parse_xpending,
797+
}
798+
799+
RESP2_RESPONSE_CALLBACKS = {
800+
"CONFIG GET": parse_config_get,
801+
**string_keys_to_dict(
802+
"SDIFF SINTER SMEMBERS SUNION", lambda r: r and set(r) or set()
803+
),
804+
**string_keys_to_dict(
805+
"ZPOPMAX ZPOPMIN ZINTER ZDIFF ZUNION ZRANGE ZRANGEBYSCORE "
806+
"ZREVRANGE ZREVRANGEBYSCORE",
807+
zset_score_pairs,
808+
),
809+
**string_keys_to_dict("ZSCORE ZINCRBY", float_or_none),
882810
"ZADD": parse_zadd,
883-
"ZSCAN": parse_zscan,
884811
"ZMSCORE": parse_zmscore,
812+
"HGETALL": lambda r: r and pairs_to_dict(r) or {},
813+
"MEMORY STATS": parse_memory_stats,
814+
"MODULE LIST": lambda r: [pairs_to_dict(m) for m in r],
815+
# **string_keys_to_dict(
816+
# "COPY "
817+
# "HEXISTS HMSET MOVE MSETNX PERSIST "
818+
# "PSETEX RENAMENX SISMEMBER SMOVE SETEX SETNX",
819+
# bool,
820+
# ),
821+
# **string_keys_to_dict(
822+
# "HSTRLEN INCRBY LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD "
823+
# "SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE SREM STRLEN "
824+
# "SUNIONSTORE UNLINK XACK XDEL XLEN XTRIM ZCARD ZLEXCOUNT ZREM "
825+
# "ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE",
826+
# int,
827+
# ),
828+
# **string_keys_to_dict(
829+
# "FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE ASKING READWRITE "
830+
# "RENAME SAVE SELECT SHUTDOWN SLAVEOF SWAPDB WATCH UNWATCH ",
831+
# bool_ok,
832+
# ),
833+
# **string_keys_to_dict("ZRANK ZREVRANK", int_or_none),
834+
# **string_keys_to_dict("BGREWRITEAOF BGSAVE", lambda r: True),
835+
# "ACL HELP": lambda r: list(map(str_if_bytes, r)),
836+
# "ACL LIST": lambda r: list(map(str_if_bytes, r)),
837+
# "ACL LOAD": bool_ok,
838+
# "ACL SAVE": bool_ok,
839+
# "ACL USERS": lambda r: list(map(str_if_bytes, r)),
840+
# "CLIENT UNBLOCK": lambda r: r and int(r) == 1 or False,
841+
# "CLIENT PAUSE": bool_ok,
842+
# "CLUSTER ADDSLOTSRANGE": bool_ok,
843+
# "CLUSTER DELSLOTSRANGE": bool_ok,
844+
# "CLUSTER GETKEYSINSLOT": lambda r: list(map(str_if_bytes, r)),
845+
# "CLUSTER REPLICAS": parse_cluster_nodes,
846+
# "CLUSTER SET-CONFIG-EPOCH": bool_ok,
847+
# "CONFIG RESETSTAT": bool_ok,
848+
# "DEBUG OBJECT": parse_debug_object,
849+
# "FUNCTION DELETE": bool_ok,
850+
# "FUNCTION FLUSH": bool_ok,
851+
# "FUNCTION RESTORE": bool_ok,
852+
# "MEMORY PURGE": bool_ok,
853+
# "MEMORY USAGE": int_or_none,
854+
# "MODULE LOAD": parse_module_result,
855+
# "MODULE UNLOAD": parse_module_result,
856+
# "OBJECT": parse_object,
857+
# "QUIT": bool_ok,
858+
# "STRALGO": parse_stralgo,
859+
# "RANDOMKEY": lambda r: r and r or None,
860+
# "SCRIPT EXISTS": lambda r: list(map(bool, r)),
861+
# "SCRIPT KILL": bool_ok,
862+
# "SENTINEL CKQUORUM": bool_ok,
863+
# "SENTINEL FAILOVER": bool_ok,
864+
# "SENTINEL FLUSHCONFIG": bool_ok,
865+
# "SENTINEL GET-MASTER-ADDR-BY-NAME": parse_sentinel_get_master,
866+
# "SENTINEL MASTER": parse_sentinel_master,
867+
# "SENTINEL MASTERS": parse_sentinel_masters,
868+
# "SENTINEL MONITOR": bool_ok,
869+
# "SENTINEL RESET": bool_ok,
870+
# "SENTINEL REMOVE": bool_ok,
871+
# "SENTINEL SENTINELS": parse_sentinel_slaves_and_sentinels,
872+
# "SENTINEL SET": bool_ok,
873+
# "SENTINEL SLAVES": parse_sentinel_slaves_and_sentinels,
874+
# "SLOWLOG RESET": bool_ok,
875+
# "XGROUP CREATE": bool_ok,
876+
# "XGROUP DESTROY": bool,
877+
# "XGROUP SETID": bool_ok,
878+
"XINFO CONSUMERS": parse_list_of_dicts,
879+
"XINFO GROUPS": parse_list_of_dicts,
885880
}
886881

887882
RESP3_RESPONSE_CALLBACKS = {
@@ -1122,6 +1117,8 @@ def __init__(
11221117

11231118
if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
11241119
self.response_callbacks.update(self.__class__.RESP3_RESPONSE_CALLBACKS)
1120+
else:
1121+
self.response_callbacks.update(self.__class__.RESP2_RESPONSE_CALLBACKS)
11251122

11261123
def __repr__(self):
11271124
return f"{type(self).__name__}<{repr(self.connection_pool)}>"

redis/connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ def on_connect(self):
288288
auth_args = cred_provider.get_credentials()
289289
# if resp version is specified and we have auth args,
290290
# we need to send them via HELLO
291-
if auth_args and self.protocol != 2:
291+
if auth_args and self.protocol not in [2, "2"]:
292292
if isinstance(self._parser, _RESP2Parser):
293293
self.set_parser(_RESP3Parser)
294294
# update cluster exception classes
@@ -321,7 +321,7 @@ def on_connect(self):
321321
raise AuthenticationError("Invalid Username or Password")
322322

323323
# if resp version is specified, switch to it
324-
elif self.protocol != 2:
324+
elif self.protocol not in [2, "2"]:
325325
if isinstance(self._parser, _RESP2Parser):
326326
self.set_parser(_RESP3Parser)
327327
# update cluster exception classes

tests/test_asyncio/test_commands.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ async def test_response_callbacks(self, r: redis.Redis):
8585
assert await r.get("a") == "static"
8686

8787
async def test_case_insensitive_command_names(self, r: redis.Redis):
88-
assert r.response_callbacks["del"] == r.response_callbacks["DEL"]
88+
assert r.response_callbacks["ping"] == r.response_callbacks["PING"]
8989

9090

9191
class TestRedisCommands:
@@ -2718,7 +2718,7 @@ async def test_xgroup_setid(self, r: redis.Redis):
27182718
]
27192719
assert await r.xinfo_groups(stream) == expected
27202720

2721-
@skip_if_server_version_lt("5.0.0")
2721+
@skip_if_server_version_lt("7.2.0")
27222722
async def test_xinfo_consumers(self, r: redis.Redis):
27232723
stream = "stream"
27242724
group = "group"
@@ -2734,8 +2734,8 @@ async def test_xinfo_consumers(self, r: redis.Redis):
27342734
info = await r.xinfo_consumers(stream, group)
27352735
assert len(info) == 2
27362736
expected = [
2737-
{"name": consumer1.encode(), "pending": 1},
2738-
{"name": consumer2.encode(), "pending": 2},
2737+
{"name": consumer1.encode(), "pending": 1, "inactive": 2},
2738+
{"name": consumer2.encode(), "pending": 2, "inactive": 2},
27392739
]
27402740

27412741
# we can't determine the idle time, so just make sure it's an int

0 commit comments

Comments
 (0)