Skip to content

Commit 7faeeb8

Browse files
committed
Process termination monitoring implementation on Linux conflicts with processes spawned by other means
The approach used by Subprocess to monitor subprocess termination on Linux is fundamentally flawed as it calls waitid with P_ALL, and WEXITED without WNOWAIT, which will end up reaping pids that were spawned outside the Subprocess library. Use an implementation more like swift-testing does for wait tests, which doesn't suffer from this issue. Closes #82
1 parent 4211d5f commit 7faeeb8

File tree

1 file changed

+98
-70
lines changed

1 file changed

+98
-70
lines changed

Sources/Subprocess/Platforms/Subprocess+Linux.swift

Lines changed: 98 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ extension Configuration {
3737
outputPipe: consuming CreatedPipe,
3838
errorPipe: consuming CreatedPipe
3939
) throws -> SpawnResult {
40+
// Ensure the waiter thread is running.
4041
_setupMonitorSignalHandler()
4142

4243
// Instead of checking if every possible executable path
@@ -266,32 +267,53 @@ extension String {
266267
internal func monitorProcessTermination(
267268
forProcessWithIdentifier pid: ProcessIdentifier
268269
) async throws -> TerminationStatus {
269-
return try await withCheckedThrowingContinuation { continuation in
270+
try await withCheckedThrowingContinuation { continuation in
270271
_childProcessContinuations.withLock { continuations in
271-
if let existing = continuations.removeValue(forKey: pid.value),
272-
case .status(let existingStatus) = existing
273-
{
274-
// We already have existing status to report
275-
continuation.resume(returning: existingStatus)
276-
} else {
277-
// Save the continuation for handler
278-
continuations[pid.value] = .continuation(continuation)
279-
}
272+
// We don't need to worry about a race condition here because waitid()
273+
// does not clear the wait/zombie state of the child process. If it sees
274+
// the child process has terminated and manages to acquire the lock before
275+
// we add this continuation to the dictionary, then it will simply loop
276+
// and report the status again.
277+
let oldContinuation = continuations.updateValue(continuation, forKey: pid.value)
278+
precondition(oldContinuation == nil)
279+
280+
// Wake up the waiter thread if it is waiting for more child processes.
281+
_ = pthread_cond_signal(_waitThreadNoChildrenCondition)
280282
}
281283
}
282284
}
283285

284-
private enum ContinuationOrStatus {
285-
case continuation(CheckedContinuation<TerminationStatus, any Error>)
286-
case status(TerminationStatus)
286+
// Small helper to provide thread-safe access to the child process to continuations map as well as a condition variable to suspend the calling thread when there are no subprocesses to wait for. Note that Mutex cannot be used here because we need the semantics of pthread_cond_wait, which requires passing the pthread_mutex_t instance as a parameter, something the Mutex API does not provide access to.
287+
private final class ChildProcessContinuations: Sendable {
288+
private nonisolated(unsafe) var continuations = [pid_t: CheckedContinuation<TerminationStatus, any Error>]()
289+
private nonisolated(unsafe) let mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
290+
291+
init() {
292+
pthread_mutex_init(mutex, nil)
293+
}
294+
295+
func withLock<R>(_ body: (inout [pid_t: CheckedContinuation<TerminationStatus, any Error>]) throws -> R) rethrows -> R {
296+
try withUnsafeUnderlyingLock { _, continuations in
297+
try body(&continuations)
298+
}
299+
}
300+
301+
func withUnsafeUnderlyingLock<R>(_ body: (UnsafeMutablePointer<pthread_mutex_t>, inout [pid_t: CheckedContinuation<TerminationStatus, any Error>]) throws -> R) rethrows -> R {
302+
pthread_mutex_lock(mutex)
303+
defer {
304+
pthread_mutex_unlock(mutex)
305+
}
306+
return try body(mutex, &continuations)
307+
}
287308
}
288309

289-
private let _childProcessContinuations:
290-
Mutex<
291-
[pid_t: ContinuationOrStatus]
292-
> = Mutex([:])
310+
private let _childProcessContinuations = ChildProcessContinuations()
293311

294-
private let signalSource: SendableSourceSignal = SendableSourceSignal()
312+
private nonisolated(unsafe) let _waitThreadNoChildrenCondition = {
313+
let result = UnsafeMutablePointer<pthread_cond_t>.allocate(capacity: 1)
314+
_ = pthread_cond_init(result, nil)
315+
return result
316+
}()
295317

296318
private extension siginfo_t {
297319
var si_status: Int32 {
@@ -316,64 +338,70 @@ private extension siginfo_t {
316338
}
317339

318340
private let setup: () = {
319-
signalSource.setEventHandler {
320-
while true {
321-
var siginfo = siginfo_t()
322-
guard waitid(P_ALL, id_t(0), &siginfo, WEXITED) == 0 || errno == EINTR else {
323-
return
324-
}
325-
var status: TerminationStatus? = nil
326-
switch siginfo.si_code {
327-
case .init(CLD_EXITED):
328-
status = .exited(siginfo.si_status)
329-
case .init(CLD_KILLED), .init(CLD_DUMPED):
330-
status = .unhandledException(siginfo.si_status)
331-
case .init(CLD_TRAPPED), .init(CLD_STOPPED), .init(CLD_CONTINUED):
332-
// Ignore these signals because they are not related to
333-
// process exiting
334-
break
335-
default:
336-
fatalError("Unexpected exit status: \(siginfo.si_code)")
337-
}
338-
if let status = status {
339-
_childProcessContinuations.withLock { continuations in
341+
// Create the thread. It will run immediately; because it runs in an infinite
342+
// loop, we aren't worried about detaching or joining it.
343+
var thread = pthread_t()
344+
_ = pthread_create(
345+
&thread,
346+
nil,
347+
{ _ -> UnsafeMutableRawPointer? in
348+
// Run an infinite loop that waits for child processes to terminate and
349+
// captures their exit statuses.
350+
while true {
351+
// Listen for child process exit events. WNOWAIT means we don't perturb the
352+
// state of a terminated (zombie) child process, allowing us to fetch the
353+
// continuation (if available) before reaping.
354+
var siginfo = siginfo_t()
355+
errno = 0
356+
if waitid(P_ALL, id_t(0), &siginfo, WEXITED | WNOWAIT) == 0 {
340357
let pid = siginfo.si_pid
341-
if let existing = continuations.removeValue(forKey: pid),
342-
case .continuation(let c) = existing
343-
{
344-
c.resume(returning: status)
345-
} else {
346-
// We don't have continuation yet, just state status
347-
continuations[pid] = .status(status)
358+
359+
// If we had a continuation for this PID, allow the process to be reaped
360+
// and pass the resulting exit condition back to the calling task. If
361+
// there is no continuation, then either it hasn't been stored yet or
362+
// this child process is not tracked by the waiter thread.
363+
guard pid != 0, let c = _childProcessContinuations.withLock({ $0.removeValue(forKey: pid) }) else {
364+
continue
365+
}
366+
367+
c.resume(with: Result {
368+
// Here waitid should not block because `pid` has already terminated at this point.
369+
while true {
370+
var siginfo = siginfo_t()
371+
errno = 0
372+
if waitid(P_PID, numericCast(pid), &siginfo, WEXITED) == 0 {
373+
var status: TerminationStatus? = nil
374+
switch siginfo.si_code {
375+
case .init(CLD_EXITED):
376+
return .exited(siginfo.si_status)
377+
case .init(CLD_KILLED), .init(CLD_DUMPED):
378+
return .unhandledException(siginfo.si_status)
379+
default:
380+
fatalError("Unexpected exit status: \(siginfo.si_code)")
381+
}
382+
} else if errno != EINTR {
383+
throw SubprocessError.UnderlyingError(rawValue: errno)
384+
}
385+
}
386+
})
387+
} else if errno == ECHILD {
388+
// We got ECHILD. If there are no continuations added right now, we should
389+
// suspend this thread on the no-children condition until it's awoken by a
390+
// newly-scheduled waiter process. (If this condition is spuriously
391+
// woken, we'll just loop again, which is fine.) Note that we read errno
392+
// outside the lock in case acquiring the lock perturbs it.
393+
_childProcessContinuations.withUnsafeUnderlyingLock { lock, childProcessContinuations in
394+
if childProcessContinuations.isEmpty {
395+
_ = pthread_cond_wait(_waitThreadNoChildrenCondition, lock)
396+
}
348397
}
349398
}
350399
}
351-
}
352-
}
353-
signalSource.resume()
400+
},
401+
nil
402+
)
354403
}()
355404

356-
/// Unchecked Sendable here since this class is only explicitly
357-
/// initialized once during the lifetime of the process
358-
final class SendableSourceSignal: @unchecked Sendable {
359-
private let signalSource: DispatchSourceSignal
360-
361-
func setEventHandler(handler: @escaping DispatchSourceHandler) {
362-
self.signalSource.setEventHandler(handler: handler)
363-
}
364-
365-
func resume() {
366-
self.signalSource.resume()
367-
}
368-
369-
init() {
370-
self.signalSource = DispatchSource.makeSignalSource(
371-
signal: SIGCHLD,
372-
queue: .global()
373-
)
374-
}
375-
}
376-
377405
private func _setupMonitorSignalHandler() {
378406
// Only executed once
379407
setup

0 commit comments

Comments
 (0)