Skip to content

feat: support Bun #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions benchmark/fixtures/wrap-add-bun.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import add from './add.mjs'

self.onmessage = (event) => {
postMessage(add(event.data))
}
126 changes: 105 additions & 21 deletions benchmark/isolate-benchmark.mjs
Original file line number Diff line number Diff line change
@@ -1,35 +1,60 @@
/*
* Benchmark for testing whether Tinypool's worker creation and teardown is expensive.
* Benchmark focusing on the performance `isolateWorkers` option
*
* Options:
* - `--rounds` (optional) - Specify how many iterations to run
* - `--threads` (optional) - Specify how many threads to use
*/
import { cpus } from 'node:os'
import { Worker } from 'node:worker_threads'

import * as os from 'node:os'
import * as WorkerThreads from 'node:worker_threads'

import Tinypool from '../dist/esm/index.js'

const THREADS = cpus().length - 1
const ROUNDS = 5_000
const IS_BUN = process.versions.bun !== undefined
const USE_ATOMICS = !IS_BUN
const THREADS = parseInt(getArgument('--threads') ?? getMaxThreads(), 10)
const ROUNDS = parseInt(getArgument('--rounds') ?? '5_000', 10)

console.log('Options:', { THREADS, ROUNDS, IS_BUN }, '\n')

await logTime(
"Tinypool { runtime: 'worker_threds' }",
runTinypool('worker_threds')
)
await logTime(
"Tinypool { runtime: 'child_process' }",
runTinypool('child_process')
)

await logTime('Tinypool', runTinypool)
await logTime('Worker threads', runWorkerThreads)
if (IS_BUN) {
await logTime('Native Bun workers', runBunWorkers())
}

await logTime('Native node:worker_threads', runNodeWorkerThreads())

async function runTinypool() {
function runTinypool(runtime) {
const pool = new Tinypool({
runtime,
filename: new URL('./fixtures/add.mjs', import.meta.url).href,
isolateWorkers: true,
minThreads: THREADS,
maxThreads: THREADS,
useAtomics: USE_ATOMICS,
})

await Promise.all(
Array(ROUNDS)
.fill()
.map(() => pool.run({ a: 1, b: 2 }))
)
return async function run() {
await Promise.all(
Array(ROUNDS)
.fill()
.map(() => pool.run({ a: 1, b: 2 }))
)
}
}

async function runWorkerThreads() {
function runNodeWorkerThreads() {
async function task() {
const worker = new Worker('./fixtures/wrap-add.mjs')
const worker = new WorkerThreads.Worker('./fixtures/wrap-add.mjs')
worker.postMessage({ a: 1, b: 2 })

await new Promise((resolve, reject) =>
Expand All @@ -50,16 +75,75 @@ async function runWorkerThreads() {
}
}

await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
return async function run() {
await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
}
}

function runBunWorkers() {
async function task() {
const worker = new Worker('./fixtures/wrap-add-bun.mjs')
worker.postMessage({ a: 1, b: 2 })

await new Promise((resolve, reject) => {
worker.onmessage = (event) =>
event.data === 3 ? resolve() : reject('Not 3')
})

await worker.terminate()
}

const pool = Array(ROUNDS).fill(task)

async function execute() {
const task = pool.shift()

if (task) {
await task()
return execute()
}
}

return async function run() {
await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
}
}

function getArgument(flag) {
const index = process.argv.indexOf(flag)
if (index === -1) return

return process.argv[index + 1]
}

function getMaxThreads() {
return os.availableParallelism?.() || os.cpus().length - 1
}

async function logTime(label, method) {
console.log(`${label} | START`)

const start = process.hrtime.bigint()
await method()
const end = process.hrtime.bigint()
console.log(label, 'took', ((end - start) / 1_000_000n).toString(), 'ms')

console.log(`${label} | END ${((end - start) / 1_000_000n).toString()} ms`)

console.log('Cooling down for 2s')
const interval = setInterval(() => process.stdout.write('.'), 100)
await sleep(2_000)
clearInterval(interval)
console.log(' ✓\n')
}

async function sleep(ms) {
await new Promise((resolve) => setTimeout(resolve, ms))
}
6 changes: 5 additions & 1 deletion src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export interface TinypoolWorker {
resourceLimits?: any
workerData?: TinypoolData
trackUnmanagedFds?: boolean
}): void
}): Promise<void>
terminate(): Promise<any>
postMessage(message: any, transferListItem?: TransferListItem[]): void
setChannel?: (channel: TinypoolChannel) => void
Expand Down Expand Up @@ -57,6 +57,10 @@ export interface RequestMessage {
name: string
}

export interface SpawnMessage {
spawned: true
}

export interface ReadyMessage {
ready: true
}
Expand Down
7 changes: 7 additions & 0 deletions src/entry/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
ReadyMessage,
RequestMessage,
ResponseMessage,
SpawnMessage,
StartupMessage,
TinypoolWorkerMessage,
} from '../common'
Expand All @@ -23,6 +24,12 @@ process.__tinypool_state__ = {
workerId: process.pid,
}

let emittedReady = false
if (!emittedReady) {
process.send!(<SpawnMessage>{ spawned: true })
emittedReady = true
}

