diff --git a/include/swift/ABI/MetadataKind.def b/include/swift/ABI/MetadataKind.def index 27d7e93e37c5d..2fe58526ec7a3 100644 --- a/include/swift/ABI/MetadataKind.def +++ b/include/swift/ABI/MetadataKind.def @@ -85,8 +85,8 @@ METADATAKIND(HeapGenericLocalVariable, METADATAKIND(ErrorObject, 1 | MetadataKindIsNonType | MetadataKindIsRuntimePrivate) -/// A heap-allocated simple task. -METADATAKIND(SimpleTask, +/// A heap-allocated task. +METADATAKIND(Task, 2 | MetadataKindIsNonType | MetadataKindIsRuntimePrivate) // getEnumeratedMetadataKind assumes that all the enumerated values here diff --git a/include/swift/ABI/Task.h b/include/swift/ABI/Task.h index 0c5875230f6fd..43cc6df8169f4 100644 --- a/include/swift/ABI/Task.h +++ b/include/swift/ABI/Task.h @@ -19,6 +19,7 @@ #include "swift/Basic/RelativePointer.h" #include "swift/ABI/HeapObject.h" +#include "swift/ABI/Metadata.h" #include "swift/ABI/MetadataValues.h" #include "swift/Runtime/Config.h" #include "swift/Basic/STLExtras.h" @@ -29,6 +30,8 @@ class AsyncTask; class AsyncContext; class Executor; class Job; +struct OpaqueValue; +struct SwiftError; class TaskStatusRecord; /// An ExecutorRef isn't necessarily just a pointer to an executor @@ -86,6 +89,13 @@ class AsyncFunctionPointer { /// A schedulable job. class alignas(2 * alignof(void*)) Job { +protected: + // Indices into SchedulerPrivate, for use by the runtime. + enum { + /// The next waiting task link, an AsyncTask that is waiting on a future. + NextWaitingTaskIndex = 0, + }; + public: // Reserved for the use of the scheduler. void *SchedulerPrivate[2]; @@ -230,19 +240,142 @@ class AsyncTask : public HeapObject, public Job { } }; - bool isFuture() const { return Flags.task_isFuture(); } - bool hasChildFragment() const { return Flags.task_isChildTask(); } ChildFragment *childFragment() { assert(hasChildFragment()); return reinterpret_cast(this + 1); } - // TODO: Future fragment + class FutureFragment { + public: + /// Describes the status of the future. + /// + /// Futures always begin in the "Executing" state, and will always + /// make a single state change to either Success or Error. + enum class Status : uintptr_t { + /// The future is executing or ready to execute. The storage + /// is not accessible. + Executing = 0, + + /// The future has completed with result (of type \c resultType). + Success, + + /// The future has completed by throwing an error (an \c Error + /// existential). + Error, + }; + + /// An item within the wait queue, which includes the status and the + /// head of the list of tasks. + struct WaitQueueItem { + /// Mask used for the low status bits in a wait queue item. + static const uintptr_t statusMask = 0x03; + + uintptr_t storage; + + Status getStatus() const { + return static_cast(storage & statusMask); + } + + AsyncTask *getTask() const { + return reinterpret_cast(storage & ~statusMask); + } + + static WaitQueueItem get(Status status, AsyncTask *task) { + return WaitQueueItem{ + reinterpret_cast(task) | static_cast(status)}; + } + }; + + private: + /// Queue containing all of the tasks that are waiting in `get()`. + /// + /// The low bits contain the status, the rest of the pointer is the + /// AsyncTask. + std::atomic waitQueue; + + /// The type of the result that will be produced by the future. + const Metadata *resultType; + + // Trailing storage for the result itself. The storage will be uninitialized, + // contain an instance of \c resultType, or contaon an an \c Error. + + friend class AsyncTask; + + public: + explicit FutureFragment(const Metadata *resultType) + : waitQueue(WaitQueueItem::get(Status::Executing, nullptr)), + resultType(resultType) { } + + /// Destroy the storage associated with the future. + void destroy(); + + /// Retrieve a pointer to the storage of result. + OpaqueValue *getStoragePtr() { + return reinterpret_cast( + reinterpret_cast(this) + storageOffset(resultType)); + } + + /// Retrieve the error. + SwiftError *&getError() { + return *reinterpret_cast( + reinterpret_cast(this) + storageOffset(resultType)); + } + + /// Compute the offset of the storage from the base of the future + /// fragment. + static size_t storageOffset(const Metadata *resultType) { + size_t offset = sizeof(FutureFragment); + size_t alignment = + std::max(resultType->vw_alignment(), alignof(SwiftError *)); + return (offset + alignment - 1) & ~(alignment - 1); + } + + /// Determine the size of the future fragment given a particular future + /// result type. + static size_t fragmentSize(const Metadata *resultType) { + return storageOffset(resultType) + + std::max(resultType->vw_size(), sizeof(SwiftError *)); + } + }; + + bool isFuture() const { return Flags.task_isFuture(); } + + FutureFragment *futureFragment() { + assert(isFuture()); + if (hasChildFragment()) { + return reinterpret_cast( + reinterpret_cast(this + 1) + 1); + } + + return reinterpret_cast(this + 1); + } + + /// Wait for this future to complete. + /// + /// \returns the status of the future. If this result is + /// \c Executing, then \c waitingTask has been added to the + /// wait queue and will be scheduled when the future completes. Otherwise, + /// the future has completed and can be queried. + FutureFragment::Status waitFuture(AsyncTask *waitingTask); + + /// Complete this future. + /// + /// Upon completion, any waiting tasks will be scheduled on the given + /// executor. + void completeFuture(AsyncContext *context, ExecutorRef executor); static bool classof(const Job *job) { return job->isAsyncTask(); } + +private: + /// Access the next waiting task, which establishes a singly linked list of + /// tasks that are waiting on a future. + AsyncTask *&getNextWaitingTask() { + return reinterpret_cast( + SchedulerPrivate[NextWaitingTaskIndex]); + } }; // The compiler will eventually assume these. @@ -327,6 +460,20 @@ class YieldingAsyncContext : public AsyncContext { } }; +/// An asynchronous context within a task that describes a general "Future". +/// task. +/// +/// This type matches the ABI of a function ` () async throws -> T`, which +/// is the type used by `Task.runDetached` and `Task.group.add` to create +/// futures. +class FutureAsyncContext : public AsyncContext { +public: + SwiftError *errorResult = nullptr; + OpaqueValue *indirectResult; + + using AsyncContext::AsyncContext; +}; + } // end namespace swift #endif diff --git a/include/swift/Runtime/Concurrency.h b/include/swift/Runtime/Concurrency.h index cbad27789603b..df8162d0e3530 100644 --- a/include/swift/Runtime/Concurrency.h +++ b/include/swift/Runtime/Concurrency.h @@ -49,6 +49,31 @@ AsyncTaskAndContext swift_task_create_f(JobFlags flags, AsyncFunctionType *function, size_t initialContextSize); +/// Create a task object with a future which will run the given +/// function. +/// +/// The task is not yet scheduled. +/// +/// If a parent task is provided, flags.task_hasChildFragment() must +/// be true, and this must be called synchronously with the parent. +/// The parent is responsible for creating a ChildTaskStatusRecord. +/// TODO: should we have a single runtime function for creating a task +/// and doing this child task status record management? +/// +/// flags.task_isFuture must be set. \c futureResultType is the type +/// +SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) +AsyncTaskAndContext swift_task_create_future( + JobFlags flags, AsyncTask *parent, const Metadata *futureResultType, + const AsyncFunctionPointer *function); + +/// Create a task object with a future which will run the given +/// function. +SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) +AsyncTaskAndContext swift_task_create_future_f( + JobFlags flags, AsyncTask *parent, const Metadata *futureResultType, + AsyncFunctionType *function, size_t initialContextSize); + /// Allocate memory in a task. /// /// This must be called synchronously with the task. @@ -83,6 +108,34 @@ SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) JobPriority swift_task_escalate(AsyncTask *task, JobPriority newPriority); +/// The result of waiting for a task future. +struct TaskFutureWaitResult { + enum Kind : uintptr_t { + /// The waiting task has been added to the future's wait queue, and will + /// be scheduled once the future has completed. + Waiting, + + /// The future succeeded and produced a result value. \c storage points + /// at that value. + Success, + + /// The future finished by throwing an error. \c storage is that error + /// existential. + Error, + }; + + Kind kind; + OpaqueValue *storage; +}; + +/// Wait for a future task to complete. +/// +/// This can be called from any thread. +/// +SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) +TaskFutureWaitResult +swift_task_future_wait(AsyncTask *task, AsyncTask *waitingTask); + /// Add a status record to a task. The record should not be /// modified while it is registered with a task. /// diff --git a/stdlib/public/Concurrency/Task.cpp b/stdlib/public/Concurrency/Task.cpp index dbf81c3383b11..c2e5464204df3 100644 --- a/stdlib/public/Concurrency/Task.cpp +++ b/stdlib/public/Concurrency/Task.cpp @@ -21,12 +21,105 @@ #include "TaskPrivate.h" using namespace swift; +using FutureFragment = AsyncTask::FutureFragment; + +void FutureFragment::destroy() { + auto queueHead = waitQueue.load(std::memory_order_acquire); + switch (queueHead.getStatus()) { + case Status::Executing: + assert(false && "destroying a task that never completed"); + + case Status::Success: + resultType->vw_destroy(getStoragePtr()); + break; + + case Status::Error: + swift_unknownObjectRelease(reinterpret_cast(getError())); + break; + } +} + +FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask) { + using Status = FutureFragment::Status; + using WaitQueueItem = FutureFragment::WaitQueueItem; + + assert(isFuture()); + auto fragment = futureFragment(); + + auto queueHead = fragment->waitQueue.load(std::memory_order_acquire); + while (true) { + switch (queueHead.getStatus()) { + case Status::Error: + case Status::Success: + // The task is done; we don't need to wait. + return queueHead.getStatus(); + + case Status::Executing: + // Task is now complete. We'll need to add ourselves to the queue. + break; + } + + // Put the waiting task at the beginning of the wait queue. + waitingTask->getNextWaitingTask() = queueHead.getTask(); + auto newQueueHead = WaitQueueItem::get(Status::Executing, waitingTask); + if (fragment->waitQueue.compare_exchange_weak( + queueHead, newQueueHead, std::memory_order_release, + std::memory_order_acquire)) { + // Escalate the priority of this task based on the priority + // of the waiting task. + swift_task_escalate(this, waitingTask->Flags.getPriority()); + return FutureFragment::Status::Executing; + } + } +} + +void AsyncTask::completeFuture(AsyncContext *context, ExecutorRef executor) { + using Status = FutureFragment::Status; + using WaitQueueItem = FutureFragment::WaitQueueItem; + + assert(isFuture()); + auto fragment = futureFragment(); + + // If an error was thrown, save it in the future fragment. + auto futureContext = static_cast(context); + bool hadErrorResult = false; + if (auto errorObject = futureContext->errorResult) { + fragment->getError() = errorObject; + hadErrorResult = true; + } + + // Update the status to signal completion. + auto newQueueHead = WaitQueueItem::get( + hadErrorResult ? Status::Error : Status::Success, + nullptr + ); + auto queueHead = fragment->waitQueue.exchange( + newQueueHead, std::memory_order_acquire); + assert(queueHead.getStatus() == Status::Executing); + + // Schedule every waiting task on the executor. + auto waitingTask = queueHead.getTask(); + while (waitingTask) { + // Find the next waiting task. + auto nextWaitingTask = waitingTask->getNextWaitingTask(); + + // TODO: schedule this task on the executor rather than running it + // directly. + waitingTask->run(executor); + + // Move to the next task. + waitingTask = nextWaitingTask; + } +} SWIFT_CC(swift) -static void destroySimpleTask(SWIFT_CONTEXT HeapObject *_obj) { - auto obj = static_cast(_obj); +static void destroyTask(SWIFT_CONTEXT HeapObject *obj) { + auto task = static_cast(obj); - assert(!obj->isFuture()); + // For a future, destroy the result. + if (task->isFuture()) { + task->futureFragment()->destroy(); + } // The task execution itself should always hold a reference to it, so // if we get here, we know the task has finished running, which means @@ -34,27 +127,26 @@ static void destroySimpleTask(SWIFT_CONTEXT HeapObject *_obj) { // the task-local allocator. There's actually nothing else to clean up // here. - free(obj); + free(task); } -/// Heap metadata for a simple asynchronous task that does not -/// include a future. -static FullMetadata simpleTaskHeapMetadata = { +/// Heap metadata for an asynchronous task. +static FullMetadata taskHeapMetadata = { { { - &destroySimpleTask + &destroyTask }, { /*value witness table*/ nullptr } }, { - MetadataKind::SimpleTask + MetadataKind::Task } }; /// The function that we put in the context of a simple task -/// (one with no future) to handle the final return. +/// to handle the final return. SWIFT_CC(swift) static void completeTask(AsyncTask *task, ExecutorRef executor, AsyncContext *context) { @@ -62,10 +154,14 @@ static void completeTask(AsyncTask *task, ExecutorRef executor, // to wait for the object to be destroyed. _swift_task_alloc_destroy(task); + // Complete the future. + if (task->isFuture()) { + task->completeFuture(context, executor); + } + // TODO: set something in the status? // TODO: notify the parent somehow? // TODO: remove this task from the child-task chain? - // TODO: notify tasks waiting on the future? // Release the task, balancing the retain that a running task // has on itself. @@ -83,13 +179,36 @@ AsyncTaskAndContext swift::swift_task_create_f(JobFlags flags, AsyncTask *parent, AsyncFunctionType *function, size_t initialContextSize) { - assert(!flags.task_isFuture() && "function doesn't support creating futures"); + return swift_task_create_future_f( + flags, parent, nullptr, function, initialContextSize); +} + +AsyncTaskAndContext swift::swift_task_create_future( + JobFlags flags, AsyncTask *parent, const Metadata *futureResultType, + const AsyncFunctionPointer *function) { + return swift_task_create_future_f( + flags, parent, futureResultType, function->Function.get(), + function->ExpectedContextSize); +} + +AsyncTaskAndContext swift::swift_task_create_future_f( + JobFlags flags, AsyncTask *parent, const Metadata *futureResultType, + AsyncFunctionType *function, size_t initialContextSize) { + assert((futureResultType != nullptr) == flags.task_isFuture()); + assert(!flags.task_isFuture() || + initialContextSize >= sizeof(FutureAsyncContext)); assert((parent != nullptr) == flags.task_isChildTask()); // Figure out the size of the header. size_t headerSize = sizeof(AsyncTask); if (parent) headerSize += sizeof(AsyncTask::ChildFragment); + if (futureResultType) { + headerSize += FutureFragment::fragmentSize(futureResultType); + } + + headerSize = llvm::alignTo(headerSize, llvm::Align(alignof(AsyncContext))); + // Allocate the initial context together with the job. // This means that we never get rid of this allocation. size_t amountToAllocate = headerSize + initialContextSize; @@ -108,7 +227,7 @@ swift::swift_task_create_f(JobFlags flags, AsyncTask *parent, // Initialize the task so that resuming it will run the given // function on the initial context. AsyncTask *task = - new(allocation) AsyncTask(&simpleTaskHeapMetadata, flags, + new(allocation) AsyncTask(&taskHeapMetadata, flags, function, initialContext); // Initialize the child fragment if applicable. @@ -118,6 +237,18 @@ swift::swift_task_create_f(JobFlags flags, AsyncTask *parent, new (childFragment) AsyncTask::ChildFragment(parent); } + // Initialize the future fragment if applicable. + if (futureResultType) { + auto futureFragment = task->futureFragment(); + new (futureFragment) FutureFragment(futureResultType); + + // Set up the context for the future so there is no error, and a successful + // result will be written into the future fragment's storage. + auto futureContext = static_cast(initialContext); + futureContext->errorResult = nullptr; + futureContext->indirectResult = futureFragment->getStoragePtr(); + } + // Configure the initial context. // // FIXME: if we store a null pointer here using the standard ABI for @@ -136,6 +267,24 @@ swift::swift_task_create_f(JobFlags flags, AsyncTask *parent, return {task, initialContext}; } +TaskFutureWaitResult +swift::swift_task_future_wait(AsyncTask *task, AsyncTask *waitingTask) { + assert(task->isFuture()); + switch (task->waitFuture(waitingTask)) { + case FutureFragment::Status::Executing: + return TaskFutureWaitResult{TaskFutureWaitResult::Waiting, nullptr}; + + case FutureFragment::Status::Success: + return TaskFutureWaitResult{ + TaskFutureWaitResult::Success, task->futureFragment()->getStoragePtr()}; + + case FutureFragment::Status::Error: + return TaskFutureWaitResult{ + TaskFutureWaitResult::Error, + reinterpret_cast(task->futureFragment()->getError())}; + } +} + // TODO: Remove this hack. void swift::swift_task_run(AsyncTask *taskToRun) { taskToRun->run(ExecutorRef::noPreference()); diff --git a/unittests/runtime/CMakeLists.txt b/unittests/runtime/CMakeLists.txt index 5070c96876713..4ed374290c03b 100644 --- a/unittests/runtime/CMakeLists.txt +++ b/unittests/runtime/CMakeLists.txt @@ -55,6 +55,7 @@ if(("${SWIFT_HOST_VARIANT_SDK}" STREQUAL "${SWIFT_PRIMARY_VARIANT_SDK}") AND if(SWIFT_ENABLE_EXPERIMENTAL_CONCURRENCY) list(APPEND PLATFORM_SOURCES + TaskFuture.cpp TaskStatus.cpp ) list(APPEND PLATFORM_TARGET_LINK_LIBRARIES @@ -66,6 +67,7 @@ if(("${SWIFT_HOST_VARIANT_SDK}" STREQUAL "${SWIFT_PRIMARY_VARIANT_SDK}") AND set(LLVM_OPTIONAL_SOURCES weak.mm Refcounting.mm + TaskFuture.cpp TaskStatus.cpp) add_swift_unittest(SwiftRuntimeTests diff --git a/unittests/runtime/TaskFuture.cpp b/unittests/runtime/TaskFuture.cpp new file mode 100644 index 0000000000000..a4d004caf8bed --- /dev/null +++ b/unittests/runtime/TaskFuture.cpp @@ -0,0 +1,165 @@ +//===--- TaskFuture.cpp - Unit tests for the task futures API -------------===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2014 - 2017 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +#include "swift/Runtime/Concurrency.h" +#include "swift/Runtime/Metadata.h" +#include "swift/Demangling/ManglingMacros.h" +#include "swift/Basic/STLExtras.h" +#include "gtest/gtest.h" + +using namespace swift; + +namespace { +template struct FutureContext; + +template +using InvokeFunctionRef = + llvm::function_ref *context)>; + +using BodyFunctionRef = + llvm::function_ref; + +template struct FutureContext : FutureAsyncContext { + InvokeFunctionRef storedInvokeFn; + + Storage& getStorage() { + return *reinterpret_cast(this->indirectResult); + } +}; + +// Disable template argument deduction. +template +using undeduced = + typename std::enable_if::value, T>::type; + +template +SWIFT_CC(swift) +static void futureTaskInvokeFunction(AsyncTask *task, ExecutorRef executor, + AsyncContext *context) { + auto futureContext = static_cast*>(context); + futureContext->storedInvokeFn(task, executor, futureContext); + + // Return to finish off the task. + // In a normal situation we'd need to free the context, but here + // we know we're at the top level. + futureContext->ResumeParent(task, executor, futureContext); +} + +template +static void withFutureTask(const Metadata *resultType, + const T& initialValue, + undeduced> invokeFn, + BodyFunctionRef body) { + JobFlags flags = JobKind::Task; + flags.task_setIsFuture(true); + + auto taskAndContext = + swift_task_create_future_f(flags, /*parent*/ nullptr, resultType, + &futureTaskInvokeFunction, + sizeof(FutureContext)); + + auto futureContext = + static_cast*>(taskAndContext.InitialContext); + futureContext->getStorage() = initialValue; // Magic number. + futureContext->storedInvokeFn = invokeFn; + + // Forward our owning reference to the task into its execution, + // causing it to be destroyed when it completes. + body(taskAndContext.Task); +} + +static ExecutorRef createFakeExecutor(uintptr_t value) { + return {reinterpret_cast(value)}; +} +} + +extern const FullMetadata METADATA_SYM(Si); + +struct TestObject : HeapObject { + constexpr TestObject(HeapMetadata const *newMetadata) + : HeapObject(newMetadata, InlineRefCounts::Immortal) + , Addr(NULL), Value(0) {} + + size_t *Addr; + size_t Value; +}; + +static SWIFT_CC(swift) void destroyTestObject(SWIFT_CONTEXT HeapObject *_object) { + auto object = static_cast(_object); + assert(object->Addr && "object already deallocated"); + *object->Addr = object->Value; + object->Addr = nullptr; + swift_deallocObject(object, sizeof(TestObject), alignof(TestObject) - 1); +} + +static const FullMetadata TestClassObjectMetadata = { + { { &destroyTestObject }, { &VALUE_WITNESS_SYM(Bo) } }, + { { nullptr }, ClassFlags::UsesSwiftRefcounting, 0, 0, 0, 0, 0, 0 } +}; + +/// Create an object that, when deallocated, stores the given value to +/// the given pointer. +static TestObject *allocTestObject(size_t *addr, size_t value) { + auto result = + static_cast(swift_allocObject(&TestClassObjectMetadata, + sizeof(TestObject), + alignof(TestObject) - 1)); + result->Addr = addr; + result->Value = value; + return result; +} + +TEST(TaskFutureTest, objectFuture) { + auto createdExecutor = createFakeExecutor(1234); + bool hasRun = false; + + size_t objectValueOnComplete = 7; + TestObject *object = nullptr; + withFutureTask( + &TestClassObjectMetadata, nullptr, + [&](AsyncTask *task, ExecutorRef executor, + FutureContext *context) { + object = allocTestObject(&objectValueOnComplete, 25); + + // The error storage should have been cleared out for us. + EXPECT_EQ(nullptr, context->errorResult); + + // Store the object in the future. + context->getStorage() = object; + + hasRun = true; + }, [&](AsyncTask *task) { + // Retain the task, so it won't be destroyed when it is executed. + swift_retain(task); + + // Run the task, which should fill in the future. + EXPECT_FALSE(hasRun); + task->run(createdExecutor); + EXPECT_TRUE(hasRun); + + // "Wait" for the future, which must have completed by now. + auto waitResult = swift_task_future_wait(task, nullptr); + EXPECT_EQ(TaskFutureWaitResult::Success, waitResult.kind); + + // Make sure we got the result value we expect. + EXPECT_EQ(object, *reinterpret_cast(waitResult.storage)); + + // Make sure the object hasn't been destroyed. + EXPECT_EQ(size_t(7), objectValueOnComplete); + + // Okay, release the task. This should destroy the object. + swift_release(task); + assert(objectValueOnComplete == 25); + }); +}