From 44a095c8aa3ff14e731a80a67e4e89a1829b4a4f Mon Sep 17 00:00:00 2001 From: Alex Hoppen Date: Thu, 26 Sep 2024 21:51:28 -0700 Subject: [PATCH] Use an `AtomicInt32` to count `pendingUnitCount` instead of using `AsyncQueue` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adding an item to `AsyncQueue` is linear in the number of pending queue items, thus adding n items to an `AsyncQueue` before any can execute is in O(n^2). This decision was made intentionally because the primary use case for `AsyncQueue` was to track pending LSP requests, of which we don’t expect to have too many pending requests at any given time. `SourceKitIndexDelegate` was also using `AsyncQueue` to track the number of pending units to be processed and eg. after indexing SourceKit-LSP, I have seen this grow up to ~20,000. With the quadratic behavior, this explodes time-wise. Turns out that we don’t actually need to use a queue here at all, an atomic is sufficient and much faster. Independently, we should consider mitigating the quadratic behavior of `AsyncQueue` or `AsyncQueue` in general. Fixes #1541 rdar://130844901 --- Sources/CAtomics/include/CAtomics.h | 26 +++++++++++ Sources/SKSupport/Atomics.swift | 25 ++++++++++ .../SourceKitLSP/SourceKitIndexDelegate.swift | 46 +++++++------------ 3 files changed, 68 insertions(+), 29 deletions(-) diff --git a/Sources/CAtomics/include/CAtomics.h b/Sources/CAtomics/include/CAtomics.h index 8ec87f35f..fc659a311 100644 --- a/Sources/CAtomics/include/CAtomics.h +++ b/Sources/CAtomics/include/CAtomics.h @@ -44,4 +44,30 @@ static inline void atomic_uint32_destroy(CAtomicUInt32 *_Nonnull atomic) { free(atomic); } +typedef struct { + _Atomic(int32_t) value; +} CAtomicInt32; + +static inline CAtomicInt32 *_Nonnull atomic_int32_create(int32_t initialValue) { + CAtomicInt32 *atomic = malloc(sizeof(CAtomicInt32)); + atomic->value = initialValue; + return atomic; +} + +static inline int32_t atomic_int32_get(CAtomicInt32 *_Nonnull atomic) { + return atomic->value; +} + +static inline void atomic_int32_set(CAtomicInt32 *_Nonnull atomic, int32_t newValue) { + atomic->value = newValue; +} + +static inline int32_t atomic_int32_fetch_and_increment(CAtomicInt32 *_Nonnull atomic) { + return atomic->value++; +} + +static inline void atomic_int32_destroy(CAtomicInt32 *_Nonnull atomic) { + free(atomic); +} + #endif // SOURCEKITLSP_CATOMICS_H diff --git a/Sources/SKSupport/Atomics.swift b/Sources/SKSupport/Atomics.swift index 33479602f..0439ba776 100644 --- a/Sources/SKSupport/Atomics.swift +++ b/Sources/SKSupport/Atomics.swift @@ -82,3 +82,28 @@ public final class AtomicUInt32: Sendable { return atomic_uint32_fetch_and_increment(atomic) } } + +public final class AtomicInt32: Sendable { + private nonisolated(unsafe) let atomic: UnsafeMutablePointer + + public init(initialValue: Int32) { + self.atomic = atomic_int32_create(initialValue) + } + + public var value: Int32 { + get { + atomic_int32_get(atomic) + } + set { + atomic_int32_set(atomic, newValue) + } + } + + deinit { + atomic_int32_destroy(atomic) + } + + public func fetchAndIncrement() -> Int32 { + return atomic_int32_fetch_and_increment(atomic) + } +} diff --git a/Sources/SourceKitLSP/SourceKitIndexDelegate.swift b/Sources/SourceKitLSP/SourceKitIndexDelegate.swift index 1f64baec2..6a759f751 100644 --- a/Sources/SourceKitLSP/SourceKitIndexDelegate.swift +++ b/Sources/SourceKitLSP/SourceKitIndexDelegate.swift @@ -19,54 +19,43 @@ import SwiftExtensions /// `IndexDelegate` for the SourceKit workspace. actor SourceKitIndexDelegate: IndexDelegate { - - let queue = AsyncQueue() - /// Registered `MainFilesDelegate`s to notify when main files change. var mainFilesChangedCallbacks: [@Sendable () async -> Void] = [] /// The count of pending unit events. Whenever this transitions to 0, it represents a time where /// the index finished processing known events. Of course, that may have already changed by the /// time we are notified. - var pendingUnitCount: Int = 0 + let pendingUnitCount = AtomicInt32(initialValue: 0) public init() {} nonisolated public func processingAddedPending(_ count: Int) { - queue.async { - await self.addPending(count) - } - } - - private func addPending(_ count: Int) { - pendingUnitCount += count + pendingUnitCount.value += Int32(count) } nonisolated public func processingCompleted(_ count: Int) { - queue.async { - await self.processCompleted(count) - } - } - - private func processCompleted(_ count: Int) { - pendingUnitCount -= count - if pendingUnitCount == 0 { - indexChanged() + pendingUnitCount.value -= Int32(count) + if pendingUnitCount.value == 0 { + Task { + await indexChanged() + } } - if pendingUnitCount < 0 { - assertionFailure("pendingUnitCount = \(pendingUnitCount) < 0") - pendingUnitCount = 0 - indexChanged() + if pendingUnitCount.value < 0 { + // Technically this is not data race safe because `pendingUnitCount` might change between the check and us setting + // it to 0. But then, this should never happen anyway, so it's fine. + logger.fault("pendingUnitCount dropped below zero: \(self.pendingUnitCount.value)") + pendingUnitCount.value = 0 + Task { + await indexChanged() + } } } - private func indexChanged() { + private func indexChanged() async { logger.debug("IndexStoreDB changed") for callback in mainFilesChangedCallbacks { - queue.async { - await callback() - } + await callback() } } @@ -74,5 +63,4 @@ actor SourceKitIndexDelegate: IndexDelegate { public func addMainFileChangedCallback(_ callback: @escaping @Sendable () async -> Void) { mainFilesChangedCallbacks.append(callback) } - }