Skip to content

Commit d2983a0

Browse files
committed
feat(producer-consumer): implement ProducerConsumer
#3
1 parent 477a227 commit d2983a0

1 file changed

Lines changed: 47 additions & 13 deletions

File tree

src/producer-consumer/producer-consumer.ts

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,43 @@
1+
import { ProducerConsumerInvalidReadParameterException } from "./exceptions";
2+
import { Semaphore } from "../semaphore";
13
import { Synchronizer } from "../synchronizer.interface";
24

5+
/**
6+
* A ProducerConsumer class to read and write data with synchronization.
7+
*
8+
* The queue is a simple FIFO (First In, First Out).
9+
*/
310
export class ProducerConsumer<T> implements Synchronizer {
411
/**
5-
* The data available for instant reading
12+
* The that will be read and write
613
*/
714
private readonly items: T[];
15+
/**
16+
* The internal semaphore for synchronization
17+
*/
18+
private readonly semaphore: Semaphore;
19+
20+
// With a mutex around `items`, it'll look like a Monitor pattern
821

922
/**
1023
* @returns the current number of available items
1124
*/
1225
public get itemsAvailable() {
13-
return this.items.length;
26+
return this.semaphore.permitsAvailable;
1427
}
1528

1629
/**
1730
* @returns the minimal number of items needed to release all "threads"
1831
*/
1932
public get itemsRequired(): number {
20-
throw new Error("Not implemented yet");
33+
return this.semaphore.permitsRequired;
2134
}
2235

2336
/**
2437
* @returns the number of waiting "threads" on any read
2538
*/
2639
public get queueLength(): number {
27-
throw new Error("Not implemented yet");
40+
return this.semaphore.queueLength;
2841
}
2942

3043
/**
@@ -34,6 +47,7 @@ export class ProducerConsumer<T> implements Synchronizer {
3447
*/
3548
public constructor(initialItems: readonly T[] = []) {
3649
this.items = initialItems.slice();
50+
this.semaphore = new Semaphore(initialItems.length);
3751
}
3852

3953
/**
@@ -45,8 +59,15 @@ export class ProducerConsumer<T> implements Synchronizer {
4559
* @throws {ConcurrencyInterruptedException} when the producer-consumer is interrupted
4660
* @returns a promise with the read results
4761
*/
48-
public read(nItem: number): Promise<T[]> {
49-
throw new Error("Not implemented yet");
62+
public async read(nItem: number): Promise<T[]> {
63+
if (nItem < 0) {
64+
throw new ProducerConsumerInvalidReadParameterException(
65+
"Can not read a negative number of items."
66+
);
67+
}
68+
69+
await this.semaphore.acquire(nItem);
70+
return this.items.splice(0, nItem);
5071
}
5172

5273
/**
@@ -57,7 +78,7 @@ export class ProducerConsumer<T> implements Synchronizer {
5778
* @returns a promise with the read results
5879
*/
5980
public readOne(): Promise<T> {
60-
return this.read(1).then(([data]) => data);
81+
return this.read(1).then(([item]) => item);
6182
}
6283

6384
/**
@@ -73,8 +94,15 @@ export class ProducerConsumer<T> implements Synchronizer {
7394
* @throws {ConcurrencyInterruptedException} when the producer-consumer is interrupted
7495
* @returns a promise with the read results
7596
*/
76-
public tryRead(timeout: number, nItem: number): Promise<T[]> {
77-
throw new Error("Not implemented yet");
97+
public async tryRead(timeout: number, nItem: number): Promise<T[]> {
98+
if (nItem < 0) {
99+
throw new ProducerConsumerInvalidReadParameterException(
100+
"Can not read a negative number of items."
101+
);
102+
}
103+
104+
await this.semaphore.tryAcquire(timeout, nItem);
105+
return this.items.splice(0, nItem);
78106
}
79107

80108
/**
@@ -89,17 +117,18 @@ export class ProducerConsumer<T> implements Synchronizer {
89117
* @returns a promise with the read result
90118
*/
91119
public tryReadOne(timeout: number): Promise<T> {
92-
return this.tryRead(timeout, 1).then(([data]) => data);
120+
return this.tryRead(timeout, 1).then(([item]) => item);
93121
}
94122

95123
/**
96124
* Write some items to the producer-consumer buffer.
97-
* It releases the await readings or store the data.
125+
* It releases the await readings or store the data for future readings.
98126
*
99127
* @param items the items to write
100128
*/
101129
public write(...items: T[]) {
102-
throw new Error("Not implemented yet");
130+
this.items.push(...items);
131+
this.semaphore.release(items.length);
103132
}
104133

105134
/**
@@ -109,6 +138,11 @@ export class ProducerConsumer<T> implements Synchronizer {
109138
* @param items the items to set once everything has been interrupted
110139
*/
111140
public interrupt(reason: unknown, items: readonly T[] = []) {
112-
throw new Error("Not implemented yet");
141+
// Remove all content
142+
this.items.splice(0, this.items.length);
143+
// Set the items
144+
this.items.push(...items);
145+
146+
this.semaphore.interrupt(reason, items.length);
113147
}
114148
}

0 commit comments

Comments
 (0)