Skip to content

Commit eb4fe52

Browse files
committed
feat(mutex): set the mutex definition and tests
1 parent 9b49643 commit eb4fe52

File tree

7 files changed

+241
-4
lines changed

7 files changed

+241
-4
lines changed

src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
export * from "./exceptions";
2+
export * from "./mutex";
23
export * from "./semaphore";
4+
export * from "./synchronizer.interface";

src/mutex/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from "./mutex";

src/mutex/mutex.spec.ts

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
import { Mutex } from "./mutex";
2+
import { timeFunction } from "../../support/time-function";
3+
import {
4+
ConcurrencyExceedTimeoutException,
5+
ConcurrencyInterruptedException,
6+
ConcurrencyInvalidTimeoutException
7+
} from "../exceptions";
8+
9+
describe("Mutex", () => {
10+
describe("Input validation", () => {
11+
const mutex = new Mutex();
12+
13+
it("should throw a timeout exception when try-locking negative timeout", async () => {
14+
for (const timeout of [-1, -10, -100]) {
15+
await expect(() => mutex.tryLock(timeout)).rejects.toThrow(
16+
ConcurrencyInvalidTimeoutException
17+
);
18+
}
19+
});
20+
});
21+
22+
describe("`lock` usage", () => {
23+
const delay = 50;
24+
const offset = 3;
25+
26+
it("should work with a basic usage", async () => {
27+
// semaphore as a mutex
28+
const mutex = new Mutex();
29+
30+
expect(mutex.isLocked).toBeFalse();
31+
expect(mutex.queueLength).toBe(0);
32+
33+
const [elapsed] = await timeFunction(async () => {
34+
setTimeout(() => {
35+
expect(mutex.isLocked).toBeTrue();
36+
expect(mutex.queueLength).toBe(1);
37+
mutex.unlock();
38+
expect(mutex.isLocked).toBeFalse();
39+
expect(mutex.queueLength).toBe(0);
40+
}, delay);
41+
42+
// Will wait until the end of the `setTimeout`
43+
await mutex.lock();
44+
});
45+
46+
expect(elapsed).toBeGreaterThanOrEqual(delay - offset);
47+
expect(elapsed).toBeLessThanOrEqual(delay + offset);
48+
});
49+
50+
it("should work with many lock", async () => {
51+
const mutex = new Mutex();
52+
53+
const min = delay / 2;
54+
const med = delay;
55+
const max = delay * 2;
56+
57+
setTimeout(() => {
58+
expect(mutex.isLocked).toBeTrue();
59+
expect(mutex.queueLength).toBe(3);
60+
mutex.unlock();
61+
expect(mutex.isLocked).toBeTrue();
62+
expect(mutex.queueLength).toBe(2);
63+
}, min);
64+
65+
setTimeout(() => {
66+
expect(mutex.isLocked).toBeTrue();
67+
expect(mutex.queueLength).toBe(2);
68+
mutex.unlock();
69+
expect(mutex.isLocked).toBeTrue();
70+
expect(mutex.queueLength).toBe(1);
71+
}, med);
72+
73+
setTimeout(() => {
74+
expect(mutex.isLocked).toBeTrue();
75+
expect(mutex.queueLength).toBe(1);
76+
mutex.unlock();
77+
expect(mutex.isLocked).toBeFalse();
78+
expect(mutex.queueLength).toBe(0);
79+
}, max);
80+
81+
const [elapsed, times] = await timeFunction(() =>
82+
Promise.all([
83+
timeFunction(() => mutex.lock()),
84+
timeFunction(() => mutex.lock()),
85+
timeFunction(() => mutex.lock())
86+
])
87+
);
88+
89+
const [elapsed1, elapsed2, elapsed3] = times
90+
.map(([elapsed]) => elapsed)
91+
.sort((a, b) => a - b);
92+
93+
expect(elapsed1).toBeGreaterThanOrEqual(min - offset);
94+
expect(elapsed1).toBeLessThanOrEqual(min + offset);
95+
expect(elapsed2).toBeGreaterThanOrEqual(med - offset);
96+
expect(elapsed2).toBeLessThanOrEqual(med + offset);
97+
expect(elapsed3).toBeGreaterThanOrEqual(max - offset);
98+
expect(elapsed3).toBeLessThanOrEqual(max + offset);
99+
100+
// The global elapsed time is very close to the slowest timeout
101+
expect(elapsed).toBeGreaterThanOrEqual(max - offset);
102+
expect(elapsed).toBeLessThanOrEqual(max + offset);
103+
104+
expect(mutex.isLocked).toBeFalse();
105+
expect(mutex.queueLength).toBe(0);
106+
});
107+
});
108+
109+
describe("`tryLock` usage", () => {
110+
const delay = 50;
111+
const offset = 3;
112+
113+
it("should work with tryLock/unlock", async () => {
114+
const mutex = new Mutex();
115+
116+
const [elapsed] = await timeFunction(async () => {
117+
setTimeout(() => mutex.unlock(), delay);
118+
await mutex.tryLock(delay * 5);
119+
});
120+
121+
expect(elapsed).toBeGreaterThanOrEqual(delay - offset);
122+
expect(elapsed).toBeLessThanOrEqual(delay + offset);
123+
});
124+
125+
it("should thrown an error when the time exceeds", async () => {
126+
const mutex = new Mutex();
127+
128+
setTimeout(() => {
129+
expect(mutex.queueLength).toBe(1);
130+
expect(mutex.isLocked).toBeTrue();
131+
}, delay / 2);
132+
133+
await expect(() => mutex.tryLock(delay)).rejects.toThrow(
134+
ConcurrencyExceedTimeoutException
135+
);
136+
137+
// The queue is reset, since the `tryAcquire` failed
138+
expect(mutex.queueLength).toBe(0);
139+
expect(mutex.isLocked).toBeFalse();
140+
});
141+
});
142+
143+
it("should interrupt all", async () => {
144+
const delay = 100;
145+
const mutex = new Mutex();
146+
const reason = "test";
147+
148+
setTimeout(() => {
149+
expect(mutex.isLocked).toBeTrue();
150+
expect(mutex.queueLength).toBe(3);
151+
mutex.interrupt(reason);
152+
}, delay);
153+
154+
const errors = await Promise.all([
155+
mutex.lock().catch((err: unknown) => err),
156+
mutex.tryLock(delay * 2).catch((err: unknown) => err),
157+
mutex.lock().catch((err: unknown) => err)
158+
]);
159+
160+
for (const error of errors) {
161+
expect(error).toBeInstanceOf(ConcurrencyInterruptedException);
162+
expect((error as ConcurrencyInterruptedException).getReason()).toBe(reason);
163+
}
164+
165+
expect(mutex.isLocked).toBeFalse();
166+
expect(mutex.queueLength).toBe(0);
167+
});
168+
});

