diff --git a/src/test.util.spec.ts b/src/test.util.spec.ts index 4f0bec8..4334906 100644 --- a/src/test.util.spec.ts +++ b/src/test.util.spec.ts @@ -10,6 +10,18 @@ const chance = new Chance(seed) const debug = mkDebug('test-redis-x-stream') const delay = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)), + withHandledRejection = (promise: Promise): Promise => { + void promise.catch(() => {}) + return promise + }, + setTimeoutAsync = (work: () => Promise | T, ms = 0): Promise => + withHandledRejection( + new Promise((resolve, reject) => { + setTimeout(() => { + Promise.resolve().then(work).then(resolve, reject) + }, ms) + }), + ), times = (count: number, fn: (_: undefined, i: number) => T): Array => Array.from(Array(count), fn) as T[], quit = async (client: RedisClient): Promise => { @@ -44,4 +56,16 @@ afterAll(() => { console.log(`Seed set to: ${seed}`) }) -export { delay, times, quit, hydrateForTest, rand, randNum, testEntries, redisIdRegex, drain } +export { + delay, + withHandledRejection, + setTimeoutAsync, + times, + quit, + hydrateForTest, + rand, + randNum, + testEntries, + redisIdRegex, + drain, +} diff --git a/src/xread.spec.ts b/src/xread.spec.ts index bd17ec6..7a11848 100644 --- a/src/xread.spec.ts +++ b/src/xread.spec.ts @@ -4,9 +4,11 @@ import { delay, hydrateForTest, quit, + setTimeoutAsync, testEntries, redisIdRegex, randNum, + withHandledRejection, } from './test.util.spec.js' import { RedisStream } from './stream.js' import { RedisClient } from './types.js' @@ -53,6 +55,7 @@ describe('redis-x-stream xread', () => { it('should block waiting for new entries', async () => { let entries = 0 + let laterHydrate: Promise | undefined const streamName = key('my-stream'), block = 200, iterable = new RedisStream({ @@ -64,9 +67,10 @@ describe('redis-x-stream xread', () => { await hydrate() for await (const _ of iterable) { if (entries++ === testEntries.length - 1) { - delay(block - 20).then(hydrate) + laterHydrate = withHandledRejection(delay(block - 20).then(() => hydrate())) } } + await laterHydrate expect(entries).toEqual(testEntries.length * 2) }) @@ -101,25 +105,29 @@ describe('redis-x-stream xread', () => { count: randNum(300, 400), }) let i = 0 + let addStreamLater: Promise | undefined + let quitLater: Promise | undefined for await (const [streamName, _] of stream) { i++ if (i === testEntries.length) { expect(streamName).toEqual(myStream) - setTimeout(() => { + addStreamLater = setTimeoutAsync(() => { expect(stream.reading).toBe(true) - stream.addStream(laterStream) + return stream.addStream(laterStream) }) } if (i > testEntries.length) { expect(streamName).toEqual(laterStream) } if (i === testEntries.length * 2 - 1) { - setTimeout(() => { + quitLater = setTimeoutAsync(() => { i++ - stream.quit() //break; + return stream.quit() //break; }, 100) } } + await addStreamLater + await quitLater //stream will block indefinitely (i++ in the future to assert after loop) expect(i).toEqual(testEntries.length * 2 + 1) }) diff --git a/src/xreadgroup.spec.ts b/src/xreadgroup.spec.ts index d874812..cd00128 100644 --- a/src/xreadgroup.spec.ts +++ b/src/xreadgroup.spec.ts @@ -8,7 +8,9 @@ import { quit, rand, redisIdRegex, + setTimeoutAsync, testEntries, + withHandledRejection, } from './test.util.spec.js' import redisStream, { RedisStream } from './stream.js' import { RedisClient } from './types.js' @@ -324,25 +326,29 @@ describe('redis-x-stream xreadgroup', () => { ackOnIterate: true, }) let i = 0 + let addStreamLater: Promise | undefined + let quitLater: Promise | undefined for await (const [streamName, _] of stream) { i++ if (i === testEntries.length) { expect(streamName).toEqual(myStream) - setTimeout(() => { + addStreamLater = setTimeoutAsync(() => { expect(stream.reading).toBe(true) - stream.addStream(laterStream) + return stream.addStream(laterStream) }) } if (i > testEntries.length) { expect(streamName).toEqual(laterStream) } if (i === testEntries.length * 2 - 1) { - setTimeout(() => { + quitLater = setTimeoutAsync(() => { i++ - stream.quit() + return stream.quit() }, 100) } } + await addStreamLater + await quitLater expect(i).toEqual(testEntries.length * 2 + 1) }) @@ -361,27 +367,31 @@ describe('redis-x-stream xreadgroup', () => { }) let i = 0 let laterWrites: Promise | undefined + let quitLater: Promise | undefined for await (const [streamName, _] of stream) { i++ if (i === testEntries.length) { expect(streamName).toEqual(myStream) - laterWrites = (async () => { - await waitFor(() => stream.reading) - await stream.addStream({ [laterStream]: '$' }) - await hydrateForTest(writer, laterStream) - })() + laterWrites = withHandledRejection( + (async () => { + await waitFor(() => stream.reading) + await stream.addStream({ [laterStream]: '$' }) + await hydrateForTest(writer, laterStream) + })(), + ) } if (i > testEntries.length) { expect(streamName).toEqual(laterStream) } if (i === testEntries.length * 2 - 1) { - setTimeout(() => { + quitLater = setTimeoutAsync(() => { i++ - stream.quit() + return stream.quit() }, 100) } } await laterWrites + await quitLater // Got all entries from myStream + only newly-written entries from laterStream expect(i).toEqual(testEntries.length * 2 + 1) }) @@ -399,11 +409,13 @@ describe('redis-x-stream xreadgroup', () => { ackOnIterate: true, }) - const iterator = (async () => { - for await (const _ of stream) { - void _ - } - })() + const iterator = withHandledRejection( + (async () => { + for await (const _ of stream) { + void _ + } + })(), + ) await waitFor(() => stream.reading) await stream.addStream({ [laterStream]: '$' }) @@ -442,25 +454,29 @@ describe('redis-x-stream xreadgroup', () => { ackOnIterate: true, }) let i = 0 + let addStreamLater: Promise | undefined + let quitLater: Promise | undefined for await (const [streamName, _] of stream) { i++ if (i === testEntries.length) { expect(streamName).toEqual(myStream) - setTimeout(() => { + addStreamLater = setTimeoutAsync(() => { expect(stream.reading).toBe(true) - stream.addStream(laterStream) + return stream.addStream(laterStream) }) } if (i > testEntries.length) { expect(streamName).toEqual(laterStream) } if (i === testEntries.length * 2 - 1) { - setTimeout(() => { + quitLater = setTimeoutAsync(() => { i++ - stream.quit() + return stream.quit() }, 100) } } + await addStreamLater + await quitLater expect(i).toEqual(testEntries.length * 2 + 1) }) @@ -491,25 +507,29 @@ describe('redis-x-stream xreadgroup', () => { ackOnIterate: true, }) let i = 0 + let addStreamLater: Promise | undefined + let quitLater: Promise | undefined for await (const [streamName, _] of stream) { i++ if (i === testEntries.length) { expect(streamName).toEqual(myStream) - setTimeout(() => { + addStreamLater = setTimeoutAsync(() => { expect(stream.reading).toBe(true) - stream.addStream(laterStream) + return stream.addStream(laterStream) }) } if (i > testEntries.length) { expect(streamName).toEqual(laterStream) } if (i === testEntries.length * 2 - 1) { - setTimeout(() => { + quitLater = setTimeoutAsync(() => { i++ - stream.quit() + return stream.quit() }, 100) } } + await addStreamLater + await quitLater // All entries from both streams, despite count=3 << PEL size expect(i).toEqual(testEntries.length * 2 + 1) })