process.on('message', (message: IncomingMessage) => {
// Message was not for port or pool
// It's likely end-users own communication between main and worker
Expand Down
7 changes: 7 additions & 0 deletions src/entry/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ parentPort!.on('message', (message: StartupMessage) => {
useAtomics =
process.env.PISCINA_DISABLE_ATOMICS === '1' ? false : message.useAtomics

if (useAtomics && process.versions.bun) {
const error = 'useAtomics cannot be used with Bun at the moment.'
Copy link
Member Author

@AriPerkkio AriPerkkio Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Workers are unable to terminate when using bun. I think the while loops below are causing it. oven-sh/bun#4134

import { isMainThread, Worker } from "node:worker_threads";
import { fileURLToPath } from "node:url";

if (isMainThread) {
  console.log("[main] running");
  const worker = new Worker(fileURLToPath(import.meta.url));

  await new Promise((r) => setTimeout(r, 1_000));
  const timeout = setTimeout(() => console.error("Timeout"), 2_000);

  console.log("[main] terminating");
  await worker.terminate();
  clearTimeout(timeout);
  console.log("[main] terminated");
} else {
  console.log("[worker] running");

  while (true) {}
}
ari ~/repro  $ node worker-threads.mjs 
[main] running
[worker] running
[main] terminating
[main] terminated

ari ~/repro  $ bun worker-threads.mjs 
[main] running
[worker] running
[main] terminating
Timeout
<stuck here>

console.error(error)
throw new Error(error)
}

const { port, sharedBuffer, filename, name } = message

;(async function () {
Expand All @@ -48,6 +54,7 @@ parentPort!.on('message', (message: StartupMessage) => {
const readyMessage: ReadyMessage = { ready: true }
parentPort!.postMessage(readyMessage)

port.start()
port.on('message', onMessage.bind(null, port, sharedBuffer))
atomicsWaitLoop(port, sharedBuffer)
})().catch(throwInNextTick)
Expand Down
36 changes: 19 additions & 17 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -658,31 +658,32 @@ class ThreadPool {
this.workers.onAvailable((w: WorkerInfo) => this._onWorkerAvailable(w))

this.startingUp = true
this._ensureMinimumWorkers()
this.startingUp = false
this._ensureMinimumWorkers().then(() => {
this.startingUp = false
})
}
_ensureEnoughWorkersForTaskQueue(): void {
async _ensureEnoughWorkersForTaskQueue(): Promise<void> {
while (
this.workers.size < this.taskQueue.size &&
this.workers.size < this.options.maxThreads
) {
this._addNewWorker()
await this._addNewWorker()
}
}

_ensureMaximumWorkers(): void {
async _ensureMaximumWorkers(): Promise<void> {
while (this.workers.size < this.options.maxThreads) {
this._addNewWorker()
await this._addNewWorker()
}
}

_ensureMinimumWorkers(): void {
async _ensureMinimumWorkers(): Promise<void> {
while (this.workers.size < this.options.minThreads) {
this._addNewWorker()
await this._addNewWorker()
}
}

_addNewWorker(): void {
async _addNewWorker(): Promise<void> {
const pool = this
const workerIds = this.workerIds

Expand All @@ -701,7 +702,7 @@ class ThreadPool {
? new ProcessWorker()
: new ThreadWorker()

worker.initialize({
await worker.initialize({
env: this.options.env,
argv: this.options.argv,
execArgv: this.options.execArgv,
Expand Down Expand Up @@ -740,6 +741,7 @@ class ThreadPool {
}

const { port1, port2 } = new MessageChannel()
port1.start()
const workerInfo = new WorkerInfo(
worker,
port1,
Expand Down Expand Up @@ -781,7 +783,7 @@ class ThreadPool {
)
})

worker.on('error', (err: Error) => {
worker.on('error', async (err: Error) => {
// Work around the bug in https://github.com/nodejs/node/pull/33394
worker.ref = () => {}

Expand All @@ -795,7 +797,7 @@ class ThreadPool {
this._removeWorker(workerInfo)

if (workerInfo.isReady() && !this.workerFailsDuringBootstrap) {
this._ensureMinimumWorkers()
await this._ensureMinimumWorkers()
} else {
// Do not start new workers over and over if they already fail during
// bootstrap, there's no point.
Expand Down Expand Up @@ -881,7 +883,7 @@ class ThreadPool {
}
}

runTask(task: any, options: RunOptions): Promise<any> {
async runTask(task: any, options: RunOptions): Promise<any> {
let { filename, name } = options
const { transferList = [], signal = null, channel } = options

Expand Down Expand Up @@ -944,7 +946,7 @@ class ThreadPool {
if (taskInfo.workerInfo !== null) {
// Already running: We cancel the Worker this is running on.
this._removeWorker(taskInfo.workerInfo)
this._ensureMinimumWorkers()
void this._ensureMinimumWorkers()
} else {
// Not yet running: Remove it from the queue.
this.taskQueue.remove(taskInfo)
Expand All @@ -965,7 +967,7 @@ class ThreadPool {
}
} else {
if (this.workers.size < this.options.maxThreads) {
this._addNewWorker()
await this._addNewWorker()
}
this.taskQueue.push(taskInfo)
}
Expand All @@ -989,7 +991,7 @@ class ThreadPool {
(workerInfo === null || workerInfo.currentUsage() > 0) &&
this.workers.size < this.options.maxThreads
) {
this._addNewWorker()
await this._addNewWorker()
waitingForNewWorker = true
}

Expand Down Expand Up @@ -1099,7 +1101,7 @@ class ThreadPool {

await Promise.all(exitEvents)

this._ensureMinimumWorkers()
await this._ensureMinimumWorkers()
}
}

Expand Down
Loading