Skip to content

24-use-non-block-asnyc-to-default #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ console.log(count); // 1

### Mixed async/sync handlers

Non-blocking in default.

```typescript
const emitter = new Xevt();
const result: number[] = [];
Expand All @@ -128,27 +130,39 @@ for (let i = 0; i < 5; i++) {
emitter.emit("event", i);
}

// [0, 0, 1, 1, 2, 2, 3, 3, 4, 4]
// [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
```

Blocking mode.

```typescript
const emitter = new Xevt();
const result: number[] = [];
emitter.conjoin(["event1", "event2"], async () => {
result.push(1);
});
emitter.conjoin(["event1", "event2"], async () => {
result.push(2);
emitter.on("event", (data) => {
result.push(data);
});
emitter.on(
"event",
// deno-lint-ignore require-await
async (data) =>
new Promise((res) => {
setTimeout(() => {
result.push(data);
res(true);
}, 1);
}),
{ async: true },
);

for (let i = 0; i < 5; i++) {
emitter.emit("event1");
emitter.emit("event2");
emitter.emit("event", i);
}
await delay(15);

// [1, 2, 1, 2, 1, 2, 1, 2, 1, 2]
// [0, 0, 1, 1, 2, 2, 3, 3, 4, 4]
```


## Return unscriber after registered an event

