Skip to content

Commit 100b785

Browse files
committed
lib: child process queue pending messages
It fixes the problem for the child process not receiving messages. Fixes: #41134
1 parent 0d9f3bd commit 100b785

File tree

2 files changed

+43
-1
lines changed

2 files changed

+43
-1
lines changed

lib/internal/child_process.js

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const {
1010
ObjectDefineProperty,
1111
ObjectSetPrototypeOf,
1212
ReflectApply,
13+
SafeSet,
1314
StringPrototypeSlice,
1415
Symbol,
1516
Uint8Array,
@@ -81,6 +82,7 @@ let HTTPParser;
8182
const MAX_HANDLE_RETRANSMISSIONS = 3;
8283
const kChannelHandle = Symbol('kChannelHandle');
8384
const kIsUsedAsStdio = Symbol('kIsUsedAsStdio');
85+
const kPendingMessages = Symbol('pendingMessages');
8486

8587
// This object contain function to convert TCP objects to native handle objects
8688
// and back again.
@@ -526,6 +528,7 @@ class Control extends EventEmitter {
526528
constructor(channel) {
527529
super();
528530
this.#channel = channel;
531+
this[kPendingMessages] = new SafeSet();
529532
}
530533

531534
// The methods keeping track of the counter are being used to track the
@@ -699,6 +702,19 @@ function setupChannel(target, channel, serializationMode) {
699702
});
700703
});
701704

705+
target.on('newListener', function() {
706+
if (!target.channel) return;
707+
708+
const messages = target.channel[kPendingMessages];
709+
if (!messages.size) return;
710+
711+
for (const messageParams of messages) {
712+
process.nextTick(() => ReflectApply(target.emit, target, messageParams));
713+
}
714+
715+
messages.clear();
716+
});
717+
702718
target.send = function(message, handle, options, callback) {
703719
if (typeof handle === 'function') {
704720
callback = handle;
@@ -912,7 +928,14 @@ function setupChannel(target, channel, serializationMode) {
912928
};
913929

914930
function emit(event, message, handle) {
915-
target.emit(event, message, handle);
931+
const isInternalMessage = 'internalMessage' === event;
932+
const hasListenersInstalled = target.listenerCount('message');
933+
if (hasListenersInstalled || isInternalMessage) {
934+
target.emit(event, message, handle);
935+
return;
936+
}
937+
938+
target.channel[kPendingMessages].add([event, message, handle]);
916939
}
917940

918941
function handleMessage(message, handle, internal) {
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import '../common/index.mjs';
2+
import assert from 'assert';
3+
import { fork } from 'child_process';
4+
import { once } from 'events';
5+
6+
if (process.argv[2] !== 'child') {
7+
const currentFile = 'test-esm-child-process-fork-main.mjs';
8+
const { pathname: filename } = new URL(currentFile, import.meta.url);
9+
const cp = fork(filename, ['child']);
10+
const message = 'Hello World';
11+
cp.send(message);
12+
13+
const [received] = await once(cp, 'message');
14+
assert.deepStrictEqual(received, message);
15+
16+
cp.disconnect();
17+
} else {
18+
process.on('message', (msg) => process.send(msg));
19+
}

0 commit comments

Comments
 (0)