Skip to content

Commit 1aa0760

Browse files
committed
Add support for Task cancellation
1 parent fad6bf4 commit 1aa0760

File tree

7 files changed

+330
-101
lines changed

7 files changed

+330
-101
lines changed

AsyncQueue.podspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Pod::Spec.new do |s|
22
s.name = 'AsyncQueue'
3-
s.version = '0.7.0'
3+
s.version = '0.7.1'
44
s.license = 'MIT'
55
s.summary = 'A queue that enables ordered sending of events from synchronous to asynchronous code.'
66
s.homepage = 'https://github.com/dfed/swift-async-queue'

Sources/AsyncQueue/ActorQueue.swift

Lines changed: 77 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -60,29 +60,12 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
6060
let (taskStream, taskStreamContinuation) = AsyncStream<ActorTask>.makeStream()
6161
self.taskStreamContinuation = taskStreamContinuation
6262

63-
func beginExecuting(
64-
_ operation: sending @escaping (isolated ActorType) async -> Void,
65-
in context: isolated ActorType,
66-
priority: TaskPriority?
67-
) {
68-
// In Swift 6, a `Task` enqueued from an actor begins executing immediately on that actor.
69-
// Since we're running on our actor's context already, we can just dispatch a Task to get first-enqueued-first-start task execution.
70-
Task(priority: priority) {
71-
await operation(context)
72-
}
73-
}
74-
7563
Task {
7664
// In an ideal world, we would isolate this `for await` loop to the `ActorType`.
7765
// However, there's no good way to do that without retaining the actor and creating a cycle.
7866
for await actorTask in taskStream {
7967
// Await switching to the ActorType context.
80-
await beginExecuting(
81-
actorTask.task,
82-
in: actorTask.executionContext,
83-
priority: actorTask.priority
84-
)
85-
await actorTask.sempahore.signal()
68+
await actorTask.task(actorTask.executionContext, actorTask.semaphore, actorTask.priority)
8669
}
8770
}
8871
}
@@ -104,34 +87,36 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
10487
weakExecutionContext = actor
10588
}
10689

107-
// MARK: Fileprivate
108-
109-
fileprivate let taskStreamContinuation: AsyncStream<ActorTask>.Continuation
110-
111-
/// The actor on whose isolated context our tasks run, force-unwrapped.
112-
/// Utilize this accessor to retrieve the weak execution context in order to avoid repeating the below comment.
113-
fileprivate var executionContext: ActorType {
114-
// Crashing here means that this queue is being sent tasks either before an execution context has been set, or
115-
// after the execution context has deallocated. An ActorQueue's execution context should be set in the adopted
116-
// actor's `init` method, and the ActorQueue should not exceed the lifecycle of the adopted actor.
117-
weakExecutionContext!
118-
}
90+
// MARK: Internal
11991

