@@ -74,30 +74,29 @@ public struct Configuration: Sendable {
74
74
let pid = spawnResults. execution. processIdentifier
75
75
76
76
var spawnResultBox : SpawnResult ? ? = consume spawnResults
77
+ var _spawnResult = spawnResultBox!. take ( ) !
77
78
78
- return try await withAsyncTaskCleanupHandler {
79
- var _spawnResult = spawnResultBox!. take ( ) !
79
+ let processIdentifier = _spawnResult. execution. processIdentifier
80
+
81
+ let result = try await withAsyncTaskCleanupHandler {
80
82
let inputIO = _spawnResult. inputWriteEnd ( )
81
83
let outputIO = _spawnResult. outputReadEnd ( )
82
84
let errorIO = _spawnResult. errorReadEnd ( )
83
- let processIdentifier = _spawnResult. execution. processIdentifier
84
85
85
- async let terminationStatus = try monitorProcessTermination (
86
- forProcessWithIdentifier: processIdentifier
87
- )
88
86
// Body runs in the same isolation
89
- let result = try await body ( _spawnResult. execution, inputIO, outputIO, errorIO)
90
- return ExecutionResult (
91
- terminationStatus: try await terminationStatus,
92
- value: result
93
- )
87
+ return try await body ( _spawnResult. execution, inputIO, outputIO, errorIO)
94
88
} onCleanup: {
95
89
// Attempt to terminate the child process
96
90
await Execution . runTeardownSequence (
97
91
self . platformOptions. teardownSequence,
98
92
on: pid
99
93
)
100
94
}
95
+
96
+ return ExecutionResult (
97
+ terminationStatus: try await monitorProcessTermination ( forProcessWithIdentifier: processIdentifier) ,
98
+ value: result
99
+ )
101
100
}
102
101
}
103
102
@@ -752,11 +751,13 @@ extension Optional where Wrapped == String {
752
751
}
753
752
}
754
753
754
+ /// Runs `body`, and then runs `onCleanup` if body throws an error, or if the parent task is cancelled. In the latter case, `onCleanup` may be run concurrently with `body`. `body` is guaranteed to run exactly once. `onCleanup` is guaranteed to run only once, or not at all.
755
755
internal func withAsyncTaskCleanupHandler< Result> (
756
756
_ body: ( ) async throws -> Result ,
757
757
onCleanup handler: @Sendable @escaping ( ) async -> Void ,
758
758
isolation: isolated ( any Actor ) ? = #isolation
759
759
) async rethrows -> Result {
760
+ let ( runCancellationHandlerStream, runCancellationHandlerContinuation) = AsyncThrowingStream . makeStream ( of: Void . self)
760
761
return try await withThrowingTaskGroup (
761
762
of: Void . self,
762
763
returning: Result . self
@@ -767,15 +768,38 @@ internal func withAsyncTaskCleanupHandler<Result>(
767
768
// before the time ends. We then run the cancel handler.
768
769
do { while true { try await Task . sleep ( nanoseconds: 1_000_000_000 ) } } catch { }
769
770
// Run task cancel handler
770
- await handler ( )
771
+ runCancellationHandlerContinuation. finish ( throwing: CancellationError ( ) )
772
+ }
773
+
774
+ group. addTask {
775
+ // Enumerate the async stream until it completes or throws an error.
776
+ // Since we signal completion of the stream from cancellation or the
777
+ // parent task or the body throwing, this ensures that we run the
778
+ // cleanup handler exactly once in any failure scenario, and also do
779
+ // so _immediately_ if the failure scenario is due to parent task
780
+ // cancellation. We do so in a detached Task to prevent cancellation
781
+ // of the parent task from interrupting enumeration of the stream itself.
782
+ await Task . detached {
783
+ do {
784
+ var iterator = runCancellationHandlerStream. makeAsyncIterator ( )
785
+ while let _ = try await iterator. next ( ) {
786
+ }
787
+ } catch {
788
+ await handler ( )
789
+ }
790
+ } . value
791
+ }
792
+
793
+ defer {
794
+ group. cancelAll ( )
771
795
}
772
796
773
797
do {
774
798
let result = try await body ( )
775
- group . cancelAll ( )
799
+ runCancellationHandlerContinuation . finish ( )
776
800
return result
777
801
} catch {
778
- await handler ( )
802
+ runCancellationHandlerContinuation . finish ( throwing : error )
779
803
throw error
780
804
}
781
805
}
0 commit comments