Skip to content

Commit 69a0d82

Browse files
Yury-Fridlyandcyip10
authored andcommitted
Node: Add XPENDING command. (valkey-io#2085)
* Add `XPENDING` command. Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
1 parent d563e4f commit 69a0d82

File tree

7 files changed

+327
-41
lines changed

7 files changed

+327
-41
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#### Changes
2+
* Node: Added XPENDING commands ([#2085](https://github.com/valkey-io/valkey-glide/pull/2085))
23
* Node: Added HSCAN command ([#2098](https://github.com/valkey-io/valkey-glide/pull/2098/))
34
* Node: Added XINFO CONSUMERS command ([#2093](https://github.com/valkey-io/valkey-glide/pull/2093))
45
* Node: Added HRANDFIELD command ([#2096](https://github.com/valkey-io/valkey-glide/pull/2096))

node/npm/glide/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ function initialize() {
140140
StreamAddOptions,
141141
StreamReadOptions,
142142
StreamClaimOptions,
143+
StreamPendingOptions,
143144
ScriptOptions,
144145
ClosingError,
145146
ConfigurationError,
@@ -232,8 +233,9 @@ function initialize() {
232233
StreamGroupOptions,
233234
StreamTrimOptions,
234235
StreamAddOptions,
235-
StreamReadOptions,
236236
StreamClaimOptions,
237+
StreamReadOptions,
238+
StreamPendingOptions,
237239
ScriptOptions,
238240
ClosingError,
239241
ConfigurationError,

node/src/BaseClient.ts

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import {
4848
StreamAddOptions,
4949
StreamClaimOptions,
5050
StreamGroupOptions,
51+
StreamPendingOptions,
5152
StreamReadOptions,
5253
StreamTrimOptions,
5354
ZAddOptions,
@@ -171,6 +172,7 @@ import {
171172
createXGroupCreateConsumer,
172173
createXGroupDelConsumer,
173174
createXLen,
175+
createXPending,
174176
createXRead,
175177
createXTrim,
176178
createZAdd,
@@ -3971,6 +3973,77 @@ export class BaseClient {
39713973
return this.createWritePromise(createXLen(key));
39723974
}
39733975

3976+
/**
3977+
* Returns stream message summary information for pending messages matching a given range of IDs.
3978+
*
3979+
* See https://valkey.io/commands/xpending/ for more details.
3980+
*
3981+
* @param key - The key of the stream.
3982+
* @param group - The consumer group name.
3983+
* @returns An `array` that includes the summary of the pending messages. See example for more details.
3984+
* @example
3985+
* ```typescript
3986+
* console.log(await client.xpending("my_stream", "my_group")); // Output:
3987+
* // [
3988+
* // 42, // The total number of pending messages
3989+
* // "1722643465939-0", // The smallest ID among the pending messages
3990+
* // "1722643484626-0", // The greatest ID among the pending messages
3991+
* // [ // A 2D-`array` of every consumer in the group
3992+
* // [ "consumer1", "10" ], // with at least one pending message, and the
3993+
* // [ "consumer2", "32" ], // number of pending messages it has
3994+
* // ]
3995+
* // ]
3996+
* ```
3997+
*/
3998+
public async xpending(
3999+
key: string,
4000+
group: string,
4001+
): Promise<[number, string, string, [string, number][]]> {
4002+
return this.createWritePromise(createXPending(key, group));
4003+
}
4004+
4005+
/**
4006+
* Returns an extended form of stream message information for pending messages matching a given range of IDs.
4007+
*
4008+
* See https://valkey.io/commands/xpending/ for more details.
4009+
*
4010+
* @param key - The key of the stream.
4011+
* @param group - The consumer group name.
4012+
* @param options - Additional options to filter entries, see {@link StreamPendingOptions}.
4013+
* @returns A 2D-`array` of 4-tuples containing extended message information. See example for more details.
4014+
*
4015+
* @example
4016+
* ```typescript
4017+
* console.log(await client.xpending("my_stream", "my_group"), {
4018+
* start: { value: "0-1", isInclusive: true },
4019+
* end: InfScoreBoundary.PositiveInfinity,
4020+
* count: 2,
4021+
* consumer: "consumer1"
4022+
* }); // Output:
4023+
* // [
4024+
* // [
4025+
* // "1722643465939-0", // The ID of the message
4026+
* // "consumer1", // The name of the consumer that fetched the message and has still to acknowledge it
4027+
* // 174431, // The number of milliseconds that elapsed since the last time this message was delivered to this consumer
4028+
* // 1 // The number of times this message was delivered
4029+
* // ],
4030+
* // [
4031+
* // "1722643484626-0",
4032+
* // "consumer1",
4033+
* // 202231,
4034+
* // 1
4035+
* // ]
4036+
* // ]
4037+
* ```
4038+
*/
4039+
public async xpendingWithOptions(
4040+
key: string,
4041+
group: string,
4042+
options: StreamPendingOptions,
4043+
): Promise<[string, string, number, number][]> {
4044+
return this.createWritePromise(createXPending(key, group, options));
4045+
}
4046+
39744047
/**
39754048
* Returns the list of all consumers and their attributes for the given consumer group of the
39764049
* stream stored at `key`.

node/src/Commands.ts

Lines changed: 98 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1645,37 +1645,52 @@ type SortedSetRange<T> = {
16451645
export type RangeByScore = SortedSetRange<number> & { type: "byScore" };
16461646
export type RangeByLex = SortedSetRange<string> & { type: "byLex" };
16471647

1648-
/**
1649-
* Returns a string representation of a score boundary in Redis protocol format.
1650-
* @param score - The score boundary object containing value and inclusivity
1651-
* information.
1652-
* @param isLex - Indicates whether to return lexical representation for
1653-
* positive/negative infinity.
1654-
* @returns A string representation of the score boundary in Redis protocol
1655-
* format.
1656-
*/
1648+
/** Returns a string representation of a score boundary as a command argument. */
16571649
function getScoreBoundaryArg(
16581650
score: ScoreBoundary<number> | ScoreBoundary<string>,
1659-
isLex: boolean = false,
16601651
): string {
1661-
if (score == InfScoreBoundary.PositiveInfinity) {
1662-
return (
1663-
InfScoreBoundary.PositiveInfinity.toString() + (isLex ? "" : "inf")
1664-
);
1652+
if (typeof score === "string") {
1653+
// InfScoreBoundary
1654+
return score + "inf";
16651655
}
16661656

1667-
if (score == InfScoreBoundary.NegativeInfinity) {
1668-
return (
1669-
InfScoreBoundary.NegativeInfinity.toString() + (isLex ? "" : "inf")
1670-
);
1657+
if (score.isInclusive == false) {
1658+
return "(" + score.value.toString();
1659+
}
1660+
1661+
return score.value.toString();
1662+
}
1663+
1664+
/** Returns a string representation of a lex boundary as a command argument. */
1665+
function getLexBoundaryArg(
1666+
score: ScoreBoundary<number> | ScoreBoundary<string>,
1667+
): string {
1668+
if (typeof score === "string") {
1669+
// InfScoreBoundary
1670+
return score;
1671+
}
1672+
1673+
if (score.isInclusive == false) {
1674+
return "(" + score.value.toString();
1675+
}
1676+
1677+
return "[" + score.value.toString();
1678+
}
1679+
1680+
/** Returns a string representation of a stream boundary as a command argument. */
1681+
function getStreamBoundaryArg(
1682+
score: ScoreBoundary<number> | ScoreBoundary<string>,
1683+
): string {
1684+
if (typeof score === "string") {
1685+
// InfScoreBoundary
1686+
return score;
16711687
}
16721688

16731689
if (score.isInclusive == false) {
16741690
return "(" + score.value.toString();
16751691
}
16761692

1677-
const value = isLex ? "[" + score.value.toString() : score.value.toString();
1678-
return value;
1693+
return score.value.toString();
16791694
}
16801695

16811696
function createZRangeArgs(
@@ -1688,10 +1703,20 @@ function createZRangeArgs(
16881703

16891704
if (typeof rangeQuery.start != "number") {
16901705
rangeQuery = rangeQuery as RangeByScore | RangeByLex;
1691-
const isLex = rangeQuery.type == "byLex";
1692-
args.push(getScoreBoundaryArg(rangeQuery.start, isLex));
1693-
args.push(getScoreBoundaryArg(rangeQuery.stop, isLex));
1694-
args.push(isLex == true ? "BYLEX" : "BYSCORE");
1706+
1707+
if (rangeQuery.type == "byLex") {
1708+
args.push(
1709+
getLexBoundaryArg(rangeQuery.start),
1710+
getLexBoundaryArg(rangeQuery.stop),
1711+
"BYLEX",
1712+
);
1713+
} else {
1714+
args.push(
1715+
getScoreBoundaryArg(rangeQuery.start),
1716+
getScoreBoundaryArg(rangeQuery.stop),
1717+
"BYSCORE",
1718+
);
1719+
}
16951720
} else {
16961721
args.push(rangeQuery.start.toString());
16971722
args.push(rangeQuery.stop.toString());
@@ -1724,9 +1749,11 @@ export function createZCount(
17241749
minScore: ScoreBoundary<number>,
17251750
maxScore: ScoreBoundary<number>,
17261751
): command_request.Command {
1727-
const args = [key];
1728-
args.push(getScoreBoundaryArg(minScore));
1729-
args.push(getScoreBoundaryArg(maxScore));
1752+
const args = [
1753+
key,
1754+
getScoreBoundaryArg(minScore),
1755+
getScoreBoundaryArg(maxScore),
1756+
];
17301757
return createCommand(RequestType.ZCount, args);
17311758
}
17321759

@@ -1879,11 +1906,7 @@ export function createZRemRangeByLex(
18791906
minLex: ScoreBoundary<string>,
18801907
maxLex: ScoreBoundary<string>,
18811908
): command_request.Command {
1882-
const args = [
1883-
key,
1884-
getScoreBoundaryArg(minLex, true),
1885-
getScoreBoundaryArg(maxLex, true),
1886-
];
1909+
const args = [key, getLexBoundaryArg(minLex), getLexBoundaryArg(maxLex)];
18871910
return createCommand(RequestType.ZRemRangeByLex, args);
18881911
}
18891912

@@ -1895,12 +1918,15 @@ export function createZRemRangeByScore(
18951918
minScore: ScoreBoundary<number>,
18961919
maxScore: ScoreBoundary<number>,
18971920
): command_request.Command {
1898-
const args = [key];
1899-
args.push(getScoreBoundaryArg(minScore));
1900-
args.push(getScoreBoundaryArg(maxScore));
1921+
const args = [
1922+
key,
1923+
getScoreBoundaryArg(minScore),
1924+
getScoreBoundaryArg(maxScore),
1925+
];
19011926
return createCommand(RequestType.ZRemRangeByScore, args);
19021927
}
19031928

1929+
/** @internal */
19041930
export function createPersist(key: string): command_request.Command {
19051931
return createCommand(RequestType.Persist, [key]);
19061932
}
@@ -1913,11 +1939,7 @@ export function createZLexCount(
19131939
minLex: ScoreBoundary<string>,
19141940
maxLex: ScoreBoundary<string>,
19151941
): command_request.Command {
1916-
const args = [
1917-
key,
1918-
getScoreBoundaryArg(minLex, true),
1919-
getScoreBoundaryArg(maxLex, true),
1920-
];
1942+
const args = [key, getLexBoundaryArg(minLex), getLexBoundaryArg(maxLex)];
19211943
return createCommand(RequestType.ZLexCount, args);
19221944
}
19231945

@@ -2434,6 +2456,42 @@ export function createXLen(key: string): command_request.Command {
24342456
return createCommand(RequestType.XLen, [key]);
24352457
}
24362458

2459+
/** Optional arguments for {@link BaseClient.xpendingWithOptions|xpending}. */
2460+
export type StreamPendingOptions = {
2461+
/** Filter pending entries by their idle time - in milliseconds */
2462+
minIdleTime?: number;
2463+
/** Starting stream ID bound for range. */
2464+
start: ScoreBoundary<string>;
2465+
/** Ending stream ID bound for range. */
2466+
end: ScoreBoundary<string>;
2467+
/** Limit the number of messages returned. */
2468+
count: number;
2469+
/** Filter pending entries by consumer. */
2470+
consumer?: string;
2471+
};
2472+
2473+
/** @internal */
2474+
export function createXPending(
2475+
key: string,
2476+
group: string,
2477+
options?: StreamPendingOptions,
2478+
): command_request.Command {
2479+
const args = [key, group];
2480+
2481+
if (options) {
2482+
if (options.minIdleTime !== undefined)
2483+
args.push("IDLE", options.minIdleTime.toString());
2484+
args.push(
2485+
getStreamBoundaryArg(options.start),
2486+
getStreamBoundaryArg(options.end),
2487+
options.count.toString(),
2488+
);
2489+
if (options.consumer) args.push(options.consumer);
2490+
}
2491+
2492+
return createCommand(RequestType.XPending, args);
2493+
}
2494+
24372495
/** @internal */
24382496
export function createXInfoConsumers(
24392497
key: string,

node/src/Transaction.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import {
5454
StreamAddOptions,
5555
StreamClaimOptions,
5656
StreamGroupOptions,
57+
StreamPendingOptions,
5758
StreamReadOptions,
5859
StreamTrimOptions,
5960
ZAddOptions,
@@ -206,6 +207,7 @@ import {
206207
createXInfoConsumers,
207208
createXInfoStream,
208209
createXLen,
210+
createXPending,
209211
createXRead,
210212
createXTrim,
211213
createZAdd,
@@ -2332,6 +2334,9 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
23322334
}
23332335

23342336
/**
2337+
* Returns stream message summary information for pending messages matching a given range of IDs.
2338+
*
2339+
* See https://valkey.io/commands/xpending/ for more details.
23352340
* Returns the list of all consumers and their attributes for the given consumer group of the
23362341
* stream stored at `key`.
23372342
*
@@ -2340,6 +2345,39 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
23402345
* @param key - The key of the stream.
23412346
* @param group - The consumer group name.
23422347
*
2348+
* Command Response - An `array` that includes the summary of the pending messages.
2349+
* See example of {@link BaseClient.xpending|xpending} for more details.
2350+
*/
2351+
public xpending(key: string, group: string): T {
2352+
return this.addAndReturn(createXPending(key, group));
2353+
}
2354+
2355+
/**
2356+
* Returns stream message summary information for pending messages matching a given range of IDs.
2357+
*
2358+
* See https://valkey.io/commands/xpending/ for more details.
2359+
*
2360+
* @param key - The key of the stream.
2361+
* @param group - The consumer group name.
2362+
* @param options - Additional options to filter entries, see {@link StreamPendingOptions}.
2363+
*
2364+
* Command Response - A 2D-`array` of 4-tuples containing extended message information.
2365+
* See example of {@link BaseClient.xpendingWithOptions|xpendingWithOptions} for more details.
2366+
*/
2367+
public xpendingWithOptions(
2368+
key: string,
2369+
group: string,
2370+
options: StreamPendingOptions,
2371+
): T {
2372+
return this.addAndReturn(createXPending(key, group, options));
2373+
}
2374+
2375+
/**
2376+
* Returns the list of all consumers and their attributes for the given consumer group of the
2377+
* stream stored at `key`.
2378+
*
2379+
* See https://valkey.io/commands/xinfo-consumers/ for more details.
2380+
*
23432381
* Command Response - An `Array` of `Records`, where each mapping contains the attributes
23442382
* of a consumer for the given consumer group of the stream at `key`.
23452383
*/

0 commit comments

Comments
 (0)