src/mutex/mutex.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { Synchronizer } from "../synchronizer.interface";
2+
3+
export class Mutex implements Synchronizer {
4+
/**
5+
* @returns if the current mutex is currently locked
6+
*/
7+
public get isLocked(): boolean {
8+
throw new Error("Not implemented yet");
9+
}
10+
11+
/**
12+
* @inheritDoc
13+
*/
14+
public get queueLength(): number {
15+
throw new Error("Not implemented yet");
16+
}
17+
18+
/**
19+
* Locks this mutex
20+
*
21+
* @returns a promise when the lock has been set
22+
*/
23+
public lock(): Promise<void> {
24+
throw new Error("Not implemented yet");
25+
}
26+
27+
/**
28+
* Locks this mutex within a time limit.
29+
*
30+
* Throws an error if the given time exceeds.
31+
*
32+
* @param timeout maximum time (in ms) to lock
33+
* @returns a promise when the lock has been set
34+
*/
35+
public tryLock(timeout: number): Promise<void> {
36+
throw new Error("Not implemented yet");
37+
}
38+
39+
/**
40+
* Unlocks this mutex
41+
*/
42+
public unlock() {
43+
throw new Error("Not implemented yet");
44+
}
45+
46+
/**
47+
* @inheritDoc
48+
*/
49+
public interrupt(reason: unknown) {
50+
throw new Error("Not implemented yet");
51+
}
52+
}

src/semaphore/semaphore.spec.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,9 @@ describe("Semaphore", () => {
150150
const med = delay;
151151
const max = delay * 2;
152152

153-
setTimeout(() => semaphore.release(), delay);
154-
setTimeout(() => semaphore.release(), delay / 2);
155-
setTimeout(() => semaphore.release(), delay * 2);
153+
setTimeout(() => semaphore.release(), med);
154+
setTimeout(() => semaphore.release(), min);
155+
setTimeout(() => semaphore.release(), max);
156156

157157
// global elapsed of the operation and the times of the individual acquires
158158
const [elapsed, times] = await timeFunction(() =>

src/semaphore/semaphore.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
ConcurrencyInvalidTimeoutException
55
} from "../exceptions";
66
import { ConcurrencyInterruptedException } from "../exceptions";
7+
import { Synchronizer } from "../synchronizer.interface";
78

89
type Resolver = () => void;
910

@@ -28,7 +29,7 @@ interface QueueItem {
2829
*
2930
* @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise#promise_concurrency
3031
*/
31-
export class Semaphore {
32+
export class Semaphore implements Synchronizer {
3233
/**
3334
* Queue of the "threads"
3435
*/

src/synchronizer.interface.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
export interface Synchronizer {
2+
/**
3+
* Interrupts all awaiting "Threads" with an [exception]{@link ConcurrencyInterruptedException}.
4+
*
5+
* @param reason The reason why this semaphore is being interrupted
6+
*/
7+
interrupt: (reason: unknown) => void;
8+
9+
/**
10+
* @returns the number of waiting "Threads" on a locking statement
11+
*/
12+
queueLength: number;
13+
}

0 commit comments

Comments
 (0)