From 7422db376d55294c4dd1a5d717bce22f52a89583 Mon Sep 17 00:00:00 2001 From: Leibale Date: Mon, 13 Feb 2023 13:05:02 -0500 Subject: [PATCH 1/3] fix #2406 - fix isolationPool after reconnect --- packages/client/lib/client/index.spec.ts | 6 ++++++ packages/client/lib/client/index.ts | 27 +++++++++++++----------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 25c966c2719..65d7d6ad2df 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -720,6 +720,12 @@ describe('Client', () => { ); }, GLOBAL.SERVERS.OPEN); + testUtils.testWithClient('should be able to use isolationPool after reconnect (#2406)', async client => { + await client.disconnect(); + await client.connect(); + await client.executeIsolated(() => Promise.resolve()); + }, GLOBAL.SERVERS.OPEN); + describe('PubSub', () => { testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => { function assertStringListener(message: string, channel: string) { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index b4bf49fc7bc..af15d63e4a3 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -15,7 +15,6 @@ import { ClientClosedError, ClientOfflineError, DisconnectsClientError } from '. import { URL } from 'url'; import { TcpSocketConnectOpts } from 'net'; import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub'; -import { callbackify } from 'util'; export interface RedisClientOptions< M extends RedisModules = RedisModules, @@ -190,7 +189,7 @@ export default class RedisClient< readonly #options?: RedisClientOptions; readonly #socket: RedisSocket; readonly #queue: RedisCommandsQueue; - readonly #isolationPool: Pool>; + #isolationPool?: Pool>; readonly #v4: Record = {}; #selectedDB = 0; @@ -223,16 +222,6 @@ export default class RedisClient< this.#options = this.#initiateOptions(options); this.#queue = this.#initiateQueue(); this.#socket = this.#initiateSocket(); - this.#isolationPool = createPool({ - create: async () => { - const duplicate = this.duplicate({ - isolationPoolOptions: undefined - }).on('error', err => this.emit('error', err)); - await duplicate.connect(); - return duplicate; - }, - destroy: client => client.disconnect() - }, options?.isolationPoolOptions); this.#legacyMode(); } @@ -422,6 +411,16 @@ export default class RedisClient< } connect(): Promise { + this.#isolationPool = createPool({ + create: async () => { + const duplicate = this.duplicate({ + isolationPoolOptions: undefined + }).on('error', err => this.emit('error', err)); + await duplicate.connect(); + return duplicate; + }, + destroy: client => client.disconnect() + }, this.#options?.isolationPoolOptions); return this.#socket.connect(); } @@ -704,6 +703,7 @@ export default class RedisClient< } executeIsolated(fn: (client: RedisClientType) => T | Promise): Promise { + if (!this.#isolationPool) throw new ClientClosedError(); return this.#isolationPool.use(fn); } @@ -793,8 +793,11 @@ export default class RedisClient< } async #destroyIsolationPool(): Promise { + if (!this.#isolationPool) return; + await this.#isolationPool.drain(); await this.#isolationPool.clear(); + this.#isolationPool = undefined; } ref(): void { From 5b10b40270bc3cf7a4dd95563eafa775f4e033ed Mon Sep 17 00:00:00 2001 From: Leibale Date: Mon, 13 Feb 2023 13:35:32 -0500 Subject: [PATCH 2/3] revert breaking change --- packages/client/lib/client/index.spec.ts | 41 +++++++++++++++++------- packages/client/lib/client/index.ts | 28 ++++++++++------ 2 files changed, 48 insertions(+), 21 deletions(-) diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 65d7d6ad2df..7d8672f14b6 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -598,11 +598,36 @@ describe('Client', () => { } }); - testUtils.testWithClient('executeIsolated', async client => { - const id = await client.clientId(), - isolatedId = await client.executeIsolated(isolatedClient => isolatedClient.clientId()); - assert.ok(id !== isolatedId); - }, GLOBAL.SERVERS.OPEN); + describe('isolationPool', () => { + testUtils.testWithClient('executeIsolated', async client => { + const id = await client.clientId(), + isolatedId = await client.executeIsolated(isolatedClient => isolatedClient.clientId()); + assert.ok(id !== isolatedId); + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('should be able to use pool even before connect', async client => { + await client.executeIsolated(() => Promise.resolve()); + }, { + ...GLOBAL.SERVERS.OPEN, + disableClientSetup: true + }); + + testUtils.testWithClient('should work after reconnect (#2406)', async client => { + await client.disconnect(); + await client.connect(); + await client.executeIsolated(() => Promise.resolve()); + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('should throw ClientClosedError after disconnect', async client => { + assert.rejects( + client.executeIsolated(() => Promise.resolve()), + ClientClosedError + ); + }, { + ...GLOBAL.SERVERS.OPEN, + disableClientSetup: true + }); + }); async function killClient< M extends RedisModules, @@ -719,12 +744,6 @@ describe('Client', () => { members.map(member => [member.value, member.score]).sort(sort) ); }, GLOBAL.SERVERS.OPEN); - - testUtils.testWithClient('should be able to use isolationPool after reconnect (#2406)', async client => { - await client.disconnect(); - await client.connect(); - await client.executeIsolated(() => Promise.resolve()); - }, GLOBAL.SERVERS.OPEN); describe('PubSub', () => { testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index af15d63e4a3..666d47150f1 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -222,6 +222,9 @@ export default class RedisClient< this.#options = this.#initiateOptions(options); this.#queue = this.#initiateQueue(); this.#socket = this.#initiateSocket(); + // should be initiated in connect, not here + // TODO: consider breaking in v5 + this.#isolationPool = this.#initiateIsolationPool(); this.#legacyMode(); } @@ -326,6 +329,19 @@ export default class RedisClient< .on('end', () => this.emit('end')); } + #initiateIsolationPool() { + return createPool({ + create: async () => { + const duplicate = this.duplicate({ + isolationPoolOptions: undefined + }).on('error', err => this.emit('error', err)); + await duplicate.connect(); + return duplicate; + }, + destroy: client => client.disconnect() + }, this.#options?.isolationPoolOptions); + } + #legacyMode(): void { if (!this.#options?.legacyMode) return; @@ -411,16 +427,8 @@ export default class RedisClient< } connect(): Promise { - this.#isolationPool = createPool({ - create: async () => { - const duplicate = this.duplicate({ - isolationPoolOptions: undefined - }).on('error', err => this.emit('error', err)); - await duplicate.connect(); - return duplicate; - }, - destroy: client => client.disconnect() - }, this.#options?.isolationPoolOptions); + // see comment in constructor + this.#isolationPool ??= this.#initiateIsolationPool(); return this.#socket.connect(); } From dcf57abfb9b459c7f6d0ebed12e52f1bacf27f9f Mon Sep 17 00:00:00 2001 From: Leibale Date: Mon, 13 Feb 2023 14:22:22 -0500 Subject: [PATCH 3/3] fix --- packages/client/lib/client/index.spec.ts | 7 ++++++- packages/client/lib/client/index.ts | 8 +++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 7d8672f14b6..da0e008d932 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -607,6 +607,9 @@ describe('Client', () => { testUtils.testWithClient('should be able to use pool even before connect', async client => { await client.executeIsolated(() => Promise.resolve()); + // make sure to destroy isolation pool + await client.connect(); + await client.disconnect(); }, { ...GLOBAL.SERVERS.OPEN, disableClientSetup: true @@ -619,7 +622,9 @@ describe('Client', () => { }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('should throw ClientClosedError after disconnect', async client => { - assert.rejects( + await client.connect(); + await client.disconnect(); + await assert.rejects( client.executeIsolated(() => Promise.resolve()), ClientClosedError ); diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 666d47150f1..0a71f6b59ba 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -711,7 +711,7 @@ export default class RedisClient< } executeIsolated(fn: (client: RedisClientType) => T | Promise): Promise { - if (!this.#isolationPool) throw new ClientClosedError(); + if (!this.#isolationPool) return Promise.reject(new ClientClosedError()); return this.#isolationPool.use(fn); } @@ -801,10 +801,8 @@ export default class RedisClient< } async #destroyIsolationPool(): Promise { - if (!this.#isolationPool) return; - - await this.#isolationPool.drain(); - await this.#isolationPool.clear(); + await this.#isolationPool!.drain(); + await this.#isolationPool!.clear(); this.#isolationPool = undefined; }