Skip to content

Commit eb2c37e

Browse files
committed
fix(client): XCLAIM & XAUTOCLAIM after a TRIM might return nils
1 parent a7d5bc7 commit eb2c37e

File tree

6 files changed

+158
-15
lines changed

6 files changed

+158
-15
lines changed

packages/client/lib/commands/XAUTOCLAIM.spec.ts

+63-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ describe('XAUTOCLAIM', () => {
2323
});
2424
});
2525

26-
testUtils.testWithClient('client.xAutoClaim', async client => {
26+
testUtils.testWithClient('client.xAutoClaim without messages', async client => {
2727
await Promise.all([
2828
client.xGroupCreate('key', 'group', '$', {
2929
MKSTREAM: true
@@ -39,4 +39,66 @@ describe('XAUTOCLAIM', () => {
3939
}
4040
);
4141
}, GLOBAL.SERVERS.OPEN);
42+
43+
testUtils.testWithClient('client.xAutoClaim with messages', async client => {
44+
const [,,id,] = await Promise.all([
45+
client.xGroupCreate('key', 'group', '$', {
46+
MKSTREAM: true
47+
}),
48+
client.xGroupCreateConsumer('key', 'group', 'consumer'),
49+
client.xAdd('key', '*', { foo: 'bar' }),
50+
client.xReadGroup('group', 'consumer', { key: 'key', id: '>' })
51+
]);
52+
53+
assert.deepEqual(
54+
await client.xAutoClaim('key', 'group', 'consumer', 1, '0-0'),
55+
{
56+
nextId: '0-0',
57+
messages: [{
58+
id,
59+
message: Object.create(null, { 'foo': {
60+
value: 'bar',
61+
configurable: true,
62+
enumerable: true
63+
} })
64+
}]
65+
}
66+
);
67+
}, GLOBAL.SERVERS.OPEN);
68+
69+
testUtils.testWithClient('client.xAutoClaim with trimmed messages', async client => {
70+
const [,,,,,id2,] = await Promise.all([
71+
client.xGroupCreate('key', 'group', '$', {
72+
MKSTREAM: true
73+
}),
74+
client.xGroupCreateConsumer('key', 'group', 'consumer'),
75+
client.xAdd('key', '*', { foo: 'bar' }),
76+
client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }),
77+
client.xTrim('key', 'MAXLEN', 0),
78+
client.xAdd('key', '*', { bar: 'baz' }),
79+
client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }),
80+
]);
81+
82+
assert.deepEqual(
83+
await client.xAutoClaim('key', 'group', 'consumer', 1, '0-0'),
84+
{
85+
nextId: '0-0',
86+
messages: testUtils.isVersionGreaterThan([7, 0]) ? [{
87+
id: id2,
88+
message: Object.create(null, { 'bar': {
89+
value: 'baz',
90+
configurable: true,
91+
enumerable: true
92+
} })
93+
}] : [null, {
94+
id: id2,
95+
message: Object.create(null, { 'bar': {
96+
value: 'baz',
97+
configurable: true,
98+
enumerable: true
99+
} })
100+
}]
101+
}
102+
);
103+
}, GLOBAL.SERVERS.OPEN);
42104
});

packages/client/lib/commands/XAUTOCLAIM.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { RedisCommandArgument, RedisCommandArguments } from '.';
2-
import { StreamMessagesReply, transformStreamMessagesReply } from './generic-transformers';
2+
import { StreamMessagesNullReply, transformStreamMessagesNullReply } from './generic-transformers';
33

44
export const FIRST_KEY_INDEX = 1;
55

@@ -28,12 +28,12 @@ type XAutoClaimRawReply = [RedisCommandArgument, Array<any>];
2828

2929
interface XAutoClaimReply {
3030
nextId: RedisCommandArgument;
31-
messages: StreamMessagesReply;
31+
messages: StreamMessagesNullReply;
3232
}
3333

3434
export function transformReply(reply: XAutoClaimRawReply): XAutoClaimReply {
3535
return {
3636
nextId: reply[0],
37-
messages: transformStreamMessagesReply(reply[1])
37+
messages: transformStreamMessagesNullReply(reply[1])
3838
};
3939
}

packages/client/lib/commands/XCLAIM.spec.ts

