diff --git a/example/.babelrc b/example/.babelrc index fa5c8e6..9a30d23 100644 --- a/example/.babelrc +++ b/example/.babelrc @@ -5,7 +5,7 @@ { "useBuiltIns": true, "targets": { - "node": 4.3 + "node": "4.3.0" }, "exclude": [ "transform-async-to-generator", diff --git a/example/webpack.config.js b/example/webpack.config.js index 0cc133e..2e8d29b 100644 --- a/example/webpack.config.js +++ b/example/webpack.config.js @@ -1,6 +1,6 @@ const path = require('path'); const ExtractTextPlugin = require('extract-text-webpack-plugin'); // eslint-disable-line import/no-extraneous-dependencies -const threadLoader = require('thread-loader'); // eslint-disable-line import/no-extraneous-dependencies +// const threadLoader = require('thread-loader'); // eslint-disable-line import/no-extraneous-dependencies module.exports = (env) => { const workerPool = { @@ -12,10 +12,12 @@ module.exports = (env) => { workerParallelJobs: 2, poolTimeout: env.watch ? Infinity : 2000, }; - if (+env.threads > 0) { - threadLoader.warmup(workerPool, ['babel-loader', 'babel-preset-env']); - threadLoader.warmup(workerPoolSass, ['sass-loader', 'css-loader']); - } + + // TODO: fix me + // if (+env.threads > 0) { + // threadLoader.warmup(workerPool, ['babel-loader', 'babel-preset-env']); + // threadLoader.warmup(workerPoolSass, ['sass-loader', 'css-loader']); + // } return { context: __dirname, entry: ['react', 'lodash-es', './index.js'], @@ -39,7 +41,7 @@ module.exports = (env) => { test: /\.scss$/, use: ExtractTextPlugin.extract({ use: [ - env.threads !== 0 && { + false && env.threads !== 0 && { loader: 'thread-loader', options: workerPoolSass, }, diff --git a/src/WorkerPool2.js b/src/WorkerPool2.js new file mode 100644 index 0000000..793597d --- /dev/null +++ b/src/WorkerPool2.js @@ -0,0 +1,211 @@ +/* eslint-disable no-console */ +// eslint-disable-next-line +import { Worker } from 'worker_threads'; +import asyncQueue from 'async/queue'; +import WorkerError from './WorkerError'; + +const workerPath = require.resolve('./worker2'); + +let workerId = 0; + +class PoolWorker { + constructor(options, onJobDone) { + this.nextJobId = 0; + this.jobs = Object.create(null); + this.activeJobs = 0; + this.onJobDone = onJobDone; + this.id = workerId; + workerId += 1; + this.worker = new Worker(workerPath, { + workerData: { + nodeArgs: options.nodeArgs, + parallelJobs: options.parallelJobs, + }, + }); + + this.worker.on('message', this.onWorkerMessage.bind(this)); + this.worker.on('error', console.error); + } + + run(data, callback) { + const jobId = this.nextJobId; + this.nextJobId += 1; + this.jobs[jobId] = { data, callback }; + this.activeJobs += 1; + this.writeJson({ + type: 'job', + id: jobId, + data, + }); + } + + warmup(requires) { + this.writeJson({ + type: 'warmup', + requires, + }); + } + + writeJson(data) { + const message = { + id: data.id, + type: data.type, + data: { + loaders: data.data.loaders, + resource: data.data.resource, + optionsContext: data.data.optionsContext, + sourceMap: data.data.sourceMap, + }, + }; + + this.worker.postMessage(message); + } + + onWorkerMessage(message) { + const { type, id } = message; + switch (type) { + case 'job': { + const { error, result } = message; + const { callback: jobCallback } = this.jobs[id]; + const callback = (err, arg) => { + if (jobCallback) { + delete this.jobs[id]; + this.activeJobs -= 1; + this.onJobDone(); + if (err) { + jobCallback(err instanceof Error ? err : new Error(err), arg); + } else { + jobCallback(null, arg); + } + } + }; + + if (error) { + callback(this.fromErrorObj(error), result); + return; + } + + callback(null, result); + break; + } + case 'resolve': { + const { context, request, questionId } = message; + const { data } = this.jobs[id]; + data.resolve(context, request, (error, result) => { + this.writeJson({ + type: 'result', + id: questionId, + error: error ? { + message: error.message, + details: error.details, + missing: error.missing, + } : null, + result, + }); + }); + break; + } + case 'emitWarning': { + const { data } = message; + const { data: jobData } = this.jobs[id]; + jobData.emitWarning(this.fromErrorObj(data)); + break; + } + case 'emitError': { + const { data } = message; + const { data: jobData } = this.jobs[id]; + jobData.emitError(this.fromErrorObj(data)); + break; + } + default: { + console.error(`Unexpected worker message ${type} in WorkerPool.`); + break; + } + } + } + + fromErrorObj(arg) { + let obj; + if (typeof arg === 'string') { + obj = { message: arg }; + } else { + obj = arg; + } + return new WorkerError(obj, this.id); + } + + dispose() { + this.worker.terminate(); + } +} + +export default class WorkerPool { + constructor(options) { + this.options = options || {}; + this.numberOfWorkers = options.numberOfWorkers; + this.poolTimeout = options.poolTimeout; + this.workerNodeArgs = options.workerNodeArgs; + this.workerParallelJobs = options.workerParallelJobs; + this.workers = new Set(); + this.activeJobs = 0; + this.timeout = null; + this.poolQueue = asyncQueue(this.distributeJob.bind(this), options.poolParallelJobs); + } + + run(data, callback) { + if (this.timeout) { + clearTimeout(this.timeout); + this.timeout = null; + } + this.activeJobs += 1; + this.poolQueue.push(data, callback); + } + + distributeJob(data, callback) { + // use worker with the fewest jobs + let bestWorker; + for (const worker of this.workers) { + if (!bestWorker || worker.activeJobs < bestWorker.activeJobs) { + bestWorker = worker; + } + } + if (bestWorker && (bestWorker.activeJobs === 0 || this.workers.size >= this.numberOfWorkers)) { + bestWorker.run(data, callback); + return; + } + const newWorker = this.createWorker(); + newWorker.run(data, callback); + } + + createWorker() { + // spin up a new worker + const newWorker = new PoolWorker({ + nodeArgs: this.workerNodeArgs, + parallelJobs: this.workerParallelJobs, + }, () => this.onJobDone()); + this.workers.add(newWorker); + return newWorker; + } + + warmup(requires) { + while (this.workers.size < this.numberOfWorkers) { + this.createWorker().warmup(requires); + } + } + + onJobDone() { + this.activeJobs -= 1; + if (this.activeJobs === 0 && isFinite(this.poolTimeout)) { + this.timeout = setTimeout(() => this.disposeWorkers(), this.poolTimeout); + } + } + + disposeWorkers() { + if (this.activeJobs === 0) { + for (const worker of this.workers) { + worker.dispose(); + } + this.workers.clear(); + } + } +} diff --git a/src/worker2.js b/src/worker2.js new file mode 100644 index 0000000..fc5d9eb --- /dev/null +++ b/src/worker2.js @@ -0,0 +1,151 @@ +/* global require */ +/* eslint-disable no-console */ +import fs from 'fs'; +import NativeModule from 'module'; +// eslint-disable-next-line +import { parentPort, workerData } from 'worker_threads'; +import loaderRunner from 'loader-runner'; +import asyncQueue from 'async/queue'; + + +parentPort.on('message', onMessage); + +function writeJson(data) { + parentPort.postMessage(data); +} + +const PARALLEL_JOBS = +workerData.parallelJobs; + +let nextQuestionId = 0; +const callbackMap = Object.create(null); + +function toErrorObj(err) { + return { + message: err.message, + details: err.details, + stack: err.stack, + hideStack: err.hideStack, + }; +} + +function toNativeError(obj) { + if (!obj) return null; + const err = new Error(obj.message); + err.details = obj.details; + err.missing = obj.missing; + return err; +} + +const queue = asyncQueue(({ id, data }, taskCallback) => { + try { + loaderRunner.runLoaders({ + loaders: data.loaders, + resource: data.resource, + readResource: fs.readFile.bind(fs), + context: { + version: 2, + resolve: (context, request, callback) => { + callbackMap[nextQuestionId] = callback; + writeJson({ + type: 'resolve', + id, + questionId: nextQuestionId, + context, + request, + }); + nextQuestionId += 1; + }, + emitWarning: (warning) => { + writeJson({ + type: 'emitWarning', + id, + data: toErrorObj(warning), + }); + }, + emitError: (error) => { + writeJson({ + type: 'emitError', + id, + data: toErrorObj(error), + }); + }, + exec: (code, filename) => { + const module = new NativeModule(filename, this); + module.paths = NativeModule._nodeModulePaths(this.context); // eslint-disable-line no-underscore-dangle + module.filename = filename; + module._compile(code, filename); // eslint-disable-line no-underscore-dangle + return module.exports; + }, + options: { + context: data.optionsContext, + }, + webpack: true, + 'thread-loader': true, + sourceMap: data.sourceMap, + }, + }, (err, lrResult) => { + const { + result, + cacheable, + fileDependencies, + contextDependencies, + } = lrResult; + writeJson({ + type: 'job', + id, + error: err && toErrorObj(err), + result: { + result, + cacheable, + fileDependencies, + contextDependencies, + }, + data: result, + }); + taskCallback(); + }); + } catch (e) { + writeJson({ + type: 'job', + id, + error: toErrorObj(e), + }); + taskCallback(); + } +}, PARALLEL_JOBS); + +function onMessage(message) { + try { + const { type, id } = message; + switch (type) { + case 'job': { + queue.push(message); + break; + } + case 'result': { + const { error, result } = message; + const callback = callbackMap[id]; + if (callback) { + const nativeError = toNativeError(error); + callback(nativeError, result); + } else { + console.error(`Worker got unexpected result id ${id}`); + } + delete callbackMap[id]; + break; + } + case 'warmup': { + const { requires } = message; + // load modules into process + requires.forEach(r => require(r)); // eslint-disable-line import/no-dynamic-require, global-require + break; + } + default: { + console.error(`Worker got unexpected job type ${type}`); + break; + } + } + } catch (e) { + console.error(`Error in worker ${e}`); + } +} diff --git a/src/workerPools.js b/src/workerPools.js index 3a81b7a..6cba813 100644 --- a/src/workerPools.js +++ b/src/workerPools.js @@ -1,5 +1,5 @@ import os from 'os'; -import WorkerPool from './WorkerPool'; +import WorkerPool from './WorkerPool2'; const workerPools = Object.create(null);