Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/test.util.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ const chance = new Chance(seed)
const debug = mkDebug('test-redis-x-stream')

const delay = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms)),
withHandledRejection = <T>(promise: Promise<T>): Promise<T> => {
void promise.catch(() => {})
return promise
},
setTimeoutAsync = <T>(work: () => Promise<T> | T, ms = 0): Promise<T> =>
withHandledRejection(
new Promise<T>((resolve, reject) => {
setTimeout(() => {
Promise.resolve().then(work).then(resolve, reject)
}, ms)
}),
),
times = <T>(count: number, fn: (_: undefined, i: number) => T): Array<T> =>
Array.from(Array(count), fn) as T[],
quit = async (client: RedisClient): Promise<void> => {
Expand Down Expand Up @@ -44,4 +56,16 @@ afterAll(() => {
console.log(`Seed set to: ${seed}`)
})

export { delay, times, quit, hydrateForTest, rand, randNum, testEntries, redisIdRegex, drain }
export {
delay,
withHandledRejection,
setTimeoutAsync,
times,
quit,
hydrateForTest,
rand,
randNum,
testEntries,
redisIdRegex,
drain,
}
18 changes: 13 additions & 5 deletions src/xread.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import {
delay,
hydrateForTest,
quit,
setTimeoutAsync,
testEntries,
redisIdRegex,
randNum,
withHandledRejection,
} from './test.util.spec.js'
import { RedisStream } from './stream.js'
import { RedisClient } from './types.js'
Expand Down Expand Up @@ -53,6 +55,7 @@ describe('redis-x-stream xread', () => {

it('should block waiting for new entries', async () => {
let entries = 0
let laterHydrate: Promise<unknown> | undefined
const streamName = key('my-stream'),
block = 200,
iterable = new RedisStream({
Expand All @@ -64,9 +67,10 @@ describe('redis-x-stream xread', () => {
await hydrate()
for await (const _ of iterable) {
if (entries++ === testEntries.length - 1) {
delay(block - 20).then(hydrate)
laterHydrate = withHandledRejection(delay(block - 20).then(() => hydrate()))
}
}
await laterHydrate
expect(entries).toEqual(testEntries.length * 2)
})

Expand Down Expand Up @@ -101,25 +105,29 @@ describe('redis-x-stream xread', () => {
count: randNum(300, 400),
})
let i = 0
let addStreamLater: Promise<void> | undefined
let quitLater: Promise<void> | undefined
for await (const [streamName, _] of stream) {
i++
if (i === testEntries.length) {
expect(streamName).toEqual(myStream)
setTimeout(() => {
addStreamLater = setTimeoutAsync(() => {
expect(stream.reading).toBe(true)
stream.addStream(laterStream)
return stream.addStream(laterStream)
})
}
if (i > testEntries.length) {
expect(streamName).toEqual(laterStream)
}
if (i === testEntries.length * 2 - 1) {
setTimeout(() => {
quitLater = setTimeoutAsync(() => {
i++
stream.quit() //break;
return stream.quit() //break;
}, 100)
}
}
await addStreamLater
await quitLater
//stream will block indefinitely (i++ in the future to assert after loop)
expect(i).toEqual(testEntries.length * 2 + 1)
})
Expand Down
68 changes: 44 additions & 24 deletions src/xreadgroup.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import {
quit,
rand,
redisIdRegex,
setTimeoutAsync,
testEntries,
withHandledRejection,
} from './test.util.spec.js'
import redisStream, { RedisStream } from './stream.js'
import { RedisClient } from './types.js'
Expand Down Expand Up @@ -324,25 +326,29 @@ describe('redis-x-stream xreadgroup', () => {
ackOnIterate: true,
})
let i = 0
let addStreamLater: Promise<void> | undefined
let quitLater: Promise<void> | undefined
for await (const [streamName, _] of stream) {
i++
if (i === testEntries.length) {
expect(streamName).toEqual(myStream)
setTimeout(() => {
addStreamLater = setTimeoutAsync(() => {
expect(stream.reading).toBe(true)
stream.addStream(laterStream)
return stream.addStream(laterStream)
})
}
if (i > testEntries.length) {
expect(streamName).toEqual(laterStream)
}
if (i === testEntries.length * 2 - 1) {
setTimeout(() => {
quitLater = setTimeoutAsync(() => {
i++
stream.quit()
return stream.quit()
}, 100)
}
}
await addStreamLater
await quitLater
expect(i).toEqual(testEntries.length * 2 + 1)
})

Expand All @@ -361,27 +367,31 @@ describe('redis-x-stream xreadgroup', () => {
})
let i = 0
let laterWrites: Promise<void> | undefined
let quitLater: Promise<void> | undefined
for await (const [streamName, _] of stream) {
i++
if (i === testEntries.length) {
expect(streamName).toEqual(myStream)
laterWrites = (async () => {
await waitFor(() => stream.reading)
await stream.addStream({ [laterStream]: '$' })
await hydrateForTest(writer, laterStream)
})()
laterWrites = withHandledRejection(
(async () => {
await waitFor(() => stream.reading)
await stream.addStream({ [laterStream]: '$' })
await hydrateForTest(writer, laterStream)
})(),
)
}
if (i > testEntries.length) {
expect(streamName).toEqual(laterStream)
}
if (i === testEntries.length * 2 - 1) {
setTimeout(() => {
quitLater = setTimeoutAsync(() => {
i++
stream.quit()
return stream.quit()
}, 100)
}
}
await laterWrites
await quitLater
// Got all entries from myStream + only newly-written entries from laterStream
expect(i).toEqual(testEntries.length * 2 + 1)
})
Expand All @@ -399,11 +409,13 @@ describe('redis-x-stream xreadgroup', () => {
ackOnIterate: true,
})

const iterator = (async () => {
for await (const _ of stream) {
void _
}
})()
const iterator = withHandledRejection(
(async () => {
for await (const _ of stream) {
void _
}
})(),
)

await waitFor(() => stream.reading)
await stream.addStream({ [laterStream]: '$' })
Expand Down Expand Up @@ -442,25 +454,29 @@ describe('redis-x-stream xreadgroup', () => {
ackOnIterate: true,
})
let i = 0
let addStreamLater: Promise<void> | undefined
let quitLater: Promise<void> | undefined
for await (const [streamName, _] of stream) {
i++
if (i === testEntries.length) {
expect(streamName).toEqual(myStream)
setTimeout(() => {
addStreamLater = setTimeoutAsync(() => {
expect(stream.reading).toBe(true)
stream.addStream(laterStream)
return stream.addStream(laterStream)
})
}
if (i > testEntries.length) {
expect(streamName).toEqual(laterStream)
}
if (i === testEntries.length * 2 - 1) {
setTimeout(() => {
quitLater = setTimeoutAsync(() => {
i++
stream.quit()
return stream.quit()
}, 100)
}
}
await addStreamLater
await quitLater
expect(i).toEqual(testEntries.length * 2 + 1)
})

Expand Down Expand Up @@ -491,25 +507,29 @@ describe('redis-x-stream xreadgroup', () => {
ackOnIterate: true,
})
let i = 0
let addStreamLater: Promise<void> | undefined
let quitLater: Promise<void> | undefined
for await (const [streamName, _] of stream) {
i++
if (i === testEntries.length) {
expect(streamName).toEqual(myStream)
setTimeout(() => {
addStreamLater = setTimeoutAsync(() => {
expect(stream.reading).toBe(true)
stream.addStream(laterStream)
return stream.addStream(laterStream)
})
}
if (i > testEntries.length) {
expect(streamName).toEqual(laterStream)
}
if (i === testEntries.length * 2 - 1) {
setTimeout(() => {
quitLater = setTimeoutAsync(() => {
i++
stream.quit()
return stream.quit()
}, 100)
}
}
await addStreamLater
await quitLater
// All entries from both streams, despite count=3 << PEL size
expect(i).toEqual(testEntries.length * 2 + 1)
})
Expand Down
Loading