+40
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,44 @@ describe('XCLAIM', () => {
8787
[]
8888
);
8989
}, GLOBAL.SERVERS.OPEN);
90+
91+
testUtils.testWithClient('client.xClaim with a message', async client => {
92+
const [,,id,] = await Promise.all([
93+
client.xGroupCreate('key', 'group', '$', {
94+
MKSTREAM: true
95+
}),
96+
client.xGroupCreateConsumer('key', 'group', 'consumer'),
97+
client.xAdd('key', '*', { foo: 'bar' }),
98+
client.xReadGroup('group', 'consumer', { key: 'key', id: '>' })
99+
]);
100+
101+
assert.deepEqual(
102+
await client.xClaim('key', 'group', 'consumer', 1, id),
103+
[{
104+
id,
105+
message: Object.create(null, { 'foo': {
106+
value: 'bar',
107+
configurable: true,
108+
enumerable: true
109+
} })
110+
}]
111+
);
112+
}, GLOBAL.SERVERS.OPEN);
113+
114+
testUtils.testWithClient('client.xClaim with a trimmed message', async client => {
115+
const [,,id,,,] = await Promise.all([
116+
client.xGroupCreate('key', 'group', '$', {
117+
MKSTREAM: true
118+
}),
119+
client.xGroupCreateConsumer('key', 'group', 'consumer'),
120+
client.xAdd('key', '*', { foo: 'bar' }),
121+
client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }),
122+
client.xTrim('key', 'MAXLEN', 0),
123+
]);
124+
125+
assert.deepEqual(
126+
await client.xClaim('key', 'group', 'consumer', 1, id),
127+
testUtils.isVersionGreaterThan([7, 0]) ? []: [null]
128+
);
129+
}, GLOBAL.SERVERS.OPEN);
90130
});

packages/client/lib/commands/XCLAIM.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,4 @@ export function transformArguments(
4545
return args;
4646
}
4747

48-
export { transformStreamMessagesReply as transformReply } from './generic-transformers';
48+
export { transformStreamMessagesNullReply as transformReply } from './generic-transformers';

packages/client/lib/commands/generic-transformers.spec.ts

+33
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
transformStringNumberInfinityArgument,
1010
transformTuplesReply,
1111
transformStreamMessagesReply,
12+
transformStreamMessagesNullReply,
1213
transformStreamsMessagesReply,
1314
transformSortedSetWithScoresReply,
1415
pushGeoCountArgument,
@@ -219,6 +220,38 @@ describe('Generic Transformers', () => {
219220
);
220221
});
221222

223+
it('transformStreamMessagesNullReply', () => {
224+
assert.deepEqual(
225+
transformStreamMessagesNullReply([null, ['0-0', ['0key', '0value']]]),
226+
[null, {
227+
id: '0-0',
228+
message: Object.create(null, {
229+
'0key': {
230+
value: '0value',
231+
configurable: true,
232+
enumerable: true
233+
}
234+
})
235+
}]
236+
);
237+
});
238+
239+
it('transformStreamMessagesNullReply', () => {
240+
assert.deepEqual(
241+
transformStreamMessagesNullReply([null, ['0-1', ['11key', '11value']]]),
242+
[null, {
243+
id: '0-1',
244+
message: Object.create(null, {
245+
'11key': {
246+
value: '11value',
247+
configurable: true,
248+
enumerable: true
249+
}
250+
})
251+
}]
252+
);
253+
});
254+
222255
describe('transformStreamsMessagesReply', () => {
223256
it('null', () => {
224257
assert.equal(

packages/client/lib/commands/generic-transformers.ts

+18-10
Original file line numberDiff line numberDiff line change
@@ -92,19 +92,27 @@ export interface StreamMessageReply {
9292
message: Record<string, RedisCommandArgument>;
9393
}
9494

95-
export type StreamMessagesReply = Array<StreamMessageReply>;
95+
export function transformStreamMessageReply([id, message]: Array<any>): StreamMessageReply {
96+
return {
97+
id,
98+
message: transformTuplesReply(message)
99+
};
100+
}
96101

97-
export function transformStreamMessagesReply(reply: Array<any>): StreamMessagesReply {
98-
const messages = [];
102+
export function transformStreamMessageNullReply(reply: Array<any>): StreamMessageReply | null {
103+
if (reply === null) return null;
104+
return transformStreamMessageReply(reply);
105+
}
99106

100-
for (const [id, message] of reply) {
101-
messages.push({
102-
id,
103-
message: transformTuplesReply(message)
104-
});
105-
}
106107

107-
return messages;
108+
export type StreamMessagesReply = Array<StreamMessageReply>;
109+
export function transformStreamMessagesReply(reply: Array<any>): StreamMessagesReply {
110+
return reply.map(transformStreamMessageReply);
111+
}
112+
113+
export type StreamMessagesNullReply = Array<StreamMessageReply | null>;
114+
export function transformStreamMessagesNullReply(reply: Array<any>): StreamMessagesNullReply {
115+
return reply.map(transformStreamMessageNullReply);
108116
}
109117

110118
export type StreamsMessagesReply = Array<{

0 commit comments

Comments
 (0)