Skip to content

[Concurrency] Implement basic runtime support for task futures. #34703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Nov 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/swift/ABI/MetadataKind.def
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ METADATAKIND(HeapGenericLocalVariable,
METADATAKIND(ErrorObject,
1 | MetadataKindIsNonType | MetadataKindIsRuntimePrivate)

/// A heap-allocated simple task.
METADATAKIND(SimpleTask,
/// A heap-allocated task.
METADATAKIND(Task,
2 | MetadataKindIsNonType | MetadataKindIsRuntimePrivate)

// getEnumeratedMetadataKind assumes that all the enumerated values here
Expand Down
153 changes: 150 additions & 3 deletions include/swift/ABI/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "swift/Basic/RelativePointer.h"
#include "swift/ABI/HeapObject.h"
#include "swift/ABI/Metadata.h"
#include "swift/ABI/MetadataValues.h"
#include "swift/Runtime/Config.h"
#include "swift/Basic/STLExtras.h"
Expand All @@ -29,6 +30,8 @@ class AsyncTask;
class AsyncContext;
class Executor;
class Job;
struct OpaqueValue;
struct SwiftError;
class TaskStatusRecord;

/// An ExecutorRef isn't necessarily just a pointer to an executor
Expand Down Expand Up @@ -86,6 +89,13 @@ class AsyncFunctionPointer {

/// A schedulable job.
class alignas(2 * alignof(void*)) Job {
protected:
// Indices into SchedulerPrivate, for use by the runtime.
enum {
/// The next waiting task link, an AsyncTask that is waiting on a future.
NextWaitingTaskIndex = 0,
};

public:
// Reserved for the use of the scheduler.
void *SchedulerPrivate[2];
Expand Down Expand Up @@ -230,19 +240,142 @@ class AsyncTask : public HeapObject, public Job {
}
};

bool isFuture() const { return Flags.task_isFuture(); }

bool hasChildFragment() const { return Flags.task_isChildTask(); }
ChildFragment *childFragment() {
assert(hasChildFragment());
return reinterpret_cast<ChildFragment*>(this + 1);
}

// TODO: Future fragment
class FutureFragment {
public:
/// Describes the status of the future.
///
/// Futures always begin in the "Executing" state, and will always
/// make a single state change to either Success or Error.
enum class Status : uintptr_t {
/// The future is executing or ready to execute. The storage
/// is not accessible.
Executing = 0,

/// The future has completed with result (of type \c resultType).
Success,

/// The future has completed by throwing an error (an \c Error
/// existential).
Error,
};

/// An item within the wait queue, which includes the status and the
/// head of the list of tasks.
struct WaitQueueItem {
/// Mask used for the low status bits in a wait queue item.
static const uintptr_t statusMask = 0x03;

uintptr_t storage;

Status getStatus() const {
return static_cast<Status>(storage & statusMask);
}

AsyncTask *getTask() const {
return reinterpret_cast<AsyncTask *>(storage & ~statusMask);
}

static WaitQueueItem get(Status status, AsyncTask *task) {
return WaitQueueItem{
reinterpret_cast<uintptr_t>(task) | static_cast<uintptr_t>(status)};
}
};

private:
/// Queue containing all of the tasks that are waiting in `get()`.
///
/// The low bits contain the status, the rest of the pointer is the
/// AsyncTask.
std::atomic<WaitQueueItem> waitQueue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically I don't think that tasks need to coincide with any status but Executing, so the other statuses could be signal values rather than masked on. I don't think it really matters, though.


/// The type of the result that will be produced by the future.
const Metadata *resultType;

// Trailing storage for the result itself. The storage will be uninitialized,
// contain an instance of \c resultType, or contaon an an \c Error.

friend class AsyncTask;

public:
explicit FutureFragment(const Metadata *resultType)
: waitQueue(WaitQueueItem::get(Status::Executing, nullptr)),
resultType(resultType) { }

/// Destroy the storage associated with the future.
void destroy();

/// Retrieve a pointer to the storage of result.
OpaqueValue *getStoragePtr() {
return reinterpret_cast<OpaqueValue *>(
reinterpret_cast<char *>(this) + storageOffset(resultType));
}

/// Retrieve the error.
SwiftError *&getError() {
return *reinterpret_cast<SwiftError **>(
reinterpret_cast<char *>(this) + storageOffset(resultType));
}

/// Compute the offset of the storage from the base of the future
/// fragment.
static size_t storageOffset(const Metadata *resultType) {
size_t offset = sizeof(FutureFragment);
size_t alignment =
std::max(resultType->vw_alignment(), alignof(SwiftError *));
return (offset + alignment - 1) & ~(alignment - 1);
}

/// Determine the size of the future fragment given a particular future
/// result type.
static size_t fragmentSize(const Metadata *resultType) {
return storageOffset(resultType) +
std::max(resultType->vw_size(), sizeof(SwiftError *));
}
};

bool isFuture() const { return Flags.task_isFuture(); }

FutureFragment *futureFragment() {
assert(isFuture());
if (hasChildFragment()) {
return reinterpret_cast<FutureFragment *>(
reinterpret_cast<ChildFragment*>(this + 1) + 1);
}

return reinterpret_cast<FutureFragment *>(this + 1);
}

/// Wait for this future to complete.
///
/// \returns the status of the future. If this result is
/// \c Executing, then \c waitingTask has been added to the
/// wait queue and will be scheduled when the future completes. Otherwise,
/// the future has completed and can be queried.
FutureFragment::Status waitFuture(AsyncTask *waitingTask);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this more like pollFuture or peekFutureStatus/peekStatus?
wait to me implies waiting but we don't really do that, just check what we should do next

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the future is still executing, it puts waitingTask on the queue to be scheduled once the future has completed. We then get to yield the thread back to the scheduler to do more work (but don't have the way to do that yet)


/// Complete this future.
///
/// Upon completion, any waiting tasks will be scheduled on the given
/// executor.
void completeFuture(AsyncContext *context, ExecutorRef executor);

static bool classof(const Job *job) {
return job->isAsyncTask();
}

private:
/// Access the next waiting task, which establishes a singly linked list of
/// tasks that are waiting on a future.
AsyncTask *&getNextWaitingTask() {
return reinterpret_cast<AsyncTask *&>(
SchedulerPrivate[NextWaitingTaskIndex]);
}
};

// The compiler will eventually assume these.
Expand Down Expand Up @@ -327,6 +460,20 @@ class YieldingAsyncContext : public AsyncContext {
}
};

/// An asynchronous context within a task that describes a general "Future".
/// task.
///
/// This type matches the ABI of a function `<T> () async throws -> T`, which
/// is the type used by `Task.runDetached` and `Task.group.add` to create
/// futures.
class FutureAsyncContext : public AsyncContext {
public:
SwiftError *errorResult = nullptr;
OpaqueValue *indirectResult;

using AsyncContext::AsyncContext;
};

} // end namespace swift

#endif
53 changes: 53 additions & 0 deletions include/swift/Runtime/Concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,31 @@ AsyncTaskAndContext swift_task_create_f(JobFlags flags,
AsyncFunctionType<void()> *function,
size_t initialContextSize);

/// Create a task object with a future which will run the given
/// function.
///
/// The task is not yet scheduled.
///
/// If a parent task is provided, flags.task_hasChildFragment() must
/// be true, and this must be called synchronously with the parent.
/// The parent is responsible for creating a ChildTaskStatusRecord.
/// TODO: should we have a single runtime function for creating a task
/// and doing this child task status record management?
///
/// flags.task_isFuture must be set. \c futureResultType is the type
///
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
AsyncTaskAndContext swift_task_create_future(
JobFlags flags, AsyncTask *parent, const Metadata *futureResultType,
const AsyncFunctionPointer<void()> *function);

/// Create a task object with a future which will run the given
/// function.
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
AsyncTaskAndContext swift_task_create_future_f(
JobFlags flags, AsyncTask *parent, const Metadata *futureResultType,
AsyncFunctionType<void()> *function, size_t initialContextSize);

/// Allocate memory in a task.
///
/// This must be called synchronously with the task.
Expand Down Expand Up @@ -83,6 +108,34 @@ SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
JobPriority
swift_task_escalate(AsyncTask *task, JobPriority newPriority);

/// The result of waiting for a task future.
struct TaskFutureWaitResult {
enum Kind : uintptr_t {
/// The waiting task has been added to the future's wait queue, and will
/// be scheduled once the future has completed.
Waiting,

/// The future succeeded and produced a result value. \c storage points
/// at that value.
Success,

/// The future finished by throwing an error. \c storage is that error
/// existential.
Error,
};

Kind kind;
OpaqueValue *storage;
};

/// Wait for a future task to complete.
///
/// This can be called from any thread.
///
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
TaskFutureWaitResult
swift_task_future_wait(AsyncTask *task, AsyncTask *waitingTask);

/// Add a status record to a task. The record should not be
/// modified while it is registered with a task.
///
Expand Down
Loading