Skip to content

[Concurrency] Fix crash from calling fromTaskExecutorPreference after enqueueing job. #76757

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions stdlib/public/Concurrency/Actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1355,20 +1355,31 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
}
}

// Fetch the task executor from the job for later use. This can be somewhat
// expensive, so only do it if we're likely to need it. The conditions here
// match the conditions of the if statements below which use `taskExecutor`.
TaskExecutorRef taskExecutor;
bool needsScheduling = !oldState.isScheduled() && newState.isScheduled();
bool needsStealer =
oldState.getMaxPriority() != newState.getMaxPriority() &&
newState.isRunning();
if (needsScheduling || needsStealer)
taskExecutor = TaskExecutorRef::fromTaskExecutorPreference(job);

// This needs to be a store release so that we also publish the contents of
// the new Job we are adding to the atomic job queue. Pairs with consume
// in drainOne.
if (_status().compare_exchange_weak(oldState, newState,
/* success */ std::memory_order_release,
/* failure */ std::memory_order_relaxed)) {
// NOTE: `job` is off limits after this point, as another thread might run
// and destroy it now that it's enqueued.

traceActorStateTransition(this, oldState, newState, distributedActorIsRemote);

if (!oldState.isScheduled() && newState.isScheduled()) {
// We took responsibility to schedule the actor for the first time. See
// also ownership rule (1)
TaskExecutorRef taskExecutor =
TaskExecutorRef::fromTaskExecutorPreference(job);

return scheduleActorProcessJob(newState.getMaxPriority(), taskExecutor);
}

Expand All @@ -1389,8 +1400,6 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
this, newState.getMaxPriority());
swift_retain(this);

TaskExecutorRef taskExecutor =
TaskExecutorRef::fromTaskExecutorPreference(job);
scheduleActorProcessJob(newState.getMaxPriority(), taskExecutor);
}
}
Expand Down Expand Up @@ -1446,9 +1455,19 @@ void DefaultActorImpl::enqueueStealer(Job *job, JobPriority priority) {
if (oldState == newState)
return;

// Fetch the task executor from the job for later use. This can be somewhat
// expensive, so only do it if we're likely to need it. The conditions here
// match the conditions of the if statements below which use `taskExecutor`.
TaskExecutorRef taskExecutor;
if (!newState.isRunning() && newState.isScheduled())
taskExecutor = TaskExecutorRef::fromTaskExecutorPreference(job);

if (_status().compare_exchange_weak(oldState, newState,
/* success */ std::memory_order_relaxed,
/* failure */ std::memory_order_relaxed)) {
// NOTE: `job` is off limits after this point, as another thread might run
// and destroy it now that it's enqueued.

traceActorStateTransition(this, oldState, newState, distributedActorIsRemote);
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
if (newState.isRunning()) {
Expand All @@ -1466,7 +1485,6 @@ void DefaultActorImpl::enqueueStealer(Job *job, JobPriority priority) {
"[Override] Scheduling a stealer for actor %p at %#x priority",
this, newState.getMaxPriority());
swift_retain(this);
auto taskExecutor = TaskExecutorRef::fromTaskExecutorPreference(job);
scheduleActorProcessJob(newState.getMaxPriority(), taskExecutor);
}
#endif
Expand Down