Skip to content

Commit 0b024da

Browse files
committed
workaround fragile closures with generic actors /and inheriting context
1 parent b842358 commit 0b024da

File tree

3 files changed

+28
-27
lines changed

3 files changed

+28
-27
lines changed

Sources/DistributedActors/LifecycleMonitoring/LifecycleWatch.swift

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ extension LifecycleWatch {
3939
@available(*, deprecated, message: "Replaced with the much safer `watchTermination(of:)` paired with `actorTerminated(_:)`")
4040
public func watchTermination<Watchee>(
4141
of watchee: Watchee,
42-
@_inheritActorContext @_implicitSelfCapture whenTerminated: @escaping @Sendable (ID) async -> Void,
42+
@_inheritActorContext whenTerminated: @escaping @Sendable (ID) async -> Void,
4343
file: String = #file, line: UInt = #line
4444
) -> Watchee where Watchee: DistributedActor, Watchee.ActorSystem == ClusterSystem {
4545
// TODO(distributed): reimplement this as self.id as? _ActorContext which will have the watch things.
4646
guard let watch = self.actorSystem._getLifecycleWatch(watcher: self) else {
4747
return watchee
4848
}
4949

50-
watch.termination(of: watchee, whenTerminated: whenTerminated, file: file, line: line)
50+
watch.termination(of: watchee.id, whenTerminated: whenTerminated, file: file, line: line)
5151
return watchee
5252
}
5353

@@ -66,17 +66,16 @@ extension LifecycleWatch {
6666
of watchee: Watchee,
6767
file: String = #file, line: UInt = #line
6868
) -> Watchee where Watchee: DistributedActor, Watchee.ActorSystem == ClusterSystem {
69-
// // TODO(distributed): reimplement this as self.id as? _ActorContext which will have the watch things.
70-
// guard let watch = self.actorSystem._getLifecycleWatch(watcher: self) else {
71-
// return watchee
72-
// }
73-
//
74-
// watch.termination(of: watchee, whenTerminated: { id in
75-
// try? await self.terminated(actor: id)
76-
// }, file: file, line: line)
77-
//
78-
// return watchee
79-
fatalError("X")
69+
// TODO(distributed): reimplement this as self.id as? _ActorContext which will have the watch things.
70+
guard let watch = self.actorSystem._getLifecycleWatch(watcher: self) else {
71+
return watchee
72+
}
73+
74+
watch.termination(of: watchee.id, whenTerminated: { id in
75+
try? await self.terminated(actor: id)
76+
}, file: file, line: line)
77+
78+
return watchee
8079
}
8180

8281
/// Reverts the watching of an previously watched actor.
@@ -212,18 +211,14 @@ public final class LifecycleWatchContainer {
212211

213212
extension LifecycleWatchContainer {
214213
/// Performed by the sending side of "watch", therefore the `watcher` should equal `context.myself`
215-
public func termination<Watchee>(
216-
of watchee: Watchee,
217-
@_inheritActorContext @_implicitSelfCapture whenTerminated: @escaping @Sendable (ClusterSystem.ActorID) async -> Void,
214+
public func termination(
215+
of watcheeID: ActorID,
216+
@_implicitSelfCapture whenTerminated: @escaping @Sendable (ClusterSystem.ActorID) async -> Void,
218217
file: String = #file, line: UInt = #line
219-
) where Watchee: DistributedActor, Watchee.ActorSystem == ClusterSystem {
220-
traceLog_DeathWatch("issue watch: \(watchee) (from \(self.watcherID))")
218+
) {
219+
traceLog_DeathWatch("issue watch: \(watcheeID) (from \(self.watcherID))")
221220

222221
let watcherID: ActorID = self.watcherID
223-
let watcheeID: ActorID = watchee.id
224-
// guard let watcherID = myself?.id else {
225-
// fatalError("Cannot watch from actor \(optional: self.myself), it is not managed by the cluster. Identity: \(watchee.id)")
226-
// }
227222

228223
// watching ourselves is a no-op, since we would never be able to observe the Terminated message anyway:
229224
guard watcheeID != watcherID else {
@@ -233,14 +228,14 @@ extension LifecycleWatchContainer {
233228
let addressableWatchee = self.system._resolveUntyped(context: .init(id: watcheeID, system: self.system))
234229
let addressableWatcher = self.system._resolveUntyped(context: .init(id: watcherID, system: self.system))
235230

236-
if self.isWatching(watchee.id) {
231+
if self.isWatching(watcheeID) {
237232
// While we bail out early here, we DO override whichever value was set as the customized termination message.
238233
// This is to enable being able to keep updating the context associated with a watched actor, e.g. if how
239234
// we should react to its termination has changed since the last time watch() was invoked.
240-
self.watching[watchee.id] = whenTerminated
235+
self.watching[watcheeID] = whenTerminated
241236
} else {
242237
// not yet watching, so let's add it:
243-
self.watching[watchee.id] = whenTerminated
238+
self.watching[watcheeID] = whenTerminated
244239

245240
addressableWatchee._sendSystemMessage(.watch(watchee: addressableWatchee, watcher: addressableWatcher), file: file, line: line)
246241
self.subscribeNodeTerminatedEvents(watchedID: watcheeID, file: file, line: line)

Tests/DistributedActorsTests/DistributedReceptionistTests.swift

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,17 @@ private distributed actor Boss: LifecycleWatch {
5757

5858
self.listingTask = Task {
5959
for await worker in await self.actorSystem.receptionist.listing(of: .workers) {
60-
self.workers.insert(worker.id)
60+
self.workers.insert(watchTermination(of: worker).id)
6161
self.probe?.tell("\(self.id) \(self.name) found \(worker.id)")
6262
}
6363
}
6464
}
65+
66+
// FIXME(distributed): should not need to be distributed
67+
distributed func terminated(actor id: ActorID) async throws {
68+
self.workers.remove(id)
69+
}
70+
6571

6672
distributed func done() {
6773
self.listingTask?.cancel()

Tests/DistributedActorsTests/LifecycleWatchTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ final class LifecycleWatchTests: ClusterSystemXCTestCase, @unchecked Sendable {
142142
let (first, second) = await self.setUpPair() { settings in
143143
settings.enabled = true
144144
}
145-
try joinNodes(node: first, with: second, ensureMembers: .up)
145+
try await joinNodes(node: first, with: second, ensureMembers: .up)
146146

147147
let juliet = Juliet(probe: pj, actorSystem: first)
148148

0 commit comments

Comments
 (0)