Skip to content

Commit 6299957

Browse files
hymmcart
authored andcommitted
close the channel so executor doesn't deadlock
1 parent e1d741a commit 6299957

File tree

1 file changed

+46
-21
lines changed

1 file changed

+46
-21
lines changed

crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::panic::AssertUnwindSafe;
2+
13
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
24
use bevy_utils::default;
35
use bevy_utils::syncunsafecell::SyncUnsafeCell;
@@ -175,11 +177,10 @@ impl SystemExecutor for MultiThreadedExecutor {
175177

176178
if self.num_running_systems > 0 {
177179
// wait for systems to complete
178-
let index = self
179-
.receiver
180-
.recv()
181-
.await
182-
.unwrap_or_else(|error| unreachable!("{}", error));
180+
let index =
181+
self.receiver.recv().await.expect(
182+
"A system has panicked so the executor cannot continue.",
183+
);
183184

184185
self.finish_system_and_signal_dependents(index);
185186

@@ -429,14 +430,22 @@ impl MultiThreadedExecutor {
429430
let task = async move {
430431
#[cfg(feature = "trace")]
431432
let system_guard = system_span.enter();
432-
// SAFETY: access is compatible
433-
unsafe { system.run_unsafe((), world) };
433+
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
434+
// SAFETY: access is compatible
435+
unsafe { system.run_unsafe((), world) };
436+
}));
434437
#[cfg(feature = "trace")]
435438
drop(system_guard);
436-
sender
437-
.send(system_index)
438-
.await
439-
.unwrap_or_else(|error| unreachable!("{}", error));
439+
if res.is_err() {
440+
// close the channel to propagate the error to the
441+
// multithreaded executor
442+
sender.close();
443+
} else {
444+
sender
445+
.send(system_index)
446+
.await
447+
.unwrap_or_else(|error| unreachable!("{}", error));
448+
}
440449
};
441450

442451
#[cfg(feature = "trace")]
@@ -479,13 +488,21 @@ impl MultiThreadedExecutor {
479488
let task = async move {
480489
#[cfg(feature = "trace")]
481490
let system_guard = system_span.enter();
482-
apply_system_buffers(&unapplied_systems, systems, world);
491+
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
492+
apply_system_buffers(&unapplied_systems, systems, world);
493+
}));
483494
#[cfg(feature = "trace")]
484495
drop(system_guard);
485-
sender
486-
.send(system_index)
487-
.await
488-
.unwrap_or_else(|error| unreachable!("{}", error));
496+
if res.is_err() {
497+
// close the channel to propagate the error to the
498+
// multithreaded executor
499+
sender.close();
500+
} else {
501+
sender
502+
.send(system_index)
503+
.await
504+
.unwrap_or_else(|error| unreachable!("{}", error));
505+
}
489506
};
490507

491508
#[cfg(feature = "trace")]
@@ -495,13 +512,21 @@ impl MultiThreadedExecutor {
495512
let task = async move {
496513
#[cfg(feature = "trace")]
497514
let system_guard = system_span.enter();
498-
system.run((), world);
515+
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
516+
system.run((), world);
517+
}));
499518
#[cfg(feature = "trace")]
500519
drop(system_guard);
501-
sender
502-
.send(system_index)
503-
.await
504-
.unwrap_or_else(|error| unreachable!("{}", error));
520+
if res.is_err() {
521+
// close the channel to propagate the error to the
522+
// multithreaded executor
523+
sender.close();
524+
} else {
525+
sender
526+
.send(system_index)
527+
.await
528+
.unwrap_or_else(|error| unreachable!("{}", error));
529+
}
505530
};
506531

507532
#[cfg(feature = "trace")]

0 commit comments

Comments
 (0)