Skip to content

Commit b3b7dc7

Browse files
authored
Merge branch 'main' into python/integ_yipin_flushdb
2 parents 7417c06 + e83f9da commit b3b7dc7

File tree

7 files changed

+411
-17
lines changed

7 files changed

+411
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
* Python: Added XGROUP CREATE and XGROUP DESTROY commands ([#1646](https://github.com/aws/glide-for-redis/pull/1646))
5656
* Python: Added XGROUP CREATECONSUMER and XGROUP DELCONSUMER commands ([#1658](https://github.com/aws/glide-for-redis/pull/1658))
5757
* Python: Added LOLWUT command ([#1657](https://github.com/aws/glide-for-redis/pull/1657))
58+
* Python: Added XREADGROUP command ([#1679](https://github.com/aws/glide-for-redis/pull/1679))
5859
* Python: Added FLUSHDB command ([#1680](https://github.com/aws/glide-for-redis/pull/1680))
5960

6061
### Breaking Changes

python/python/glide/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
StreamAddOptions,
5555
StreamGroupOptions,
5656
StreamRangeBound,
57+
StreamReadGroupOptions,
5758
StreamReadOptions,
5859
StreamTrimOptions,
5960
TrimByMaxLen,
@@ -161,6 +162,7 @@
161162
"MinId",
162163
"StreamAddOptions",
163164
"StreamGroupOptions",
165+
"StreamReadGroupOptions",
164166
"StreamRangeBound",
165167
"StreamReadOptions",
166168
"StreamTrimOptions",

python/python/glide/async_commands/core.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
StreamAddOptions,
5050
StreamGroupOptions,
5151
StreamRangeBound,
52+
StreamReadGroupOptions,
5253
StreamReadOptions,
5354
StreamTrimOptions,
5455
)
@@ -2916,6 +2917,57 @@ async def xgroup_del_consumer(
29162917
),
29172918
)
29182919

2920+
async def xreadgroup(
2921+
self,
2922+
keys_and_ids: Mapping[str, str],
2923+
group_name: str,
2924+
consumer_name: str,
2925+
options: Optional[StreamReadGroupOptions] = None,
2926+
) -> Optional[Mapping[str, Mapping[str, Optional[List[List[str]]]]]]:
2927+
"""
2928+
Reads entries from the given streams owned by a consumer group.
2929+
2930+
See https://valkey.io/commands/xreadgroup for more details.
2931+
2932+
Note:
2933+
When in cluster mode, all keys in `keys_and_ids` must map to the same hash slot.
2934+
2935+
Args:
2936+
keys_and_ids (Mapping[str, str]): A mapping of stream keys to stream entry IDs to read from. The special ">"
2937+
ID returns messages that were never delivered to any other consumer. Any other valid ID will return
2938+
entries pending for the consumer with IDs greater than the one provided.
2939+
group_name (str): The consumer group name.
2940+
consumer_name (str): The consumer name. The consumer will be auto-created if it does not already exist.
2941+
options (Optional[StreamReadGroupOptions]): Options detailing how to read the stream.
2942+
2943+
Returns:
2944+
Optional[Mapping[str, Mapping[str, Optional[List[List[str]]]]]]: A mapping of stream keys, to a mapping of
2945+
stream IDs, to a list of pairings with format `[[field, entry], [field, entry], ...]`.
2946+
Returns None if the BLOCK option is given and a timeout occurs, or if there is no stream that can be served.
2947+
2948+
Examples:
2949+
>>> await client.xadd("mystream", [("field1", "value1")], StreamAddOptions(id="1-0"))
2950+
>>> await client.xgroup_create("mystream", "mygroup", "0-0")
2951+
>>> await client.xreadgroup({"mystream": ">"}, "mygroup", "myconsumer", StreamReadGroupOptions(count=1))
2952+
{
2953+
"mystream": {
2954+
"1-0": [["field1", "value1"]],
2955+
}
2956+
} # Read one stream entry from "mystream" using "myconsumer" in the consumer group "mygroup".
2957+
"""
2958+
args = ["GROUP", group_name, consumer_name]
2959+
if options is not None:
2960+
args.extend(options.to_args())
2961+
2962+
args.append("STREAMS")
2963+
args.extend([key for key in keys_and_ids.keys()])
2964+
args.extend([value for value in keys_and_ids.values()])
2965+
2966+
return cast(
2967+
Optional[Mapping[str, Mapping[str, Optional[List[List[str]]]]]],
2968+
await self._execute_command(RequestType.XReadGroup, args),
2969+
)
2970+
29192971
async def geoadd(
29202972
self,
29212973
key: str,

python/python/glide/async_commands/stream.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,3 +302,37 @@ def to_args(self) -> List[str]:
302302
args.extend([self.ENTRIES_READ_REDIS_API, self.entries_read_id])
303303

304304
return args
305+
306+
307+
class StreamReadGroupOptions(StreamReadOptions):
308+
READ_NOACK_REDIS_API = "NOACK"
309+
310+
def __init__(
311+
self, no_ack=False, block_ms: Optional[int] = None, count: Optional[int] = None
312+
):
313+
"""
314+
Options for reading entries from streams using a consumer group. Can be used as an optional argument to
315+
`XREADGROUP`.
316+
317+
Args:
318+
no_ack (bool): If set, messages are not added to the Pending Entries List (PEL). This is equivalent to
319+
acknowledging the message when it is read. Equivalent to `NOACK` in the Redis API.
320+
block_ms (Optional[int]): If provided, the request will be blocked for the set amount of milliseconds or
321+
until the server has the required number of entries. Equivalent to `BLOCK` in the Redis API.
322+
count (Optional[int]): The maximum number of elements requested. Equivalent to `COUNT` in the Redis API.
323+
"""
324+
super().__init__(block_ms=block_ms, count=count)
325+
self.no_ack = no_ack
326+
327+
def to_args(self) -> List[str]:
328+
"""
329+
Returns the options as a list of string arguments to be used in the `XREADGROUP` command.
330+
331+
Returns:
332+
List[str]: The options as a list of arguments for the `XREADGROUP` command.
333+
"""
334+
args = super().to_args()
335+
if self.no_ack:
336+
args.append(self.READ_NOACK_REDIS_API)
337+
338+
return args

python/python/glide/async_commands/transaction.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
StreamAddOptions,
4747
StreamGroupOptions,
4848
StreamRangeBound,
49+
StreamReadGroupOptions,
4950
StreamReadOptions,
5051
StreamTrimOptions,
5152
)
@@ -2039,6 +2040,41 @@ def xgroup_del_consumer(
20392040
RequestType.XGroupDelConsumer, [key, group_name, consumer_name]
20402041
)
20412042

2043+
def xreadgroup(
2044+
self: TTransaction,
2045+
keys_and_ids: Mapping[str, str],
2046+
group_name: str,
2047+
consumer_name: str,
2048+
options: Optional[StreamReadGroupOptions] = None,
2049+
) -> TTransaction:
2050+
"""
2051+
Reads entries from the given streams owned by a consumer group.
2052+
2053+
See https://valkey.io/commands/xreadgroup for more details.
2054+
2055+
Args:
2056+
keys_and_ids (Mapping[str, str]): A mapping of stream keys to stream entry IDs to read from. The special ">"
2057+
ID returns messages that were never delivered to any other consumer. Any other valid ID will return
2058+
entries pending for the consumer with IDs greater than the one provided.
2059+
group_name (str): The consumer group name.
2060+
consumer_name (str): The consumer name. The consumer will be auto-created if it does not already exist.
2061+
options (Optional[StreamReadGroupOptions]): Options detailing how to read the stream.
2062+
2063+
Command response:
2064+
Optional[Mapping[str, Mapping[str, Optional[List[List[str]]]]]]: A mapping of stream keys, to a mapping of
2065+
stream IDs, to a list of pairings with format `[[field, entry], [field, entry], ...]`.
2066+
Returns None if the BLOCK option is given and a timeout occurs, or if there is no stream that can be served.
2067+
"""
2068+
args = ["GROUP", group_name, consumer_name]
2069+
if options is not None:
2070+
args.extend(options.to_args())
2071+
2072+
args.append("STREAMS")
2073+
args.extend([key for key in keys_and_ids.keys()])
2074+
args.extend([value for value in keys_and_ids.values()])
2075+
2076+
return self.append_command(RequestType.XReadGroup, args)
2077+
20422078
def geoadd(
20432079
self: TTransaction,
20442080
key: str,

0 commit comments

Comments
 (0)