Skip to content

Commit 2dba7f2

Browse files
authored
fix #2392 - handle errors in legacyMode (#2394)
1 parent 00e3652 commit 2dba7f2

File tree

2 files changed

+36
-173
lines changed

2 files changed

+36
-173
lines changed

packages/client/lib/client/index.spec.ts

+25-165
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,14 @@ import { strict as assert } from 'assert';
22
import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
33
import RedisClient, { RedisClientType } from '.';
44
import { RedisClientMultiCommandType } from './multi-command';
5-
import { RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts } from '../commands';
6-
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, SocketClosedUnexpectedlyError, WatchError } from '../errors';
5+
import { RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts } from '../commands';
6+
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors';
77
import { defineScript } from '../lua-script';
88
import { spy } from 'sinon';
99
import { once } from 'events';
1010
import { ClientKillFilters } from '../commands/CLIENT_KILL';
11-
import { ClusterSlotStates } from '../commands/CLUSTER_SETSLOT';
1211
import { promisify } from 'util';
1312

14-
// We need to use 'require', because it's not possible with Typescript to import
15-
// function that are exported as 'module.exports = function`, without esModuleInterop
16-
// set to true.
17-
const calculateSlot = require('cluster-key-slot');
18-
1913
export const SQUARE_SCRIPT = defineScript({
2014
SCRIPT: 'return ARGV[1] * ARGV[1];',
2115
NUMBER_OF_KEYS: 0,
@@ -171,6 +165,28 @@ describe('Client', () => {
171165
}
172166
});
173167

168+
testUtils.testWithClient('client.sendCommand should reply with error', async client => {
169+
await assert.rejects(
170+
promisify(client.sendCommand).call(client, '1', '2')
171+
);
172+
}, {
173+
...GLOBAL.SERVERS.OPEN,
174+
clientOptions: {
175+
legacyMode: true
176+
}
177+
});
178+
179+
testUtils.testWithClient('client.hGetAll should reply with error', async client => {
180+
await assert.rejects(
181+
promisify(client.hGetAll).call(client)
182+
);
183+
}, {
184+
...GLOBAL.SERVERS.OPEN,
185+
clientOptions: {
186+
legacyMode: true
187+
}
188+
});
189+
174190
testUtils.testWithClient('client.v4.sendCommand should return a promise', async client => {
175191
assert.equal(
176192
await client.v4.sendCommand(['PING']),
@@ -347,19 +363,6 @@ describe('Client', () => {
347363
legacyMode: true
348364
}
349365
});
350-
351-
testUtils.testWithClient('pingInterval', async client => {
352-
assert.deepEqual(
353-
await once(client, 'ping-interval'),
354-
['PONG']
355-
);
356-
}, {
357-
...GLOBAL.SERVERS.OPEN,
358-
clientOptions: {
359-
legacyMode: true,
360-
pingInterval: 1
361-
}
362-
});
363366
});
364367

365368
describe('events', () => {
@@ -823,34 +826,7 @@ describe('Client', () => {
823826
}
824827
}, GLOBAL.SERVERS.OPEN);
825828

826-
testUtils.testWithClient('should be able to PING in PubSub mode', async client => {
827-
await client.connect();
828-
829-
try {
830-
await client.subscribe('channel', () => {
831-
// noop
832-
});
833-
834-
const [string, buffer, customString, customBuffer] = await Promise.all([
835-
client.ping(),
836-
client.ping(client.commandOptions({ returnBuffers: true })),
837-
client.ping('custom'),
838-
client.ping(client.commandOptions({ returnBuffers: true }), 'custom')
839-
]);
840-
841-
assert.equal(string, 'pong');
842-
assert.deepEqual(buffer, Buffer.from('pong'));
843-
assert.equal(customString, 'custom');
844-
assert.deepEqual(customBuffer, Buffer.from('custom'));
845-
} finally {
846-
await client.disconnect();
847-
}
848-
}, {
849-
...GLOBAL.SERVERS.OPEN,
850-
disableClientSetup: true
851-
});
852-
853-
testUtils.testWithClient('should be able to QUIT in PubSub mode', async client => {
829+
testUtils.testWithClient('should be able to quit in PubSub mode', async client => {
854830
await client.subscribe('channel', () => {
855831
// noop
856832
});
@@ -859,122 +835,6 @@ describe('Client', () => {
859835

860836
assert.equal(client.isOpen, false);
861837
}, GLOBAL.SERVERS.OPEN);
862-
863-
testUtils.testWithClient('should reject GET in PubSub mode', async client => {
864-
await client.connect();
865-
866-
try {
867-
await client.subscribe('channel', () => {
868-
// noop
869-
});
870-
871-
await assert.rejects(client.get('key'), ErrorReply);
872-
} finally {
873-
await client.disconnect();
874-
}
875-
}, {
876-
...GLOBAL.SERVERS.OPEN,
877-
disableClientSetup: true
878-
});
879-
880-
describe('shareded PubSub', () => {
881-
testUtils.isVersionGreaterThanHook([7]);
882-
883-
testUtils.testWithClient('should be able to receive messages', async publisher => {
884-
const subscriber = publisher.duplicate();
885-
886-
await subscriber.connect();
887-
888-
try {
889-
const listener = spy();
890-
await subscriber.sSubscribe('channel', listener);
891-
892-
await Promise.all([
893-
waitTillBeenCalled(listener),
894-
publisher.sPublish('channel', 'message')
895-
]);
896-
897-
assert.ok(listener.calledOnceWithExactly('message', 'channel'));
898-
899-
await subscriber.sUnsubscribe();
900-
901-
// should be able to send commands
902-
await assert.doesNotReject(subscriber.ping());
903-
} finally {
904-
await subscriber.disconnect();
905-
}
906-
}, {
907-
...GLOBAL.SERVERS.OPEN
908-
});
909-
910-
testUtils.testWithClient('should emit sharded-channel-moved event', async publisher => {
911-
await publisher.clusterAddSlotsRange({ start: 0, end: 16383 });
912-
913-
const subscriber = publisher.duplicate();
914-
915-
await subscriber.connect();
916-
917-
try {
918-
await subscriber.sSubscribe('channel', () => {});
919-
920-
await Promise.all([
921-
publisher.clusterSetSlot(
922-
calculateSlot('channel'),
923-
ClusterSlotStates.NODE,
924-
await publisher.clusterMyId()
925-
),
926-
once(subscriber, 'sharded-channel-moved')
927-
]);
928-
929-
assert.equal(
930-
await subscriber.ping(),
931-
'PONG'
932-
);
933-
} finally {
934-
await subscriber.disconnect();
935-
}
936-
}, {
937-
serverArguments: ['--cluster-enabled', 'yes']
938-
});
939-
});
940-
941-
testUtils.testWithClient('should handle errors in SUBSCRIBE', async publisher => {
942-
const subscriber = publisher.duplicate();
943-
944-
await subscriber.connect();
945-
946-
try {
947-
const listener1 = spy();
948-
await subscriber.subscribe('1', listener1);
949-
950-
await publisher.aclSetUser('default', 'resetchannels');
951-
952-
953-
const listener2 = spy();
954-
await assert.rejects(subscriber.subscribe('2', listener2));
955-
956-
await Promise.all([
957-
waitTillBeenCalled(listener1),
958-
publisher.aclSetUser('default', 'allchannels'),
959-
publisher.publish('1', 'message'),
960-
]);
961-
assert.ok(listener1.calledOnceWithExactly('message', '1'));
962-
963-
await subscriber.subscribe('2', listener2);
964-
965-
await Promise.all([
966-
waitTillBeenCalled(listener2),
967-
publisher.publish('2', 'message'),
968-
]);
969-
assert.ok(listener2.calledOnceWithExactly('message', '2'));
970-
} finally {
971-
await subscriber.disconnect();
972-
}
973-
}, {
974-
// this test change ACL rules, running in isolated server
975-
serverArguments: [],
976-
minimumDockerVersion: [6 ,2] // ACL PubSub rules were added in Redis 6.2
977-
});
978838
});
979839

980840
testUtils.testWithClient('ConnectionTimeoutError', async client => {

packages/client/lib/client/index.ts

+11-8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import { ClientClosedError, ClientOfflineError, DisconnectsClientError } from '.
1515
import { URL } from 'url';
1616
import { TcpSocketConnectOpts } from 'net';
1717
import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub';
18+
import { callbackify } from 'util';
1819

1920
export interface RedisClientOptions<
2021
M extends RedisModules = RedisModules,
@@ -343,7 +344,9 @@ export default class RedisClient<
343344
(this as any).sendCommand = (...args: Array<any>): void => {
344345
const result = this.#legacySendCommand(...args);
345346
if (result) {
346-
result.promise.then(reply => result.callback(null, reply));
347+
result.promise
348+
.then(reply => result.callback(null, reply))
349+
.catch(err => result.callback(err));
347350
}
348351
};
349352

@@ -380,18 +383,18 @@ export default class RedisClient<
380383
promise.catch(err => this.emit('error', err));
381384
}
382385

383-
#defineLegacyCommand(this: any, name: string, command?: RedisCommand): void {
384-
this.#v4[name] = this[name].bind(this);
385-
this[name] = command && command.TRANSFORM_LEGACY_REPLY && command.transformReply ?
386+
#defineLegacyCommand(name: string, command?: RedisCommand): void {
387+
this.#v4[name] = (this as any)[name].bind(this);
388+
(this as any)[name] = command && command.TRANSFORM_LEGACY_REPLY && command.transformReply ?
386389
(...args: Array<unknown>) => {
387390
const result = this.#legacySendCommand(name, ...args);
388391
if (result) {
389-
result.promise.then((reply: any) => {
390-
result.callback(null, command.transformReply!(reply));
391-
});
392+
result.promise
393+
.then(reply => result.callback(null, command.transformReply!(reply)))
394+
.catch(err => result.callback(err));
392395
}
393396
} :
394-
(...args: Array<unknown>) => this.sendCommand(name, ...args);
397+
(...args: Array<unknown>) => (this as any).sendCommand(name, ...args);
395398
}
396399

397400
#pingTimer?: NodeJS.Timer;

0 commit comments

Comments
 (0)