File tree Expand file tree Collapse file tree 3 files changed +56
-0
lines changed Expand file tree Collapse file tree 3 files changed +56
-0
lines changed Original file line number Diff line number Diff line change @@ -609,11 +609,25 @@ void MessagePort::OnMessage() {
609
609
HandleScope handle_scope (env ()->isolate ());
610
610
Local<Context> context = object (env ()->isolate ())->CreationContext ();
611
611
612
+ ssize_t processing_limit;
613
+ {
614
+ Mutex::ScopedLock (data_->mutex_ );
615
+ processing_limit = data_->incoming_messages_ .size ();
616
+ }
617
+
612
618
// data_ can only ever be modified by the owner thread, so no need to lock.
613
619
// However, the message port may be transferred while it is processing
614
620
// messages, so we need to check that this handle still owns its `data_` field
615
621
// on every iteration.
616
622
while (data_) {
623
+ if (--processing_limit < 0 ) {
624
+ // Prevent event loop starvation by only processing those messages without
625
+ // interruption that were already present when the OnMessage() call was
626
+ // first triggered.
627
+ TriggerAsync ();
628
+ return ;
629
+ }
630
+
617
631
HandleScope handle_scope (env ()->isolate ());
618
632
Context::Scope context_scope (context);
619
633
Original file line number Diff line number Diff line change
1
+ 'use strict' ;
2
+ const common = require ( '../common' ) ;
3
+
4
+ const { MessageChannel } = require ( 'worker_threads' ) ;
5
+
6
+ // Make sure that closing a message port while receiving messages on it does
7
+ // not stop messages that are already in the queue from being emitted.
8
+
9
+ const { port1, port2 } = new MessageChannel ( ) ;
10
+
11
+ port1 . on ( 'message' , common . mustCall ( ( ) => {
12
+ port1 . close ( ) ;
13
+ } , 2 ) ) ;
14
+ port2 . postMessage ( 'foo' ) ;
15
+ port2 . postMessage ( 'bar' ) ;
Original file line number Diff line number Diff line change
1
+ 'use strict' ;
2
+ const common = require ( '../common' ) ;
3
+ const assert = require ( 'assert' ) ;
4
+
5
+ const { MessageChannel } = require ( 'worker_threads' ) ;
6
+
7
+ // Make sure that an infinite asynchronous .on('message')/postMessage loop
8
+ // does not lead to a stack overflow and does not starve the event loop.
9
+ // We schedule timeouts both from before the the .on('message') handler and
10
+ // inside of it, which both should run.
11
+
12
+ const { port1, port2 } = new MessageChannel ( ) ;
13
+ let count = 0 ;
14
+ port1 . on ( 'message' , ( ) => {
15
+ if ( count === 0 ) {
16
+ setTimeout ( common . mustCall ( ( ) => {
17
+ port1 . close ( ) ;
18
+ } ) , 0 ) ;
19
+ }
20
+
21
+ port2 . postMessage ( 0 ) ;
22
+ assert ( count ++ < 10000 , `hit ${ count } loop iterations` ) ;
23
+ } ) ;
24
+
25
+ port2 . postMessage ( 0 ) ;
26
+
27
+ setTimeout ( common . mustCall ( ) , 0 ) ;
You can’t perform that action at this time.
0 commit comments