diff --git a/README.md b/README.md index bfe9437..479f928 100644 --- a/README.md +++ b/README.md @@ -278,6 +278,111 @@ bootstrap() > **Note:** > Unless it is really desired, prefer [interrupt](#semaphore-interrupt) over `releaseAll`. +### Producer-Consumer + +The `ProducerConsumer` looks a lot like a [Semaphore](#semaphore), +but it returns values on _acquire_. + +By default, all readings use an array: + +```typescript +async function bootstrap() { + const producerConsumer = new ProducerConsumer([1]); + + const time1 = 100; + const time2 = 150; + + sleep(time1).then(() => semaphore.write(3, 4)); + sleep(time2).then(() => semaphore.write(2)); + + const maxTime = Math.max(time1, time2); + + const before = performance.now(); + const valuesRead = await semaphore.read(4); // waiting until all is read + const after = performance.now(); + + const elapsed = after - before; // ~150 + console.log("Done. took %dms with expected %dms", elapsed, maxTime); + console.log(valuesRead) // [1, 3, 4, 2] +} +bootstrap(); +``` + +#### Producer-Consumer tryRead + +It is possible to try to read some values in a given time limit. +The function will then throw an exception if it could not read in time: + +```typescript +async function bootstrap() { + const producerConsumer = new ProducerConsumer([1, 2, 3]); + + const success1 = await producerConsumer.tryRead(100, 2).then(() => true); + const success2 = await producerConsumer.tryRead(100, 2).catch((error: unknown) => { + if (error instanceof ConcurrencyExceedTimeoutException) { + return false; + } + + throw error; + }); + + console.log(success1); // true + console.log(success2); // false +} +bootstrap(); +``` + +#### Producer-Consumer readOne + +The `read` and `tryRead` have their "one"-method +that do the same thing but return only one value instead of an array: + +```typescript +async function bootstrap() { + const producerConsumer = new ProducerConsumer([1, 2]); + + // const [value1] = producerConsumer.read(1); + // can be written: + const value1 = producerConsumer.readOne(); + + // const [value2] = producerConsumer.tryRead(100, 1); + // can be written: + const value2 = producerConsumer.tryReadOne(100); + + console.log(value1, value2); // 1 2 +} +bootstrap(); +``` + +#### Producer-Consumer interrupt + +A `ProducerConsumer` can be interrupted at any time. +All awaiting _"threads"_ will then receive an exception: + +```typescript +async function bootstrap() { + const producerConsumer = new ProducerConsumer([1]); + + void sleep(100).then(() => producerConsumer.interrupt({ code: 502 }, [1, 2, 3])); + + const succeed = await Promise.all([ + producerConsumer.read(3), + producerConsumer.readOne(2), + producerConsumer.tryRead(200, 3), + producerConsumer.tryReadOne(200) + ]).catch((error: unknown) => { + if (error instanceof ConcurrencyInterruptedException) { + return false; + } + throw error; + }); + + console.log(succeed); // false + console.log(producerConsumer.permitsAvailable); // 3 +} +bootstrap(); +``` + ## When to use This package can be useful when writing test and wanting to synchronize events. @@ -289,19 +394,18 @@ if it comes from _regular_ observable or [subjects](https://rxjs.dev/guide/subje So this difference can be omitted with the following: - - ```typescript describe("My test", () => { it("should work", async () => { - const semaphore = new Semaphore(0); - const subscription = myObservable.subscribe(() => semaphore.release()); + const producerConsumer = new ProducerConsumer(); + const subscription = myObservable.subscribe(value => producerConsumer.write(value)); // something that updates the observable // Need to pass 2 times in the event - await semaphore.tryAcquire(500, 2); + const [r1, r2] = await producerConsumer.tryRead(500, 2); - expect(1).toBe(1); + expect(r1).toBe(1); + expect(r2).toBe(2); subscription.unsubscribe() }); }); diff --git a/package.json b/package.json index 36416cb..277e537 100644 --- a/package.json +++ b/package.json @@ -51,7 +51,10 @@ "sync", "synchronization", "promise", - "thread" + "thread", + "mutex", + "semaphore", + "producer-consumer" ], "license": "MIT", "main": "./dist/cjs/index.js", diff --git a/src/index.ts b/src/index.ts index c3fc809..e3f7fc2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,5 @@ export * from "./exceptions"; export * from "./mutex"; +export * from "./producer-consumer"; export * from "./semaphore"; export * from "./synchronizer.interface"; diff --git a/src/producer-consumer/exceptions/index.ts b/src/producer-consumer/exceptions/index.ts new file mode 100644 index 0000000..472c56b --- /dev/null +++ b/src/producer-consumer/exceptions/index.ts @@ -0,0 +1,2 @@ +export * from "./producer-consumer.exception"; +export * from "./producer-consumer.invalid-read-parameter.exception"; diff --git a/src/producer-consumer/exceptions/producer-consumer.exception.ts b/src/producer-consumer/exceptions/producer-consumer.exception.ts new file mode 100644 index 0000000..ef47505 --- /dev/null +++ b/src/producer-consumer/exceptions/producer-consumer.exception.ts @@ -0,0 +1,6 @@ +import { ConcurrencyException } from "../../exceptions"; + +/** + * Any exception related only to a [ProducerConsumer]{@link ProducerConsumer}. + */ +export abstract class ProducerConsumerException extends ConcurrencyException {} diff --git a/src/producer-consumer/exceptions/producer-consumer.invalid-read-parameter.exception.ts b/src/producer-consumer/exceptions/producer-consumer.invalid-read-parameter.exception.ts new file mode 100644 index 0000000..4c5cc4a --- /dev/null +++ b/src/producer-consumer/exceptions/producer-consumer.invalid-read-parameter.exception.ts @@ -0,0 +1,6 @@ +import { ProducerConsumerException } from "./producer-consumer.exception"; + +/** + * Exception when a parameter is invalid for a read function + */ +export class ProducerConsumerInvalidReadParameterException extends ProducerConsumerException {} diff --git a/src/producer-consumer/index.ts b/src/producer-consumer/index.ts new file mode 100644 index 0000000..1759140 --- /dev/null +++ b/src/producer-consumer/index.ts @@ -0,0 +1,2 @@ +export * from "./exceptions"; +export * from "./producer-consumer"; diff --git a/src/producer-consumer/producer-consumer.spec.ts b/src/producer-consumer/producer-consumer.spec.ts new file mode 100644 index 0000000..6d25618 --- /dev/null +++ b/src/producer-consumer/producer-consumer.spec.ts @@ -0,0 +1,375 @@ +import { ProducerConsumerInvalidReadParameterException } from "./exceptions"; +import { ProducerConsumer } from "./producer-consumer"; +import { sleep } from "../../support/sleep"; +import { timeFunction } from "../../support/time-function"; +import { + ConcurrencyExceedTimeoutException, + ConcurrencyInterruptedException, + ConcurrencyInvalidTimeoutException +} from "../exceptions"; + +describe("ProducerConsumer", () => { + const delay = 60; + const offsetLow = 5; + const offset = 15; + + describe("Input validation", () => { + const prodCons = new ProducerConsumer(); + + it("should throw a read parameter exception when reading a negative number of items", async () => { + for (const nItem of [-1, -10, -100]) { + await expect(() => prodCons.read(nItem)).rejects.toThrow( + ProducerConsumerInvalidReadParameterException + ); + } + }); + + it("should throw a timeout exception when try-reading with negative timeout", async () => { + for (const timeout of [-1, -10, -100]) { + await expect(() => prodCons.tryRead(timeout, 1)).rejects.toThrow( + ConcurrencyInvalidTimeoutException + ); + } + }); + + it("should throw a read parameter exception when try-reading a negative number of items", async () => { + for (const nItem of [-1, -10, -100]) { + await expect(() => prodCons.tryRead(1000, nItem)).rejects.toThrow( + ProducerConsumerInvalidReadParameterException + ); + } + }); + + it("should throw a timeout exception when try-reading-one with negative timeout", async () => { + for (const timeout of [-1, -10, -100]) { + await expect(() => prodCons.tryReadOne(timeout)).rejects.toThrow( + ConcurrencyInvalidTimeoutException + ); + } + }); + }); + + describe("`read` usage", () => { + it("should read immediately with initial items", async () => { + const items = [1, "A string", { msg: "An object" }, ["An Array", 1]] as const; + + for (const [i, item] of items.entries()) { + const n = i + 1; + + const itemsInitial = Array(n).fill(item); + + const prodCons = new ProducerConsumer(itemsInitial); + expect(prodCons.itemsAvailable).toBe(n); + + const itemsRead = await prodCons.read(n); + expect(prodCons.itemsAvailable).toBe(0); + + expect(itemsRead).toHaveLength(n); + expect(itemsRead).toStrictEqual(itemsInitial); + } + }); + + it("should wait until items are available", async () => { + const prodCons = new ProducerConsumer(); + const valueWrite = [123, 456]; + + setTimeout(() => { + expect(prodCons.itemsRequired).toBe(2); + expect(prodCons.queueLength).toBe(1); + prodCons.write(...valueWrite); + }, delay); + + const [elapsed, valueRead] = await timeFunction(() => prodCons.read(2)); + + expect(prodCons.itemsRequired).toBe(0); + expect(prodCons.queueLength).toBe(0); + expect(valueRead).toStrictEqual(valueWrite); + + expect(elapsed).toBeGreaterThanOrEqual(delay - offsetLow); + expect(elapsed).toBeLessThanOrEqual(delay + offset); + }); + }); + + describe("`tryRead` usage", () => { + it("should work normally", async () => { + const prodCons = new ProducerConsumer(); + const [w1, w2] = [1, 2]; + + setTimeout(() => { + expect(prodCons.itemsRequired).toBe(2); + expect(prodCons.queueLength).toBe(1); + prodCons.write(w1); + + expect(prodCons.itemsRequired).toBe(1); + expect(prodCons.queueLength).toBe(1); + }, delay / 2); + + setTimeout(() => { + expect(prodCons.itemsRequired).toBe(1); + expect(prodCons.queueLength).toBe(1); + prodCons.write(w2); + + expect(prodCons.itemsRequired).toBe(0); + expect(prodCons.queueLength).toBe(0); + }, delay); + + const [r1, r2] = await prodCons.tryRead(delay * 2, 2); + + expect(r1).toBe(w1); + expect(r2).toBe(w2); + }); + + it("should immediately read when there is enough items", async () => { + const values = [1, 2, 3, 4, 5]; + const [w1, w2, w3, w4, w5] = values; + const prodCons = new ProducerConsumer(values); + + const [elapsed1, [r1, r2]] = await timeFunction(() => prodCons.tryRead(1, 2)); + const [elapsed2, [r3, r4, r5]] = await timeFunction(() => prodCons.tryRead(1, 3)); + + expect(r1).toBe(w1); + expect(r2).toBe(w2); + expect(r3).toBe(w3); + expect(r4).toBe(w4); + expect(r5).toBe(w5); + + expect(elapsed1).toBeLessThanOrEqual(offset); + expect(elapsed2).toBeLessThanOrEqual(offset); + }); + + it("should throw an error when the time exceeds", async () => { + const prodCons = new ProducerConsumer(["a"]); + + setTimeout(() => expect(prodCons.queueLength).toBe(1), delay / 2); + await expect(() => prodCons.tryRead(delay, 2)).rejects.toThrow( + ConcurrencyExceedTimeoutException + ); + + // The queue is reset, since the `tryRead` failed + expect(prodCons.queueLength).toBe(0); + expect(prodCons.itemsAvailable).toBe(1); + }); + + it('should not "reset" the state after a successful `tryRead`', async () => { + const prodCons = new ProducerConsumer([1]); + + // "Regular" use + setTimeout(() => prodCons.write(1), delay / 2); + await prodCons.tryRead(delay, 2); + + // Ok + expect(prodCons.itemsAvailable).toBe(0); + expect(prodCons.queueLength).toBe(0); + + // After the timeout + await sleep(delay); + expect(prodCons.itemsAvailable).toBe(0); + expect(prodCons.queueLength).toBe(0); + }); + + it("should write to other `read` or `tryRead` a when a `tryRead` fails", async () => { + const prodCons = new ProducerConsumer([1]); + + setTimeout(() => { + expect(prodCons.itemsAvailable).toBe(0); + expect(prodCons.itemsRequired).toBe(5); + expect(prodCons.queueLength).toBe(2); + + prodCons.write(2); + expect(prodCons.itemsRequired).toBe(4); + expect(prodCons.queueLength).toBe(2); + }, delay); + + const [[v1, v2, v3]] = await Promise.all([ + // `read` after the `tryRead` + sleep(delay / 2).then(() => prodCons.read(3)), + + prodCons + .tryRead(delay * 2, 3) + .catch((error: unknown) => { + if (error instanceof ConcurrencyExceedTimeoutException) { + return []; + } + + throw error; + }) + .finally(() => { + // The `write` in the `setTimeout` reduced the required items + // of the second read to 1, so still 0 available + expect(prodCons.itemsAvailable).toBe(0); + expect(prodCons.itemsRequired).toBe(1); + expect(prodCons.queueLength).toBe(1); + + setTimeout(() => prodCons.write(3, 4), delay); + }) + ]); + + // The state is reset: 3 items written for a single successful read (+ the initial item) + expect(prodCons.itemsAvailable).toBe(1); + expect(prodCons.itemsRequired).toBe(0); + expect(prodCons.queueLength).toBe(0); + + expect(v1).toBe(1); + expect(v2).toBe(2); + expect(v3).toBe(3); + + expect(await prodCons.readOne()).toBe(4); + }); + }); + + describe("`readOne` usage", () => { + it("should read immediately with initial items", async () => { + const items = [1, "A string", { msg: "An object" }, ["An Array", 1]] as const; + + for (const item of items) { + const prodCons = new ProducerConsumer([item]); + expect(prodCons.itemsAvailable).toBe(1); + + const itemRead = await prodCons.readOne(); + expect(prodCons.itemsAvailable).toBe(0); + + expect(itemRead).toStrictEqual(item); + } + }); + + it("should wait until an item is available", async () => { + const prodCons = new ProducerConsumer(); + const valueWrite = 123; + + setTimeout(() => { + expect(prodCons.itemsRequired).toBe(1); + expect(prodCons.queueLength).toBe(1); + prodCons.write(valueWrite); + }, delay); + + const [elapsed, valueRead] = await timeFunction(() => prodCons.readOne()); + + expect(prodCons.itemsRequired).toBe(0); + expect(prodCons.queueLength).toBe(0); + expect(valueRead).toStrictEqual(valueWrite); + + expect(elapsed).toBeGreaterThanOrEqual(delay - offsetLow); + expect(elapsed).toBeLessThanOrEqual(delay + offset); + }); + }); + + describe("`tryReadOne` usage", () => { + it("should work normally", async () => { + const prodCons = new ProducerConsumer(); + const [w1, w2] = [1, 2]; + + setTimeout(() => { + expect(prodCons.itemsRequired).toBe(2); + expect(prodCons.queueLength).toBe(2); + prodCons.write(w1); + + expect(prodCons.itemsRequired).toBe(1); + expect(prodCons.queueLength).toBe(1); + }, delay / 2); + + setTimeout(() => { + expect(prodCons.itemsRequired).toBe(1); + expect(prodCons.queueLength).toBe(1); + prodCons.write(w2); + + expect(prodCons.itemsRequired).toBe(0); + expect(prodCons.queueLength).toBe(0); + }, delay); + + const [r1, r2] = await Promise.all([ + prodCons.tryReadOne(delay * 2), + prodCons.tryReadOne(delay * 2) + ]); + + expect(r1).toBe(w1); + expect(r2).toBe(w2); + }); + + it("should immediately read when there is enough items", async () => { + const values = [1, 2]; + const [w1, w2] = values; + const prodCons = new ProducerConsumer(values); + + const [elapsed1, r1] = await timeFunction(() => prodCons.tryReadOne(1)); + const [elapsed2, r2] = await timeFunction(() => prodCons.tryReadOne(1)); + + expect(r1).toBe(w1); + expect(r2).toBe(w2); + + expect(elapsed1).toBeLessThanOrEqual(offset); + expect(elapsed2).toBeLessThanOrEqual(offset); + }); + + it("should throw an error when the time exceeds", async () => { + const prodCons = new ProducerConsumer(); + + setTimeout(() => { + expect(prodCons.itemsAvailable).toBe(0); + expect(prodCons.itemsRequired).toBe(1); + expect(prodCons.queueLength).toBe(1); + }, delay / 2); + await expect(() => prodCons.tryReadOne(delay)).rejects.toThrow( + ConcurrencyExceedTimeoutException + ); + + // The queue is reset, since the `tryReadOne` failed + expect(prodCons.itemsAvailable).toBe(0); + expect(prodCons.itemsRequired).toBe(0); + expect(prodCons.queueLength).toBe(0); + }); + + it('should not "reset" the state after a successful `tryRead`', async () => { + const prodCons = new ProducerConsumer(); + + // "Regular" use + setTimeout(() => prodCons.write(1), delay / 2); + await prodCons.tryReadOne(delay); + + // Ok + expect(prodCons.itemsAvailable).toBe(0); + expect(prodCons.queueLength).toBe(0); + + // After the timeout + await sleep(delay); + expect(prodCons.itemsAvailable).toBe(0); + expect(prodCons.queueLength).toBe(0); + }); + }); + + it("should interrupt all", async () => { + const delay = 80; + const prodCons = new ProducerConsumer(); + const reason = "test"; + + setTimeout(() => { + expect(prodCons.itemsAvailable).toBe(0); + expect(prodCons.queueLength).toBe(4); + prodCons.interrupt(reason, [1, 2, 3]); + }, delay); + + const errors = await Promise.all([ + prodCons.readOne().catch((err: unknown) => err), + prodCons.read(2).catch((err: unknown) => err), + prodCons.tryReadOne(delay * 10).catch((err: unknown) => err), + prodCons.tryRead(delay * 10, 6).catch((err: unknown) => err) + ]); + + for (const error of errors) { + expect(error).toBeInstanceOf(ConcurrencyInterruptedException); + expect((error as ConcurrencyInterruptedException).getReason()).toBe(reason); + } + + expect(prodCons.itemsAvailable).toBe(3); + expect(prodCons.queueLength).toBe(0); + + // Same but without items + setTimeout(() => { + prodCons.interrupt(reason); + }, delay); + + await Promise.all([prodCons.read(5), prodCons.read(5)]).catch(() => void 0); + + expect(prodCons.itemsAvailable).toBe(0); + expect(prodCons.queueLength).toBe(0); + }); +}); diff --git a/src/producer-consumer/producer-consumer.ts b/src/producer-consumer/producer-consumer.ts new file mode 100644 index 0000000..fe9a8e5 --- /dev/null +++ b/src/producer-consumer/producer-consumer.ts @@ -0,0 +1,148 @@ +import { ProducerConsumerInvalidReadParameterException } from "./exceptions"; +import { Semaphore } from "../semaphore"; +import { Synchronizer } from "../synchronizer.interface"; + +/** + * A ProducerConsumer class to read and write data with synchronization. + * + * The queue is a simple FIFO (First In, First Out). + */ +export class ProducerConsumer implements Synchronizer { + /** + * The that will be read and write + */ + private readonly items: T[]; + /** + * The internal semaphore for synchronization + */ + private readonly semaphore: Semaphore; + + // With a mutex around `items`, it'll look like a Monitor pattern + + /** + * @returns the current number of available items + */ + public get itemsAvailable() { + return this.semaphore.permitsAvailable; + } + + /** + * @returns the minimal number of items needed to release all "threads" + */ + public get itemsRequired(): number { + return this.semaphore.permitsRequired; + } + + /** + * @returns the number of waiting "threads" on any read + */ + public get queueLength(): number { + return this.semaphore.queueLength; + } + + /** + * Create a ProducerConsumer + * + * @param initialItems the initial available values + */ + public constructor(initialItems: readonly T[] = []) { + this.items = initialItems.slice(); + this.semaphore = new Semaphore(initialItems.length); + } + + /** + * Reads a given number of items: + * Wait until the given number of items are available. + * + * @param nItem the number of items to read + * @throws {ProducerConsumerInvalidReadParameterException} when the number of items to read is invalid + * @throws {ConcurrencyInterruptedException} when the producer-consumer is interrupted + * @returns a promise with the read results + */ + public async read(nItem: number): Promise { + if (nItem < 0) { + throw new ProducerConsumerInvalidReadParameterException( + "Can not read a negative number of items." + ); + } + + await this.semaphore.acquire(nItem); + return this.items.splice(0, nItem); + } + + /** + * Reads one item: + * Wait until the item is available. + * + * @throws {ConcurrencyInterruptedException} when the producer-consumer is interrupted + * @returns a promise with the read results + */ + public readOne(): Promise { + return this.read(1).then(([item]) => item); + } + + /** + * Reads a given number of items: + * Wait until the given number of items are available. + * + * Throws an error if the given time exceeds + * and re-establish the state as if the method was not called. + * + * @param timeout maximum time (in ms) to read the items + * @param nItem the number of items to read + * @throws {ProducerConsumerInvalidReadParameterException} when the number of items to read is invalid + * @throws {ConcurrencyInterruptedException} when the producer-consumer is interrupted + * @returns a promise with the read results + */ + public async tryRead(timeout: number, nItem: number): Promise { + if (nItem < 0) { + throw new ProducerConsumerInvalidReadParameterException( + "Can not read a negative number of items." + ); + } + + await this.semaphore.tryAcquire(timeout, nItem); + return this.items.splice(0, nItem); + } + + /** + * Reads one item: + * Wait until the item is available. + * + * Throws an error if the given time exceeds + * and re-establish the state as if the method was not called. + * + * @param timeout maximum time (in ms) to read the items + * @throws {ConcurrencyInterruptedException} when the producer-consumer is interrupted + * @returns a promise with the read result + */ + public tryReadOne(timeout: number): Promise { + return this.tryRead(timeout, 1).then(([item]) => item); + } + + /** + * Write some items to the producer-consumer buffer. + * It releases the await readings or store the data for future readings. + * + * @param items the items to write + */ + public write(...items: T[]) { + this.items.push(...items); + this.semaphore.release(items.length); + } + + /** + * Interrupts all awaiting "threads" with an [exception]{@link ConcurrencyInterruptedException}. + * + * @param reason The reason why this producer-consumer is being interrupted + * @param items the items to set once everything has been interrupted + */ + public interrupt(reason: unknown, items: readonly T[] = []) { + // Remove all content + this.items.splice(0, this.items.length); + // Set the items + this.items.push(...items); + + this.semaphore.interrupt(reason, items.length); + } +} diff --git a/src/semaphore/semaphore.spec.ts b/src/semaphore/semaphore.spec.ts index 8f195c6..edbb4ac 100644 --- a/src/semaphore/semaphore.spec.ts +++ b/src/semaphore/semaphore.spec.ts @@ -1,5 +1,6 @@ import { SemaphoreInvalidPermitsException } from "./exceptions"; import { Semaphore } from "./semaphore"; +import { sleep } from "../../support/sleep"; import { timeFunction } from "../../support/time-function"; import { ConcurrencyInterruptedException } from "../exceptions"; import { ConcurrencyExceedTimeoutException } from "../exceptions/concurrency.exceed-timeout.exception"; @@ -207,11 +208,11 @@ describe("Semaphore", () => { semaphore.tryAcquire(1, semaphore.permitsAvailable) ); - expect(elapsed1).toBeLessThanOrEqual(2 * offset); - expect(elapsed2).toBeLessThanOrEqual(2 * offset); + expect(elapsed1).toBeLessThanOrEqual(offset); + expect(elapsed2).toBeLessThanOrEqual(offset); }); - it("should thrown an error when the time exceeds", async () => { + it("should throw an error when the time exceeds", async () => { const semaphore = new Semaphore(0); // The queue has 1 element @@ -226,7 +227,7 @@ describe("Semaphore", () => { expect(semaphore.permitsAvailable).toBe(0); }); - it("should thrown an error and reset state when the time exceeds (many releases)", async () => { + it("should throw an error and reset state when the time exceeds (many releases)", async () => { const semaphore = new Semaphore(2); setTimeout(() => { @@ -249,7 +250,7 @@ describe("Semaphore", () => { expect(semaphore.permitsAvailable).toBe(3); // the release in the timeout }); - it('should not "reset" the state after a `tryAcquire`', async () => { + it('should not "reset" the state after a successful `tryAcquire`', async () => { const semaphore = new Semaphore(1); // "Regular" use @@ -261,7 +262,7 @@ describe("Semaphore", () => { expect(semaphore.permitsRequired).toBe(0); // After the try timeout - await new Promise(resolve => setTimeout(resolve, delay)); + await sleep(delay); expect(semaphore.permitsAvailable).toBe(0); expect(semaphore.permitsRequired).toBe(0); }); @@ -273,6 +274,7 @@ describe("Semaphore", () => { expect(semaphore.permitsAvailable).toBe(0); expect(semaphore.permitsRequired).toBe(5); expect(semaphore.queueLength).toBe(2); + semaphore.release(); expect(semaphore.permitsRequired).toBe(4); expect(semaphore.queueLength).toBe(2); @@ -280,9 +282,7 @@ describe("Semaphore", () => { await Promise.all([ // `acquire` after the `tryAcquire` - new Promise(resolve => setTimeout(resolve, delay / 2)).then(() => - semaphore.acquire(3) - ), + sleep(delay / 2).then(() => semaphore.acquire(3)), semaphore .tryAcquire(delay * 2, 3) diff --git a/src/semaphore/semaphore.ts b/src/semaphore/semaphore.ts index 6278b0c..18b8d7e 100644 --- a/src/semaphore/semaphore.ts +++ b/src/semaphore/semaphore.ts @@ -226,7 +226,7 @@ export class Semaphore implements Synchronizer { } /** - * Interrupts all awaiting "Threads" with an [exception]{@link ConcurrencyInterruptedException}. + * Interrupts all awaiting "threads" with an [exception]{@link ConcurrencyInterruptedException}. * * @param reason The reason why this semaphore is being interrupted * @param permits The permits to set to this semaphore once everything has been interrupted diff --git a/tsconfig.spec.json b/tsconfig.spec.json index 99d5556..cdc6747 100644 --- a/tsconfig.spec.json +++ b/tsconfig.spec.json @@ -1,5 +1,6 @@ { "compilerOptions": { + "downlevelIteration": true, "types": ["jest", "jest-extended", "node"] }, "extends": "./tsconfig.json",