Skip to content

Commit 49b60b9

Browse files
authored
feat: non-blocking async in default (#25)
* WIP * WIP * WIP * WIP * WIP * WIP * WIP * WIP * WIP * wip
1 parent e50ba3c commit 49b60b9

File tree

13 files changed

+130
-57
lines changed

13 files changed

+130
-57
lines changed

README.md

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ console.log(count); // 1
107107

108108
### Mixed async/sync handlers
109109

110+
Non-blocking in default.
111+
110112
```typescript
111113
const emitter = new Xevt();
112114
const result: number[] = [];
@@ -128,27 +130,39 @@ for (let i = 0; i < 5; i++) {
128130
emitter.emit("event", i);
129131
}
130132

131-
// [0, 0, 1, 1, 2, 2, 3, 3, 4, 4]
133+
// [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
132134
```
133135

136+
Blocking mode.
137+
134138
```typescript
135139
const emitter = new Xevt();
136140
const result: number[] = [];
137-
emitter.conjoin(["event1", "event2"], async () => {
138-
result.push(1);
139-
});
140-
emitter.conjoin(["event1", "event2"], async () => {
141-
result.push(2);
141+
emitter.on("event", (data) => {
142+
result.push(data);
142143
});
144+
emitter.on(
145+
"event",
146+
// deno-lint-ignore require-await
147+
async (data) =>
148+
new Promise((res) => {
149+
setTimeout(() => {
150+
result.push(data);
151+
res(true);
152+
}, 1);
153+
}),
154+
{ async: true },
155+
);
143156

144157
for (let i = 0; i < 5; i++) {
145-
emitter.emit("event1");
146-
emitter.emit("event2");
158+
emitter.emit("event", i);
147159
}
160+
await delay(15);
148161

149-
// [1, 2, 1, 2, 1, 2, 1, 2, 1, 2]
162+
// [0, 0, 1, 1, 2, 2, 3, 3, 4, 4]
150163
```
151164

165+
152166
## Return unscriber after registered an event
153167

154168
```typescript

modules/conjoin_emitter.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { CoreEmitter } from "modules/core_emitter.ts";
1616
import { Emitter } from "modules/emitter.ts";
1717
import { SeriesRunner } from "modules/runners/series.ts";
1818
import { ConjoinQueue } from "modules/conjoin_queue.ts";
19+
import { RelayRunner } from "modules/runners/relay.ts";
1920
import * as helpers from "modules/helpers.ts";
2021

2122
export class ConjoinEmitter extends CoreEmitter<ConjoinEvents>
@@ -76,6 +77,7 @@ export class ConjoinEmitter extends CoreEmitter<ConjoinEvents>
7677
handler,
7778
options: {
7879
once: options?.once,
80+
async: options?.async,
7981
dual: helpers.isDualHandler(handler),
8082
},
8183
} as DualEventHandlerSignature<any> | GeneralEventHandlerSignature<any>;
@@ -136,10 +138,7 @@ export class ConjoinEmitter extends CoreEmitter<ConjoinEvents>
136138
return this.prevEvents;
137139
};
138140

139-
if (this.prevEvents instanceof Promise) {
140-
return Promise.resolve(this.prevEvents).then(next);
141-
}
142-
return next();
141+
return new RelayRunner().exec(this.prevEvents, next, { async: true });
143142
}
144143

145144
off(event: ConjoinEvents, handler?: EventHandler): void {

modules/core_emitter.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import type {
1010
} from "modules/types.ts";
1111

1212
import { Logger } from "modules/logger.ts";
13+
import * as helpers from "modules/helpers.ts";
1314

1415
export abstract class CoreEmitter<T> implements XCoreEmitter<T> {
1516
protected handlers: RegisteredHandlers;
@@ -35,15 +36,14 @@ export abstract class CoreEmitter<T> implements XCoreEmitter<T> {
3536
name: EventName,
3637
signature: EventHandlerSignature<any>,
3738
): EventUnscriber {
38-
// @ts-ignore TS7053
39-
const async = signature.handler[Symbol.toStringTag] === "AsyncFunction" ||
40-
("then" in signature.handler);
41-
42-
signature.options ??= {};
43-
signature.options.async = async;
44-
4539
if (this.debug) this.logger.debug("on", name, signature);
4640

41+
if (
42+
signature.options?.async && !helpers.isAsyncFunction(signature.handler)
43+
) {
44+
delete signature.options.async;
45+
}
46+
4747
const handlers = this.handlers.get(name);
4848
if (handlers) {
4949
handlers.push(signature);

modules/emitter.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export class Emitter extends CoreEmitter<EventName> implements XevtEmitter {
2929
handler,
3030
options: {
3131
once: options?.once || event === EmitDone,
32+
async: !!options?.async,
3233
dual: helpers.isDualHandler(handler),
3334
},
3435
} as DualEventHandlerSignature<any> | GeneralEventHandlerSignature<any>;

modules/helpers.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,14 @@ export function isDualHandler(
2626
"true" in handler || "false" in handler
2727
);
2828
}
29+
30+
/**
31+
* Check if a handler is an async function.
32+
* @param handler The handler to check.
33+
*/
34+
export function isAsyncFunction(handler: any) {
35+
// @ts-ignore TS7053
36+
return (typeof handler === "function" &&
37+
handler[Symbol.toStringTag] === "AsyncFunction") ||
38+
("then" in handler);
39+
}

modules/runners/dual.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type {
44
} from "modules/types.ts";
55

66
import { SequenceRunner } from "modules/runners/sequence.ts";
7+
import { RelayRunner } from "modules/runners/relay.ts";
78

89
/**
910
* Run a dual event handler.
@@ -53,9 +54,6 @@ export class DualRunner<N = any> {
5354
* @param args The arguments to pass to the dual handler.
5455
*/
5556
exec(result: any) {
56-
if (result instanceof Promise) {
57-
return Promise.resolve(result).then((res) => this.dualExec(res));
58-
}
59-
return this.dualExec(result);
57+
return new RelayRunner().exec(result, (p) => this.dualExec(p));
6058
}
6159
}

modules/runners/relay.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import type { EventHandler } from "modules/types.ts";
2+
3+
/** Options to RelayRunner */
4+
export type RelayRunnerOptions = {
5+
/**
6+
* If the prev handler is a promise, it will await for it before executing the next handler.
7+
* @default false
8+
*/
9+
async: boolean;
10+
};
11+
12+
/** Handle how to run before and after handler. */
13+
export class RelayRunner {
14+
/**
15+
* Execute the handler.
16+
* @param prev The result of the previous handler.
17+
* @param next The next handler.
18+
* @param options The options to run the next handler.
19+
*/
20+
exec<T extends EventHandler>(
21+
prev: any,
22+
next: T,
23+
options?: Partial<RelayRunnerOptions>,
24+
): Promise<ReturnType<T>> | ReturnType<T> {
25+
if (prev instanceof Promise && options?.async) {
26+
return Promise.resolve(prev).then((res) => next(res));
27+
}
28+
return next(prev);
29+
}
30+
}

modules/runners/sequence.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { GeneralEventHandlerSignature } from "modules/types.ts";
22

33
import { SingleRunner } from "modules/runners/single.ts";
4+
import { RelayRunner } from "modules/runners/relay.ts";
45

56
/**
67
* Run handlers in sequence.
@@ -26,18 +27,15 @@ export class SequenceRunner<
2627
exec(
2728
args: Parameters<N["handler"]>,
2829
index: number = 0,
29-
): void | Promise<void> {
30+
): ReturnType<N["handler"]> | Promise<ReturnType<N["handler"]>> | void {
3031
const profile = this.handlers[index];
3132
if (!profile) return;
3233

3334
const result = new SingleRunner<N>(profile).exec(args) as any;
34-
35-
/**
36-
* Wait for the handler to finish before moving to the next handler.
37-
*/
38-
if (profile.options?.async || result instanceof Promise) {
39-
return Promise.resolve(result).then(() => this.exec(args, index + 1));
40-
}
41-
return this.exec(args, index + 1);
35+
return new RelayRunner().exec(
36+
result,
37+
() => this.exec(args, index + 1),
38+
profile.options,
39+
) as any;
4240
}
4341
}

modules/runners/series.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { EventName, RegisteredHandlers } from "modules/types.ts";
22

33
import { StepRunner } from "modules/runners/step.ts";
4+
import { RelayRunner } from "modules/runners/relay.ts";
45

56
/**
67
* Run handlers each in series.
@@ -24,9 +25,9 @@ export class SeriesRunner {
2425
if (!key) return;
2526

2627
const step = new StepRunner(this.handlers).exec(key);
27-
if (step instanceof Promise) {
28-
return Promise.resolve(step).then(() => this.exec(series, idx + 1));
29-
}
30-
return this.exec(series, idx + 1);
28+
return new RelayRunner().exec(
29+
step,
30+
() => this.exec(series, idx + 1),
31+
) as any;
3132
}
3233
}

modules/runners/step.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import type {
99
import { DualRunner } from "modules/runners/dual.ts";
1010
import { SingleRunner } from "modules/runners/single.ts";
1111
import { SequenceRunner } from "modules/runners/sequence.ts";
12+
import { RelayRunner } from "modules/runners/relay.ts";
1213
import * as helpers from "modules/helpers.ts";
1314

1415
/**
@@ -51,17 +52,13 @@ export class StepRunner {
5152

5253
const next = (result: any) => {
5354
const dualResult = new DualRunner(duals).exec(!!result);
54-
if (dualResult instanceof Promise) {
55-
return dualResult.then(() =>
56-
this.execByIndex(handlers, duals, args, idx + 1)
57-
);
58-
}
55+
return new RelayRunner().exec(
56+
dualResult,
57+
() => this.execByIndex(handlers, duals, args, idx + 1),
58+
);
5959
};
6060

61-
if (handler.options?.async) {
62-
return Promise.resolve(result).then(() => next(result));
63-
}
64-
return next(result);
61+
return new RelayRunner().exec(result, next, handler.options) as any;
6562
}
6663

6764
/**

0 commit comments

Comments
 (0)