-
Notifications
You must be signed in to change notification settings - Fork 10.5k
[Concurrency] Implement basic runtime support for task futures. #34703
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Concurrency] Implement basic runtime support for task futures. #34703
Conversation
/// wait queue and will be scheduled when the future completes or | ||
/// is cancelled. Otherwise, the future has completed and can be | ||
/// queried. | ||
FutureFragment::Status waitFuture(AsyncTask *waitingTask); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this more like pollFuture
or peekFutureStatus/peekStatus
?
wait to me implies waiting but we don't really do that, just check what we should do next
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the future is still executing, it puts waitingTask
on the queue to be scheduled once the future has completed. We then get to yield the thread back to the scheduler to do more work (but don't have the way to do that yet)
stdlib/public/Concurrency/Task.cpp
Outdated
auto storagePtr = fragment->getStoragePtr(); | ||
|
||
// Check for an error. | ||
if (unsigned errorOffset = fragment->errorOffset) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for my understanding... when would this be 0? or just general pattern to avoid a nullptr if somehow(?) it would happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It could be zero for a non-throwing function. We should probably turn that into a JobFlag
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see, thanks! Yeah may be a job flag would be nicer.
@swift-ci please smoke test |
include/swift/ABI/Task.h
Outdated
unsigned resultOffset; | ||
|
||
/// The offset of the error in the initial asynchronous context. | ||
unsigned errorOffset; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the purpose of these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is me not knowing how to get a Swift function to put result values into the future.
stdlib/public/Concurrency/Task.cpp
Outdated
break; | ||
|
||
case Status::Error: | ||
swift_unknownObjectRelease(getStoragePtr()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do not release an interior pointer into the future fragment. :)
Also, please use swift_errorRelease
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
swift_errorRelease
is only defined when Objective-C interoperability is enabled. But yeah, the other thing is bad ;)
stdlib/public/Concurrency/Task.cpp
Outdated
while (true) { | ||
waitingTask->NextWaitingTask = fragment->waitQueue.load(); | ||
if (fragment->waitQueue.compare_exchange_strong( | ||
waitingTask->NextWaitingTask, waitingTask)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason you should unify the two atomic fields is that you have a race here against someone concurrently completing the task. The thread that completes the task should exchange the appropriate completion value into the queue and then process the waiting tasks they just loaded (via the exchange). Here you need to loop trying to add the task to the queue until you see success.
Always use memory orderings when working with std::atomic
. This needs to be a std::memory_order_release
on success; it can be std::memory_order_relaxed
on failure, except actually it needs to be std::memory_order_acquire
because you're going to check for success/error.
Don't repeatedly reload waitQueue
. Load it (with an acquire) into a local, then use that local in the compare_exchange
. When you see a pointer, copy it into NextWaitingTask
(or really the SchedulerPrivate
contents).
You should use compare_exchange_weak
since you're already doing this in a loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you! I've made all of these changes together. It looks much more reasonable now.
stdlib/public/Concurrency/Task.cpp
Outdated
auto resultPtr = reinterpret_cast<OpaqueValue *>( | ||
reinterpret_cast<char *>(context) + fragment->resultOffset); | ||
fragment->resultType->vw_initializeWithTake(storagePtr, resultPtr); | ||
fragment->status = FutureFragment::Status::Success; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The general ABI for indirect return values is that the address to fill would be passed as an argument, not that we'd make space for it in the initial context and that move out of that into the future.
I don't remember what convention we decided on for errors, but I doubt this it's this strange. Should there be a flag in the task reporting whether it can throw?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, looking a lot better.
@swift-ci please test |
Build failed |
@swift-ci please smoke test |
include/swift/ABI/Task.h
Outdated
/// fragment. | ||
static size_t storageOffset(const Metadata *resultType) { | ||
size_t offset = sizeof(FutureFragment); | ||
size_t alignment = std::max(resultType->vw_alignment(), alignof(void *)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this max
about? The error value? Could you use SwiftError*
to communicate that, here and below?
...should I overlook that this is subtly pessimal in really implausible cases, like a highly-aligned byte? I'm not sure it can practically happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's the SwiftError*
. I can use that.
/// | ||
/// The low bits contain the status, the rest of the pointer is the | ||
/// AsyncTask. | ||
std::atomic<WaitQueueItem> waitQueue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically I don't think that tasks need to coincide with any status but Executing
, so the other statuses could be signal values rather than masked on. I don't think it really matters, though.
break; | ||
|
||
case Status::Error: | ||
swift_unknownObjectRelease(reinterpret_cast<OpaqueValue *>(getError())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the standard way we release errors in the runtime? Shouldn't we have an swift_errorRelease
we can unconditionally call, even if it just compiles to swift_release
in some configurations? Is it not important that we call swift_errorRelease
in the configurations that provide it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the standard way when we don't know whether we'll have a native Swift object or not. swift_errorRelease
is implemented as objc_release
where it's implemented.
stdlib/public/Concurrency/Task.cpp
Outdated
auto nextWaitingTask = waitingTask->getNextWaitingTask(); | ||
|
||
// Remove this task from the list. | ||
waitingTask->getNextWaitingTask() = nullptr; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You actually don't have to do this; the memory is invalidated once it's not waiting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright.
stdlib/public/Concurrency/Task.cpp
Outdated
JobFlags flags, AsyncTask *parent, const Metadata *futureResultType, | ||
AsyncFunctionType<void()> *function, size_t initialContextSize) { | ||
assert((futureResultType != nullptr) == flags.task_isFuture()); | ||
assert((futureResultType != nullptr) == flags.task_isFuture()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seeing undouble?
Should we assert here that initialContextSize >= sizeof(FutureAsyncContext)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good idea!
include/swift/ABI/Task.h
Outdated
/// task. | ||
/// | ||
/// This type matches the ABI of a function `<T> () async throws -> T`, which | ||
/// is used to describe futures. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be precise: it's the signature of the function taken by runDetached
and Task.Group.add
. We basically fully set up the call except for any formal arguments (e.g. closure-context arguments).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
@swift-ci please smoke test |
1b7fdea
to
5d38e0c
Compare
@swift-ci please smoke test |
This makes no sense...
|
@swift-ci please smoke test Linux |
Extend AsyncTask and the concurrency runtime with basic support for task futures. AsyncTasks with futures contain a future fragment with information about the type produced by the future, and where the future will put the result value or the thrown error in the initial context. We still don't have the ability to schedule the waiting tasks on an executor when the future completes, so this isn't useful for anything just test, and we can only test limited code paths.
Use a single atomic for the wait queue that combines the status with the first task in the queue. Address race conditions in waiting and completing the future. Thanks to John for setting the direction here for me.
Reduce the size of AsyncTask by using the first slot of SchedulerPrivate for the next waiting task. Thanks, John!
There isn't a big advantage to keeping "simple" task as a separate concept from a task. Tasks may or may not have futures.
Introduce `FutureAsyncContext` to line up with the async context formed by IR generation for the type `<T> () async throws -> T`. When allocating a future task, set up the context with the address of the future's storage for the successful result and null out the error result, so the caller will directly fill in the result. This eliminates a bunch of extra complexity and a copy.
…mber. 'const' breaks compilation on newer compilers.
The objectFuture test is better anyway.
5d38e0c
to
ede5aa3
Compare
@swift-ci please test |
Last commit should clear up the Windows failure. Rerunning all of the tests... |
Build failed |
Build failed |
@swift-ci pleas test |
Got tripped up by #34748, but otherwise everything was fine. |
@swift-ci please smoke test |
Extend AsyncTask and the concurrency runtime with basic support for
task futures. AsyncTasks with futures contain a future fragment with
information about the type produced by the future, and where the
future will put the result value or the thrown error in the initial
context.
We still don't have the ability to schedule the waiting tasks on an
executor when the future completes, so this isn't useful for anything
just test, and we can only test limited code paths.