120-
fileprivate struct ActorTask: Sendable {
92+
struct ActorTask: Sendable {
12193
init(
12294
executionContext: ActorType,
12395
priority: TaskPriority?,
124-
task: @escaping @Sendable (isolated ActorType) async -> Void
96+
task: @escaping @Sendable (isolated ActorType, Semaphore, TaskPriority?) async -> Void
12597
) {
12698
self.executionContext = executionContext
12799
self.priority = priority
128100
self.task = task
129101
}
130-
102+
131103
let executionContext: ActorType
132-
let sempahore = Semaphore()
104+
let semaphore = Semaphore()
133105
let priority: TaskPriority?
134-
let task: @Sendable (isolated ActorType) async -> Void
106+
let task: @Sendable (isolated ActorType, Semaphore, TaskPriority?) async -> Void
107+
}
108+
109+
// MARK: Fileprivate
110+
111+
fileprivate let taskStreamContinuation: AsyncStream<ActorTask>.Continuation
112+
113+
/// The actor on whose isolated context our tasks run, force-unwrapped.
114+
/// Utilize this accessor to retrieve the weak execution context in order to avoid repeating the below comment.
115+
fileprivate var executionContext: ActorType {
116+
// Crashing here means that this queue is being sent tasks either before an execution context has been set, or
117+
// after the execution context has deallocated. An ActorQueue's execution context should be set in the adopted
118+
// actor's `init` method, and the ActorQueue should not exceed the lifecycle of the adopted actor.
119+
weakExecutionContext!
135120
}
136121

137122
// MARK: Private
@@ -180,14 +165,23 @@ extension Task {
180165
let task = ActorQueue<ActorType>.ActorTask(
181166
executionContext: actorQueue.executionContext,
182167
priority: priority,
183-
task: { executionContext in
184-
await delivery.sendValue(operation(executionContext))
168+
task: { executionContext, semaphore, priority in
169+
await semaphore.wait()
170+
delivery.execute({ @Sendable executionContext in
171+
await delivery.sendValue(operation(executionContext))
172+
}, in: executionContext, priority: priority)
185173
}
186174
)
175+
187176
actorQueue.taskStreamContinuation.yield(task)
188177
self.init(priority: priority) {
189-
await task.sempahore.wait()
190-
return await delivery.getValue()
178+
await withTaskCancellationHandler(
179+
operation: {
180+
await task.semaphore.signal()
181+
return await delivery.getValue()
182+
},
183+
onCancel: delivery.cancel
184+
)
191185
}
192186
}
193187

@@ -227,19 +221,27 @@ extension Task {
227221
let task = ActorQueue<ActorType>.ActorTask(
228222
executionContext: actorQueue.executionContext,
229223
priority: priority,
230-
task: { executionContext in
231-
do {
232-
try await delivery.sendValue(operation(executionContext))
233-
} catch {
234-
await delivery.sendFailure(error)
235-
}
224+
task: { executionContext, semaphore, priority in
225+
await semaphore.wait()
226+
delivery.execute({ @Sendable executionContext in
227+
do {
228+
try await delivery.sendValue(operation(executionContext))
229+
} catch {
230+
await delivery.sendFailure(error)
231+
}
232+
}, in: executionContext, priority: priority)
236233
}
237234
)
238235

239236
actorQueue.taskStreamContinuation.yield(task)
240237
self.init(priority: priority) {
241-
await task.sempahore.wait()
242-
return try await delivery.getValue()
238+
try await withTaskCancellationHandler(
239+
operation: {
240+
await task.semaphore.signal()
241+
return try await delivery.getValue()
242+
},
243+
onCancel: delivery.cancel
244+
)
243245
}
244246
}
245247

@@ -279,14 +281,22 @@ extension Task {
279281
let task = ActorQueue<MainActor>.ActorTask(
280282
executionContext: actorQueue.executionContext,
281283
priority: priority,
282-
task: { executionContext in
283-
await delivery.sendValue(operation())
284+
task: { executionContext, semaphore, priority in
285+
await semaphore.wait()
286+
delivery.execute({ @Sendable executionContext in
287+
await delivery.sendValue(operation())
288+
}, in: executionContext, priority: priority)
284289
}
285290
)
286291
actorQueue.taskStreamContinuation.yield(task)
287292
self.init(priority: priority) {
288-
await task.sempahore.wait()
289-
return await delivery.getValue()
293+
return await withTaskCancellationHandler(
294+
operation: {
295+
await task.semaphore.signal()
296+
return await delivery.getValue()
297+
},
298+
onCancel: delivery.cancel
299+
)
290300
}
291301
}
292302

@@ -326,19 +336,27 @@ extension Task {
326336
let task = ActorQueue<MainActor>.ActorTask(
327337
executionContext: actorQueue.executionContext,
328338
priority: priority,
329-
task: { executionContext in
330-
do {
331-
try await delivery.sendValue(operation())
332-
} catch {
333-
await delivery.sendFailure(error)
334-
}
339+
task: { executionContext, semaphore, priority in
340+
await semaphore.wait()
341+
delivery.execute({ @Sendable executionContext in
342+
do {
343+
try await delivery.sendValue(operation())
344+
} catch {
345+
await delivery.sendFailure(error)
346+
}
347+
}, in: executionContext, priority: priority)
335348
}
336349
)
337350

338351
actorQueue.taskStreamContinuation.yield(task)
339352
self.init(priority: priority) {
340-
await task.sempahore.wait()
341-
return try await delivery.getValue()
353+
try await withTaskCancellationHandler(
354+
operation: {
355+
await task.semaphore.signal()
356+
return try await delivery.getValue()
357+
},
358+
onCancel: delivery.cancel
359+
)
342360
}
343361
}
344362
}

Sources/AsyncQueue/FIFOQueue.swift

Lines changed: 62 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ public final class FIFOQueue: Sendable {
3535

3636
Task.detached(priority: priority) {
3737
for await fifoTask in taskStream {
38-
await fifoTask.task()
39-
await fifoTask.sempahore.signal()
38+
await fifoTask.task(fifoTask.semaphore)
4039
}
4140
}
4241
}
@@ -48,12 +47,12 @@ public final class FIFOQueue: Sendable {
4847
// MARK: Fileprivate
4948

5049
fileprivate struct FIFOTask: Sendable {
51-
init(task: @escaping @Sendable () async -> Void) {
50+
init(task: @escaping @Sendable (Semaphore) async -> Void) {
5251
self.task = task
5352
}
5453

55-
let sempahore = Semaphore()
56-
let task: @Sendable () async -> Void
54+
let semaphore = Semaphore()
55+
let task: @Sendable (Semaphore) async -> Void
5756
}
5857

5958
fileprivate let taskStreamContinuation: AsyncStream<FIFOTask>.Continuation
@@ -91,13 +90,21 @@ extension Task {
9190
) where Failure == Never {
9291
let delivery = Delivery<Success, Failure>()
9392
let executeOnce = UnsafeClosureHolder(operation: operation)
94-
let task = FIFOQueue.FIFOTask {
95-
await delivery.sendValue(executeOnce.operation())
93+
let task = FIFOQueue.FIFOTask { semaphore in
94+
await semaphore.wait()
95+
await delivery.execute { @Sendable in
96+
await delivery.sendValue(executeOnce.operation())
97+
}.value
9698
}
9799
fifoQueue.taskStreamContinuation.yield(task)
98100
self.init {
99-
await task.sempahore.wait()
100-
return await delivery.getValue()
101+
await withTaskCancellationHandler(
102+
operation: {
103+
await task.semaphore.signal()
104+
return await delivery.getValue()
105+
},
106+
onCancel: delivery.cancel
107+
)
101108
}
102109
}
103110

@@ -127,22 +134,30 @@ extension Task {
127134
/// - operation: The operation to perform.
128135
@discardableResult
129136
public init(
130-
on actorQueue: FIFOQueue,
137+
on fifoQueue: FIFOQueue,
131138
@_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async throws -> Success
132139
) where Failure == any Error {
133140
let delivery = Delivery<Success, Failure>()
134141
let executeOnce = UnsafeThrowingClosureHolder(operation: operation)
135-
let task = FIFOQueue.FIFOTask {
136-
do {
137-
try await delivery.sendValue(executeOnce.operation())
138-
} catch {
139-
await delivery.sendFailure(error)
140-
}
142+
let task = FIFOQueue.FIFOTask { semaphore in
143+
await semaphore.wait()
144+
await delivery.execute { @Sendable in
145+
do {
146+
try await delivery.sendValue(executeOnce.operation())
147+
} catch {
148+
await delivery.sendFailure(error)
149+
}
150+
}.value
141151
}
142-
actorQueue.taskStreamContinuation.yield(task)
152+
fifoQueue.taskStreamContinuation.yield(task)
143153
self.init {
144-
await task.sempahore.wait()
145-
return try await delivery.getValue()
154+
try await withTaskCancellationHandler(
155+
operation: {
156+
await task.semaphore.signal()
157+
return try await delivery.getValue()
158+
},
159+
onCancel: delivery.cancel
160+
)
146161
}
147162
}
148163

@@ -179,13 +194,21 @@ extension Task {
179194
operation: @Sendable @escaping (isolated ActorType) async -> Success
180195
) where Failure == Never {
181196
let delivery = Delivery<Success, Failure>()
182-
let task = FIFOQueue.FIFOTask {
183-
await delivery.sendValue(operation(isolatedActor))
197+
let task = FIFOQueue.FIFOTask { semaphore in
198+
await semaphore.wait()
199+
await delivery.execute({ @Sendable isolatedActor in
200+
await delivery.sendValue(operation(isolatedActor))
201+
}, in: isolatedActor, priority: priority).value
184202
}
185203
fifoQueue.taskStreamContinuation.yield(task)
186204
self.init {
187-
await task.sempahore.wait()
188-
return await delivery.getValue()
205+
await withTaskCancellationHandler(
206+
operation: {
207+
await task.semaphore.signal()
208+
return await delivery.getValue()
209+
},
210+
onCancel: delivery.cancel
211+
)
189212
}
190213
}
191214

@@ -224,17 +247,25 @@ extension Task {
224247
operation: @Sendable @escaping (isolated ActorType) async throws -> Success
225248
) where Failure == any Error {
226249
let delivery = Delivery<Success, Failure>()
227-
let task = FIFOQueue.FIFOTask {
228-
do {
229-
try await delivery.sendValue(operation(isolatedActor))
230-
} catch {
231-
await delivery.sendFailure(error)
232-
}
250+
let task = FIFOQueue.FIFOTask { semaphore in
251+
await semaphore.wait()
252+
await delivery.execute({ @Sendable isolatedActor in
253+
do {
254+
try await delivery.sendValue(operation(isolatedActor))
255+
} catch {
256+
await delivery.sendFailure(error)
257+
}
258+
}, in: isolatedActor, priority: priority).value
233259
}
234260
fifoQueue.taskStreamContinuation.yield(task)
235261
self.init(priority: priority) {
236-
await task.sempahore.wait()
237-
return try await delivery.getValue()
262+
try await withTaskCancellationHandler(
263+
operation: {
264+
await task.semaphore.signal()
265+
return try await delivery.getValue()
266+
},
267+
onCancel: delivery.cancel
268+
)
238269
}
239270
}
240271
}

0 commit comments

Comments
 (0)