```typescript
Expand Down
7 changes: 3 additions & 4 deletions modules/conjoin_emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { CoreEmitter } from "modules/core_emitter.ts";
import { Emitter } from "modules/emitter.ts";
import { SeriesRunner } from "modules/runners/series.ts";
import { ConjoinQueue } from "modules/conjoin_queue.ts";
import { RelayRunner } from "modules/runners/relay.ts";
import * as helpers from "modules/helpers.ts";

export class ConjoinEmitter extends CoreEmitter<ConjoinEvents>
Expand Down Expand Up @@ -76,6 +77,7 @@ export class ConjoinEmitter extends CoreEmitter<ConjoinEvents>
handler,
options: {
once: options?.once,
async: options?.async,
dual: helpers.isDualHandler(handler),
},
} as DualEventHandlerSignature<any> | GeneralEventHandlerSignature<any>;
Expand Down Expand Up @@ -136,10 +138,7 @@ export class ConjoinEmitter extends CoreEmitter<ConjoinEvents>
return this.prevEvents;
};

if (this.prevEvents instanceof Promise) {
return Promise.resolve(this.prevEvents).then(next);
}
return next();
return new RelayRunner().exec(this.prevEvents, next, { async: true });
}

off(event: ConjoinEvents, handler?: EventHandler): void {
Expand Down
14 changes: 7 additions & 7 deletions modules/core_emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
} from "modules/types.ts";

import { Logger } from "modules/logger.ts";
import * as helpers from "modules/helpers.ts";

export abstract class CoreEmitter<T> implements XCoreEmitter<T> {
protected handlers: RegisteredHandlers;
Expand All @@ -35,15 +36,14 @@ export abstract class CoreEmitter<T> implements XCoreEmitter<T> {
name: EventName,
signature: EventHandlerSignature<any>,
): EventUnscriber {
// @ts-ignore TS7053
const async = signature.handler[Symbol.toStringTag] === "AsyncFunction" ||
("then" in signature.handler);

signature.options ??= {};
signature.options.async = async;

if (this.debug) this.logger.debug("on", name, signature);

if (
signature.options?.async && !helpers.isAsyncFunction(signature.handler)
) {
delete signature.options.async;
}

const handlers = this.handlers.get(name);
if (handlers) {
handlers.push(signature);
Expand Down
1 change: 1 addition & 0 deletions modules/emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export class Emitter extends CoreEmitter<EventName> implements XevtEmitter {
handler,
options: {
once: options?.once || event === EmitDone,
async: !!options?.async,
dual: helpers.isDualHandler(handler),
},
} as DualEventHandlerSignature<any> | GeneralEventHandlerSignature<any>;
Expand Down
11 changes: 11 additions & 0 deletions modules/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,14 @@ export function isDualHandler(
"true" in handler || "false" in handler
);
}

/**
* Check if a handler is an async function.
* @param handler The handler to check.
*/
export function isAsyncFunction(handler: any) {
// @ts-ignore TS7053
return (typeof handler === "function" &&
handler[Symbol.toStringTag] === "AsyncFunction") ||
("then" in handler);
}
6 changes: 2 additions & 4 deletions modules/runners/dual.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type {
} from "modules/types.ts";

import { SequenceRunner } from "modules/runners/sequence.ts";
import { RelayRunner } from "modules/runners/relay.ts";

/**
* Run a dual event handler.
Expand Down Expand Up @@ -53,9 +54,6 @@ export class DualRunner<N = any> {
* @param args The arguments to pass to the dual handler.
*/
exec(result: any) {
if (result instanceof Promise) {
return Promise.resolve(result).then((res) => this.dualExec(res));
}
return this.dualExec(result);
return new RelayRunner().exec(result, (p) => this.dualExec(p));
}
}
30 changes: 30 additions & 0 deletions modules/runners/relay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import type { EventHandler } from "modules/types.ts";

/** Options to RelayRunner */
export type RelayRunnerOptions = {
/**
* If the prev handler is a promise, it will await for it before executing the next handler.
* @default false
*/
async: boolean;
};

/** Handle how to run before and after handler. */
export class RelayRunner {
/**
* Execute the handler.
* @param prev The result of the previous handler.
* @param next The next handler.
* @param options The options to run the next handler.
*/
exec<T extends EventHandler>(
prev: any,
next: T,
options?: Partial<RelayRunnerOptions>,
): Promise<ReturnType<T>> | ReturnType<T> {
if (prev instanceof Promise && options?.async) {
return Promise.resolve(prev).then((res) => next(res));
}
return next(prev);
}
}
16 changes: 7 additions & 9 deletions modules/runners/sequence.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { GeneralEventHandlerSignature } from "modules/types.ts";

import { SingleRunner } from "modules/runners/single.ts";
import { RelayRunner } from "modules/runners/relay.ts";

/**
* Run handlers in sequence.
Expand All @@ -26,18 +27,15 @@ export class SequenceRunner<
exec(
args: Parameters<N["handler"]>,
index: number = 0,
): void | Promise<void> {
): ReturnType<N["handler"]> | Promise<ReturnType<N["handler"]>> | void {
const profile = this.handlers[index];
if (!profile) return;

const result = new SingleRunner<N>(profile).exec(args) as any;

/**
* Wait for the handler to finish before moving to the next handler.
*/
if (profile.options?.async || result instanceof Promise) {
return Promise.resolve(result).then(() => this.exec(args, index + 1));
}
return this.exec(args, index + 1);
return new RelayRunner().exec(
result,
() => this.exec(args, index + 1),
profile.options,
) as any;
}
}
9 changes: 5 additions & 4 deletions modules/runners/series.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { EventName, RegisteredHandlers } from "modules/types.ts";

import { StepRunner } from "modules/runners/step.ts";
import { RelayRunner } from "modules/runners/relay.ts";

/**
* Run handlers each in series.
Expand All @@ -24,9 +25,9 @@ export class SeriesRunner {
if (!key) return;

const step = new StepRunner(this.handlers).exec(key);
if (step instanceof Promise) {
return Promise.resolve(step).then(() => this.exec(series, idx + 1));
}
return this.exec(series, idx + 1);
return new RelayRunner().exec(
step,
() => this.exec(series, idx + 1),
) as any;
}
}
15 changes: 6 additions & 9 deletions modules/runners/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
import { DualRunner } from "modules/runners/dual.ts";
import { SingleRunner } from "modules/runners/single.ts";
import { SequenceRunner } from "modules/runners/sequence.ts";
import { RelayRunner } from "modules/runners/relay.ts";
import * as helpers from "modules/helpers.ts";

/**
Expand Down Expand Up @@ -51,17 +52,13 @@ export class StepRunner {

const next = (result: any) => {
const dualResult = new DualRunner(duals).exec(!!result);
if (dualResult instanceof Promise) {
return dualResult.then(() =>
this.execByIndex(handlers, duals, args, idx + 1)
);
}
return new RelayRunner().exec(
dualResult,
() => this.execByIndex(handlers, duals, args, idx + 1),
);
};

if (handler.options?.async) {
return Promise.resolve(result).then(() => next(result));
}
return next(result);
return new RelayRunner().exec(result, next, handler.options) as any;
}

/**
Expand Down
7 changes: 2 additions & 5 deletions modules/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export type ErrorHandler = (error: unknown) => void;

export type EventOptions = {
once: boolean;
async: boolean;
};

export type DualEventHandler =
Expand All @@ -19,11 +20,7 @@ export type DualEventHandlerSignature<T> = {
name: T;
handler: DualEventHandler;
options:
& Partial<
EventOptions & {
async: boolean;
}
>
& Partial<EventOptions>
& { dual: true };
};

Expand Down
6 changes: 3 additions & 3 deletions tests/deno/multiple_events_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ describe("Xevt - multiple events", () => {
setTimeout(() => {
result++;
resolve(true);
}, 10)
}, 1)
);
});
emitter.emit("event1");
Expand All @@ -150,7 +150,7 @@ describe("Xevt - multiple events", () => {
emitter.emit("event2");
emitter.emit("event1");
emitter.emit("event2");
await delay(100);
await delay(15);
assertEquals(result, 4);
});

Expand All @@ -169,7 +169,7 @@ describe("Xevt - multiple events", () => {
emitter.emit("event1");
emitter.emit("event2");
}
await delay(100);
await delay(10);
assertEquals(result, [1, 2, 1, 2, 1, 2, 1, 2, 1, 2]);
});
});
Expand Down
33 changes: 30 additions & 3 deletions tests/deno/single_event_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ describe("Xevt - single event", () => {
assertEquals(result, [1, 2, 3, 4, 5, 6, 7, 8]);
});

it("mix handlers", async () => {
it("mix handlers - blocking", async () => {
const emitter = new Xevt();
const result: number[] = [];
emitter.on("event", (data) => {
Expand All @@ -130,16 +130,43 @@ describe("Xevt - single event", () => {
setTimeout(() => {
result.push(data);
res(true);
}, 10);
}, 1);
}),
{ async: true },
);

for (let i = 0; i < 5; i++) {
emitter.emit("event", i);
}
await delay(100);
await delay(15);
assertEquals(result, [0, 0, 1, 1, 2, 2, 3, 3, 4, 4]);
});

it("mix handlers - non-blocking", async () => {
const emitter = new Xevt();
const result: number[] = [];
emitter.on("event", (data) => {
result.push(data);
});
emitter.on(
"event",
// deno-lint-ignore require-await
async (data) =>
new Promise((res) => {
setTimeout(() => {
result.push(data);
res(true);
}, 1);
}),
{ async: false },
);

for (let i = 0; i < 5; i++) {
emitter.emit("event", i);
}
await delay(15);
assertEquals(result, [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]);
});
});

describe("Xevt - unscriber", () => {
Expand Down