Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 110 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,111 @@ bootstrap()
> **Note:**
> Unless it is really desired, prefer [interrupt](#semaphore-interrupt) over `releaseAll`.

### Producer-Consumer

The `ProducerConsumer` looks a lot like a [Semaphore](#semaphore),
but it returns values on _acquire_.

By default, all readings use an array:

```typescript
async function bootstrap() {
const producerConsumer = new ProducerConsumer([1]);

const time1 = 100;
const time2 = 150;

sleep(time1).then(() => semaphore.write(3, 4));
sleep(time2).then(() => semaphore.write(2));

const maxTime = Math.max(time1, time2);

const before = performance.now();
const valuesRead = await semaphore.read(4); // waiting until all is read
const after = performance.now();

const elapsed = after - before; // ~150
console.log("Done. took %dms with expected %dms", elapsed, maxTime);
console.log(valuesRead) // [1, 3, 4, 2]
}
bootstrap();
```

#### Producer-Consumer tryRead

It is possible to try to read some values in a given time limit.
The function will then throw an exception if it could not read in time:

```typescript
async function bootstrap() {
const producerConsumer = new ProducerConsumer([1, 2, 3]);

const success1 = await producerConsumer.tryRead(100, 2).then(() => true);
const success2 = await producerConsumer.tryRead(100, 2).catch((error: unknown) => {
if (error instanceof ConcurrencyExceedTimeoutException) {
return false;
}

throw error;
});

console.log(success1); // true
console.log(success2); // false
}
bootstrap();
```

#### Producer-Consumer readOne

The `read` and `tryRead` have their "one"-method
that do the same thing but return only one value instead of an array:

```typescript
async function bootstrap() {
const producerConsumer = new ProducerConsumer([1, 2]);

// const [value1] = producerConsumer.read(1);
// can be written:
const value1 = producerConsumer.readOne();

// const [value2] = producerConsumer.tryRead(100, 1);
// can be written:
const value2 = producerConsumer.tryReadOne(100);

console.log(value1, value2); // 1 2
}
bootstrap();
```

#### Producer-Consumer interrupt

A `ProducerConsumer` can be interrupted at any time.
All awaiting _"threads"_ will then receive an exception:

```typescript
async function bootstrap() {
const producerConsumer = new ProducerConsumer([1]);

void sleep(100).then(() => producerConsumer.interrupt({ code: 502 }, [1, 2, 3]));

const succeed = await Promise.all([
producerConsumer.read(3),
producerConsumer.readOne(2),
producerConsumer.tryRead(200, 3),
producerConsumer.tryReadOne(200)
]).catch((error: unknown) => {
if (error instanceof ConcurrencyInterruptedException) {
return false;
}
throw error;
});

console.log(succeed); // false
console.log(producerConsumer.permitsAvailable); // 3
}
bootstrap();
```

## When to use

This package can be useful when writing test and wanting to synchronize events.
Expand All @@ -289,19 +394,18 @@ if it comes from _regular_ observable or [subjects](https://rxjs.dev/guide/subje

So this difference can be omitted with the following:

<!-- TODO: producer-consumer example -->

```typescript
describe("My test", () => {
it("should work", async () => {
const semaphore = new Semaphore(0);
const subscription = myObservable.subscribe(() => semaphore.release());
const producerConsumer = new ProducerConsumer();
const subscription = myObservable.subscribe(value => producerConsumer.write(value));
// something that updates the observable

// Need to pass 2 times in the event
await semaphore.tryAcquire(500, 2);
const [r1, r2] = await producerConsumer.tryRead(500, 2);

expect(1).toBe(1);
expect(r1).toBe(1);
expect(r2).toBe(2);
subscription.unsubscribe()
});
});
Expand Down
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@
"sync",
"synchronization",
"promise",
"thread"
"thread",
"mutex",
"semaphore",
"producer-consumer"
],
"license": "MIT",
"main": "./dist/cjs/index.js",
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from "./exceptions";
export * from "./mutex";
export * from "./producer-consumer";
export * from "./semaphore";
export * from "./synchronizer.interface";
2 changes: 2 additions & 0 deletions src/producer-consumer/exceptions/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./producer-consumer.exception";
export * from "./producer-consumer.invalid-read-parameter.exception";
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { ConcurrencyException } from "../../exceptions";

/**
* Any exception related only to a [ProducerConsumer]{@link ProducerConsumer}.
*/
export abstract class ProducerConsumerException extends ConcurrencyException {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { ProducerConsumerException } from "./producer-consumer.exception";

/**
* Exception when a parameter is invalid for a read function
*/
export class ProducerConsumerInvalidReadParameterException extends ProducerConsumerException {}
2 changes: 2 additions & 0 deletions src/producer-consumer/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./exceptions";
export * from "./producer-consumer";
Loading