Skip to content

Commit 8c590a7

Browse files
authored
Python: add BZPOPMIN and BZPOPMAX commands (#1399)
* Python: add BZPOPMIN and BZPOPMAX commands (#266) * Update PR link * PR suggestions * Fix rust
1 parent c726596 commit 8c590a7

File tree

6 files changed

+296
-4
lines changed

6 files changed

+296
-4
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
* Python: Added ZRANGESTORE command ([#1377](https://github.com/aws/glide-for-redis/pull/1377))
2626
* Python: Added ZDIFFSTORE command ([#1378](https://github.com/aws/glide-for-redis/pull/1378))
2727
* Python: Added ZDIFF command ([#1401](https://github.com/aws/glide-for-redis/pull/1401))
28+
* Python: Added BZPOPMIN and BZPOPMAX commands ([#1399](https://github.com/aws/glide-for-redis/pull/1399))
29+
2830

2931
#### Fixes
3032
* Python: Fix typing error "‘type’ object is not subscriptable" ([#1203](https://github.com/aws/glide-for-redis/pull/1203))

glide-core/src/client/value_conversion.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub(crate) enum ExpectedReturnType {
2222
ArrayOfArraysOfDoubleOrNull,
2323
ArrayOfKeyValuePairs,
2424
ZMPopReturnType,
25+
KeyWithMemberAndScore,
2526
}
2627

2728
pub(crate) fn convert_to_expected_type(
@@ -320,6 +321,28 @@ pub(crate) fn convert_to_expected_type(
320321
)
321322
.into()),
322323
},
324+
// Used by BZPOPMIN/BZPOPMAX, which return an array consisting of the key of the sorted set that was popped, the popped member, and its score.
325+
// RESP2 returns the score as a string, but RESP3 returns the score as a double. Here we convert string scores into type double.
326+
ExpectedReturnType::KeyWithMemberAndScore => match value {
327+
Value::Nil => Ok(value),
328+
Value::Array(ref array) if array.len() == 3 && matches!(array[2], Value::Double(_)) => {
329+
Ok(value)
330+
}
331+
Value::Array(mut array)
332+
if array.len() == 3
333+
&& matches!(array[2], Value::BulkString(_) | Value::SimpleString(_)) =>
334+
{
335+
array[2] =
336+
convert_to_expected_type(array[2].clone(), Some(ExpectedReturnType::Double))?;
337+
Ok(Value::Array(array))
338+
}
339+
_ => Err((
340+
ErrorKind::TypeError,
341+
"Response couldn't be converted to an array containing a key, member, and score",
342+
format!("(response was {:?})", value),
343+
)
344+
.into()),
345+
},
323346
}
324347
}
325348

@@ -454,6 +477,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {
454477
b"ZRANK" | b"ZREVRANK" => cmd
455478
.position(b"WITHSCORE")
456479
.map(|_| ExpectedReturnType::ZRankReturnType),
480+
b"BZPOPMIN" | b"BZPOPMAX" => Some(ExpectedReturnType::KeyWithMemberAndScore),
457481
b"SPOP" => {
458482
if cmd.arg_idx(2).is_some() {
459483
Some(ExpectedReturnType::Set)
@@ -815,6 +839,70 @@ mod tests {
815839
));
816840
}
817841

842+
#[test]
843+
fn convert_bzpopmin_bzpopmax() {
844+
assert!(matches!(
845+
expected_type_for_cmd(
846+
redis::cmd("BZPOPMIN")
847+
.arg("myzset1")
848+
.arg("myzset2")
849+
.arg("1")
850+
),
851+
Some(ExpectedReturnType::KeyWithMemberAndScore)
852+
));
853+
854+
assert!(matches!(
855+
expected_type_for_cmd(
856+
redis::cmd("BZPOPMAX")
857+
.arg("myzset1")
858+
.arg("myzset2")
859+
.arg("1")
860+
),
861+
Some(ExpectedReturnType::KeyWithMemberAndScore)
862+
));
863+
864+
let array_with_double_score = Value::Array(vec![
865+
Value::BulkString(b"key1".to_vec()),
866+
Value::BulkString(b"member1".to_vec()),
867+
Value::Double(2.0),
868+
]);
869+
let result = convert_to_expected_type(
870+
array_with_double_score.clone(),
871+
Some(ExpectedReturnType::KeyWithMemberAndScore),
872+
)
873+
.unwrap();
874+
assert_eq!(array_with_double_score, result);
875+
876+
let array_with_string_score = Value::Array(vec![
877+
Value::BulkString(b"key1".to_vec()),
878+
Value::BulkString(b"member1".to_vec()),
879+
Value::BulkString(b"2.0".to_vec()),
880+
]);
881+
let result = convert_to_expected_type(
882+
array_with_string_score.clone(),
883+
Some(ExpectedReturnType::KeyWithMemberAndScore),
884+
)
885+
.unwrap();
886+
assert_eq!(array_with_double_score, result);
887+
888+
let converted_nil_value =
889+
convert_to_expected_type(Value::Nil, Some(ExpectedReturnType::KeyWithMemberAndScore))
890+
.unwrap();
891+
assert_eq!(Value::Nil, converted_nil_value);
892+
893+
let array_with_unexpected_length = Value::Array(vec![
894+
Value::BulkString(b"key1".to_vec()),
895+
Value::BulkString(b"member1".to_vec()),
896+
Value::Double(2.0),
897+
Value::Double(2.0),
898+
]);
899+
assert!(convert_to_expected_type(
900+
array_with_unexpected_length,
901+
Some(ExpectedReturnType::KeyWithMemberAndScore)
902+
)
903+
.is_err());
904+
}
905+
818906
#[test]
819907
fn convert_zank_zrevrank_only_if_withsocres_is_included() {
820908
assert!(matches!(

python/python/glide/async_commands/core.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2285,6 +2285,42 @@ async def zpopmax(
22852285
),
22862286
)
22872287

2288+
async def bzpopmax(
2289+
self, keys: List[str], timeout: float
2290+
) -> Optional[List[Union[str, float]]]:
2291+
"""
2292+
Pops the member with the highest score from the first non-empty sorted set, with the given keys being checked in
2293+
the order that they are given. Blocks the connection when there are no members to remove from any of the given
2294+
sorted sets.
2295+
2296+
When in cluster mode, all keys must map to the same hash slot.
2297+
2298+
`BZPOPMAX` is the blocking variant of `ZPOPMAX`.
2299+
2300+
`BZPOPMAX` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
2301+
2302+
See https://valkey.io/commands/bzpopmax for more details.
2303+
2304+
Args:
2305+
keys (List[str]): The keys of the sorted sets.
2306+
timeout (float): The number of seconds to wait for a blocking operation to complete.
2307+
A value of 0 will block indefinitely.
2308+
2309+
Returns:
2310+
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
2311+
and the member score. If no member could be popped and the `timeout` expired, returns None.
2312+
2313+
Examples:
2314+
>>> await client.zadd("my_sorted_set1", {"member1": 10.0, "member2": 5.0})
2315+
2 # Two elements have been added to the sorted set at "my_sorted_set1".
2316+
>>> await client.bzpopmax(["my_sorted_set1", "my_sorted_set2"], 0.5)
2317+
['my_sorted_set1', 'member1', 10.0] # "member1" with a score of 10.0 has been removed from "my_sorted_set1".
2318+
"""
2319+
return cast(
2320+
Optional[List[Union[str, float]]],
2321+
await self._execute_command(RequestType.BZPopMax, keys + [str(timeout)]),
2322+
)
2323+
22882324
async def zpopmin(
22892325
self, key: str, count: Optional[int] = None
22902326
) -> Mapping[str, float]:
@@ -2317,6 +2353,42 @@ async def zpopmin(
23172353
),
23182354
)
23192355

2356+
async def bzpopmin(
2357+
self, keys: List[str], timeout: float
2358+
) -> Optional[List[Union[str, float]]]:
2359+
"""
2360+
Pops the member with the lowest score from the first non-empty sorted set, with the given keys being checked in
2361+
the order that they are given. Blocks the connection when there are no members to remove from any of the given
2362+
sorted sets.
2363+
2364+
When in cluster mode, all keys must map to the same hash slot.
2365+
2366+
`BZPOPMIN` is the blocking variant of `ZPOPMIN`.
2367+
2368+
`BZPOPMIN` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
2369+
2370+
See https://valkey.io/commands/bzpopmin for more details.
2371+
2372+
Args:
2373+
keys (List[str]): The keys of the sorted sets.
2374+
timeout (float): The number of seconds to wait for a blocking operation to complete.
2375+
A value of 0 will block indefinitely.
2376+
2377+
Returns:
2378+
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
2379+
and the member score. If no member could be popped and the `timeout` expired, returns None.
2380+
2381+
Examples:
2382+
>>> await client.zadd("my_sorted_set1", {"member1": 10.0, "member2": 5.0})
2383+
2 # Two elements have been added to the sorted set at "my_sorted_set1".
2384+
>>> await client.bzpopmin(["my_sorted_set1", "my_sorted_set2"], 0.5)
2385+
['my_sorted_set1', 'member2', 5.0] # "member2" with a score of 5.0 has been removed from "my_sorted_set1".
2386+
"""
2387+
return cast(
2388+
Optional[List[Union[str, float]]],
2389+
await self._execute_command(RequestType.BZPopMin, keys + [str(timeout)]),
2390+
)
2391+
23202392
async def zrange(
23212393
self,
23222394
key: str,

python/python/glide/async_commands/transaction.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1624,6 +1624,29 @@ def zpopmax(
16241624
RequestType.ZPopMax, [key, str(count)] if count else [key]
16251625
)
16261626

1627+
def bzpopmax(self: TTransaction, keys: List[str], timeout: float) -> TTransaction:
1628+
"""
1629+
Pops the member with the highest score from the first non-empty sorted set, with the given keys being checked in
1630+
the order that they are given. Blocks the connection when there are no members to remove from any of the given
1631+
sorted sets.
1632+
1633+
`BZPOPMAX` is the blocking variant of `ZPOPMAX`.
1634+
1635+
`BZPOPMAX` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
1636+
1637+
See https://valkey.io/commands/bzpopmax for more details.
1638+
1639+
Args:
1640+
keys (List[str]): The keys of the sorted sets.
1641+
timeout (float): The number of seconds to wait for a blocking operation to complete.
1642+
A value of 0 will block indefinitely.
1643+
1644+
Command response:
1645+
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
1646+
and the member score. If no member could be popped and the `timeout` expired, returns None.
1647+
"""
1648+
return self.append_command(RequestType.BZPopMax, keys + [str(timeout)])
1649+
16271650
def zpopmin(
16281651
self: TTransaction, key: str, count: Optional[int] = None
16291652
) -> TTransaction:
@@ -1647,6 +1670,29 @@ def zpopmin(
16471670
RequestType.ZPopMin, [key, str(count)] if count else [key]
16481671
)
16491672

1673+
def bzpopmin(self: TTransaction, keys: List[str], timeout: float) -> TTransaction:
1674+
"""
1675+
Pops the member with the lowest score from the first non-empty sorted set, with the given keys being checked in
1676+
the order that they are given. Blocks the connection when there are no members to remove from any of the given
1677+
sorted sets.
1678+
1679+
`BZPOPMIN` is the blocking variant of `ZPOPMIN`.
1680+
1681+
`BZPOPMIN` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
1682+
1683+
See https://valkey.io/commands/bzpopmin for more details.
1684+
1685+
Args:
1686+
keys (List[str]): The keys of the sorted sets.
1687+
timeout (float): The number of seconds to wait for a blocking operation to complete.
1688+
A value of 0 will block indefinitely.
1689+
1690+
Command response:
1691+
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
1692+
and the member score. If no member could be popped and the `timeout` expired, returns None.
1693+
"""
1694+
return self.append_command(RequestType.BZPopMin, keys + [str(timeout)])
1695+
16501696
def zrange(
16511697
self: TTransaction,
16521698
key: str,

python/python/tests/test_async_client.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1833,6 +1833,46 @@ async def test_zpopmin(self, redis_client: TRedisClient):
18331833

18341834
assert await redis_client.zpopmin("non_exisitng_key") == {}
18351835

1836+
@pytest.mark.parametrize("cluster_mode", [True, False])
1837+
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
1838+
async def test_bzpopmin(self, redis_client: TRedisClient):
1839+
key1 = f"{{testKey}}:{get_random_string(10)}"
1840+
key2 = f"{{testKey}}:{get_random_string(10)}"
1841+
non_existing_key = f"{{testKey}}:non_existing_key"
1842+
1843+
assert await redis_client.zadd(key1, {"a": 1.0, "b": 1.5}) == 2
1844+
assert await redis_client.zadd(key2, {"c": 2.0}) == 1
1845+
assert await redis_client.bzpopmin([key1, key2], 0.5) == [key1, "a", 1.0]
1846+
assert await redis_client.bzpopmin([non_existing_key, key2], 0.5) == [
1847+
key2,
1848+
"c",
1849+
2.0,
1850+
]
1851+
assert await redis_client.bzpopmin(["non_existing_key"], 0.5) is None
1852+
1853+
# invalid argument - key list must not be empty
1854+
with pytest.raises(RequestError):
1855+
await redis_client.bzpopmin([], 0.5)
1856+
1857+
# key exists, but it is not a sorted set
1858+
assert await redis_client.set("foo", "value") == OK
1859+
with pytest.raises(RequestError):
1860+
await redis_client.bzpopmin(["foo"], 0.5)
1861+
1862+
# same-slot requirement
1863+
if isinstance(redis_client, RedisClusterClient):
1864+
with pytest.raises(RequestError) as e:
1865+
await redis_client.bzpopmin(["abc", "zxy", "lkn"], 0.5)
1866+
assert "CrossSlot" in str(e)
1867+
1868+
async def endless_bzpopmin_call():
1869+
await redis_client.bzpopmin(["non_existent_key"], 0)
1870+
1871+
# bzpopmax is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to
1872+
# avoid having the test block forever
1873+
with pytest.raises(asyncio.TimeoutError):
1874+
await asyncio.wait_for(endless_bzpopmin_call(), timeout=0.5)
1875+
18361876
@pytest.mark.parametrize("cluster_mode", [True, False])
18371877
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
18381878
async def test_zpopmax(self, redis_client: TRedisClient):
@@ -1852,6 +1892,46 @@ async def test_zpopmax(self, redis_client: TRedisClient):
18521892

18531893
assert await redis_client.zpopmax("non_exisitng_key") == {}
18541894

1895+
@pytest.mark.parametrize("cluster_mode", [True, False])
1896+
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
1897+
async def test_bzpopmax(self, redis_client: TRedisClient):
1898+
key1 = f"{{testKey}}:{get_random_string(10)}"
1899+
key2 = f"{{testKey}}:{get_random_string(10)}"
1900+
non_existing_key = f"{{testKey}}:non_existing_key"
1901+
1902+
assert await redis_client.zadd(key1, {"a": 1.0, "b": 1.5}) == 2
1903+
assert await redis_client.zadd(key2, {"c": 2.0}) == 1
1904+
assert await redis_client.bzpopmax([key1, key2], 0.5) == [key1, "b", 1.5]
1905+
assert await redis_client.bzpopmax([non_existing_key, key2], 0.5) == [
1906+
key2,
1907+
"c",
1908+
2.0,
1909+
]
1910+
assert await redis_client.bzpopmax(["non_existing_key"], 0.5) is None
1911+
1912+
# invalid argument - key list must not be empty
1913+
with pytest.raises(RequestError):
1914+
await redis_client.bzpopmax([], 0.5)
1915+
1916+
# key exists, but it is not a sorted set
1917+
assert await redis_client.set("foo", "value") == OK
1918+
with pytest.raises(RequestError):
1919+
await redis_client.bzpopmax(["foo"], 0.5)
1920+
1921+
# same-slot requirement
1922+
if isinstance(redis_client, RedisClusterClient):
1923+
with pytest.raises(RequestError) as e:
1924+
await redis_client.bzpopmax(["abc", "zxy", "lkn"], 0.5)
1925+
assert "CrossSlot" in str(e)
1926+
1927+
async def endless_bzpopmax_call():
1928+
await redis_client.bzpopmax(["non_existent_key"], 0)
1929+
1930+
# bzpopmax is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to
1931+
# avoid having the test block forever
1932+
with pytest.raises(asyncio.TimeoutError):
1933+
await asyncio.wait_for(endless_bzpopmax_call(), timeout=0.5)
1934+
18551935
@pytest.mark.parametrize("cluster_mode", [True, False])
18561936
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
18571937
async def test_zrange_by_index(self, redis_client: TRedisClient):

python/python/tests/test_transaction.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,12 +228,16 @@ async def transaction_test(
228228
args.append([2.0, 3.0])
229229
transaction.zrangestore(key8, key8, RangeByIndex(0, -1))
230230
args.append(3)
231-
transaction.zpopmin(key8)
232-
args.append({"two": 2.0})
231+
transaction.bzpopmin([key8], 0.5)
232+
args.append([key8, "two", 2.0])
233+
transaction.bzpopmax([key8], 0.5)
234+
args.append([key8, "four", 4.0])
233235
transaction.zpopmax(key8)
234-
args.append({"four": 4})
236+
args.append({"three": 3.0})
237+
transaction.zpopmin(key8)
238+
args.append({})
235239
transaction.zremrangebyscore(key8, InfBound.NEG_INF, InfBound.POS_INF)
236-
args.append(1)
240+
args.append(0)
237241
transaction.zremrangebylex(key8, InfBound.NEG_INF, InfBound.POS_INF)
238242
args.append(0)
239243
transaction.zdiffstore(key8, [key8, key8])

0 commit comments

Comments
 (0)