Skip to content

Commit 913d060

Browse files
committed
!watch Safer lifecycle watch API
1 parent bd2e4c0 commit 913d060

File tree

5 files changed

+75
-60
lines changed

5 files changed

+75
-60
lines changed

Sources/DistributedActors/Cluster/Reception/OperationLogDistributedReceptionist.swift

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
317317
if self.storage.addRegistration(sequenced: sequenced, key: key, guest: guest) {
318318
// self.instrumentation.actorRegistered(key: key, id: id) // TODO(distributed): make the instrumentation calls compatible with distributed actor based types
319319

320-
watchTermination(of: guest) { onActorTerminated(identity: $0) }
320+
watchTermination(of: guest)
321321

322322
self.log.debug(
323323
"Registered [\(id)] for key [\(key)]",
@@ -559,9 +559,7 @@ extension OpLogDistributedReceptionist {
559559
// We resolve a stub that we cannot really ever send messages to, but we can "watch" it
560560
let resolved = try! actorSystem._resolveStub(identity: identity) // TODO(distributed): remove the throwing here?
561561

562-
watchTermination(of: resolved) {
563-
onActorTerminated(identity: $0)
564-
}
562+
watchTermination(of: resolved)
565563
if self.storage.addRegistration(sequenced: sequenced, key: key, guest: resolved) {
566564
// self.instrumentation.actorRegistered(key: key, id: id) // TODO(distributed): make the instrumentation calls compatible with distributed actor based types
567565
}
@@ -795,8 +793,8 @@ extension OpLogDistributedReceptionist {
795793
// MARK: Termination handling
796794

797795
extension OpLogDistributedReceptionist {
798-
// func onActorTerminated(terminated: Signals.Terminated) {
799-
func onActorTerminated(identity id: ID) {
796+
797+
public distributed func terminated(actor id: ID) {
800798
if id == ActorID._receptionist(on: id.uniqueNode, for: .distributedActors) {
801799
self.log.debug("Watched receptionist terminated: \(id)")
802800
self.receptionistTerminated(identity: id)

Sources/DistributedActors/DistributedActors.docc/Lifecycle.md

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ distributed actor Romeo: LifecycleWatch {
2424
}
2525

2626
distributed func watch(_ romeo: Romeo) {
27-
watchTermination(of: romeo) { terminatedID in
28-
probe.tell("Oh no! \(terminatedID) is dead!")
29-
// TODO: Drink poison
30-
}
27+
watchTermination(of: romeo)
28+
}
29+
30+
distributed func terminated(actor id: ActorID) async {
31+
print("Oh no! \(id) is dead!")
32+
// *Drinks poison*
3133
}
3234
}
3335

@@ -37,20 +39,24 @@ distributed actor Juliet: LifecycleWatch {
3739
}
3840

3941
distributed func watch(_ romeo: Romeo) {
40-
watchTermination(of: romeo) { terminatedID in
41-
probe.tell("Oh no! \(terminatedID) is dead!")
42-
// TODO: Stab through heart
43-
}
42+
watchTermination(of: romeo)
43+
}
44+
45+
distributed func terminated(actor id: ActorID) async {
46+
print("Oh no! \(id) is dead!")
47+
// *Stabs through heart*
4448
}
4549
}
4650
```
4751

48-
The ``LifecycleWatch/watchTermination(of:whenTerminated:file:line:)`` API purposefully does not use async/await because that would cause `romeo` to be retained as this function suspends. Instead, we allow it to complete and once the romeo actor is determined terminated, we get called back with its ``ActorID``.
52+
The ``LifecycleWatch/watchTermination(of:file:line:)`` API purposefully does not use async/await, because that would cause `romeo` to be retained while the function is suspended. Instead, we allow it, and the function calling it (which keeps a reference to `Romeo`), to complete; once the `romeo` actor is determined to have terminated, we get called back with its ``ActorID`` in the separate ``LifecycleWatch/terminated(actor:)`` method.
4953

5054
This API offers the same semantics regardless of where the actors are located, and always invokes the `terminated(actor:)` callback once the watched actor is considered to have terminated.
5155

5256
In case the watched actor is _local_, its termination is tied to Swift's ref-counting mechanisms, and an actor is terminated as soon as there are no more strong references to it in a system. It is then deinitialized, and the actor system's `resignID(actor.id)` is triggered, causing propagation to all the other actors which have been watching that actor.
5357

58+
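You can also stop watching a previously watched actor by calling ``LifecycleWatch/unwatchTermination(of:file:line:)``. Unwatching an actor that was not previously watched has no effect.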
59+
5460
In case the watched actor is _remote_, termination may happen because of two reasons:
5561
- either its reference count _on the remote system_ dropped to zero and it followed the same deinitialization steps as just described in the local case;
5662
- or, the entire node the distributed actor was located on has been declared ``Cluster/MemberStatus/down`` and therefore the actor is assumed terminated (regardless of whether it really has deinitialized or not).

Sources/DistributedActors/LifecycleMonitoring/LifecycleWatch.swift

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,21 @@ import NIO
2020
///
2121
/// - SeeAlso:
2222
/// - <doc:Lifecycle>
23-
public protocol LifecycleWatch: DistributedActor where ActorSystem == ClusterSystem {}
23+
public protocol LifecycleWatch: DistributedActor where ActorSystem == ClusterSystem {
24+
25+
/// Called with the ``ActorID`` of a distributed actor that has terminated and was previously watched using ``watchTermination(of:file:line:)``.
26+
// distributed // TODO(distributed): if we allowed non-distributed funcs in DA constrained protocol, we can allow them to be witnessed
27+
func terminated(actor id: ActorID) async throws
28+
29+
}
2430

2531
// ==== ----------------------------------------------------------------------------------------------------------------
2632
// MARK: Lifecycle Watch API
2733

2834
extension LifecycleWatch {
35+
2936
/// Watch the `watchee` actor for termination, and trigger the `whenTerminated` callback when it terminates.
30-
@discardableResult
37+
@available(*, deprecated, message: "Replaced with the much safer `watchTermination(of:)` paired with `actorTerminated(_:)`")
3138
public func watchTermination<Watchee>(
3239
of watchee: Watchee,
3340
@_inheritActorContext @_implicitSelfCapture whenTerminated: @escaping @Sendable (ID) async -> Void,
@@ -42,6 +49,22 @@ extension LifecycleWatch {
4249
return watchee
4350
}
4451

52+
/// Watch the `watchee` actor for termination; once it terminates, `terminated(actor:)` is invoked on this actor with the watchee's ``ActorID``.
53+
@discardableResult
54+
public func watchTermination<Watchee>(
55+
of watchee: Watchee,
56+
file: String = #file, line: UInt = #line
57+
) -> Watchee where Watchee: DistributedActor, Watchee.ActorSystem == ClusterSystem {
58+
// TODO(distributed): reimplement this as self.id as? _ActorContext which will have the watch things.
59+
guard let watch = self.actorSystem._getLifecycleWatch(watcher: self) else {
60+
return watchee
61+
}
62+
63+
let wacheeID = watchee.id
64+
watch.termination(of: watchee, whenTerminated: { id in await try? self.terminated(actor: wacheeID) }, file: file, line: line)
65+
return watchee
66+
}
67+
4568
/// Reverts the watching of a previously watched actor.
4669
///
4770
/// Unwatching a not-previously-watched actor has no effect.
@@ -75,10 +98,19 @@ extension LifecycleWatch {
7598
/// - MUST NOT be invoked concurrently to the actor's execution, i.e. from the "outside" of the current actor.
7699
///
77100
/// - Returns: the passed in watchee reference for easy chaining `e.g. return context.unwatch(ref)`
101+
@available(*, deprecated, renamed: "unwatchTermination(of:file:line:)")
78102
@discardableResult
79103
public func unwatch<Watchee: DistributedActor>(
80104
_ watchee: Watchee,
81105
file: String = #file, line: UInt = #line
106+
) -> Watchee where Watchee.ActorSystem == ClusterSystem {
107+
self.unwatchTermination(of: watchee, file: file, line: line)
108+
}
109+
110+
@discardableResult
111+
public func unwatchTermination<Watchee: DistributedActor>(
112+
of watchee: Watchee,
113+
file: String = #file, line: UInt = #line
82114
) -> Watchee where Watchee.ActorSystem == ClusterSystem {
83115
// TODO(distributed): reimplement this as self.id as? _ActorContext which will have the watch things.
84116
guard let watch = self.actorSystem._getLifecycleWatch(watcher: self) else {

Sources/DistributedActors/Pattern/WorkerPool.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,13 @@ public distributed actor WorkerPool<Worker: DistributedWorker>: DistributedWorke
9999
continuation.resume()
100100
self.newWorkerContinuations.remove(at: i)
101101
}
102-
watchTermination(of: worker) { self.onWorkerTerminated(id: $0) }
102+
watchTermination(of: worker)
103103
}
104104
}
105105
case .static(let workers):
106106
workers.forEach { worker in
107107
self.workers[worker.id] = Weak(worker)
108-
watchTermination(of: worker) { self.onWorkerTerminated(id: $0) }
108+
watchTermination(of: worker)
109109
}
110110
}
111111
}
@@ -133,7 +133,7 @@ public distributed actor WorkerPool<Worker: DistributedWorker>: DistributedWorke
133133
switch (self.hasTerminatedWorkers, self.whenAllWorkersTerminated) {
134134
case (false, _), (true, .awaitNewWorkers):
135135
self.actorSystem.log.log(level: self.logLevel, "Worker pool is empty, waiting for new worker.")
136-
try await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
136+
await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
137137
self.newWorkerContinuations.append(continuation)
138138
}
139139
case (true, .throw(let error)):
@@ -146,7 +146,7 @@ public distributed actor WorkerPool<Worker: DistributedWorker>: DistributedWorke
146146
return worker
147147
} else {
148148
// Worker terminated; clean up and try again
149-
self.onWorkerTerminated(id: selectedWorkerID)
149+
self.terminated(actor: selectedWorkerID)
150150
return try await self.selectWorker()
151151
}
152152
}
@@ -160,7 +160,7 @@ public distributed actor WorkerPool<Worker: DistributedWorker>: DistributedWorke
160160
return selected
161161
}
162162

163-
private func onWorkerTerminated(id: Worker.ID) {
163+
public distributed func terminated(actor id: Worker.ID) {
164164
self.workers.removeValue(forKey: id)
165165
self.hasTerminatedWorkers = true
166166
self.roundRobinPos = 0

Tests/DistributedActorsTests/LifecycleWatchTests.swift

Lines changed: 17 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ distributed actor Romeo: LifecycleWatch, CustomStringConvertible {
4242
// nothing important here
4343
}
4444

45+
public distributed func terminated(actor id: ActorID) async throws {
46+
// ignore
47+
}
48+
4549
nonisolated var description: String {
4650
"\(Self.self)(\(id))"
4751
}
@@ -63,30 +67,24 @@ distributed actor Juliet: LifecycleWatch, CustomStringConvertible {
6367
_ romeo: Romeo,
6468
unwatch doUnwatch: Bool
6569
) async throws {
66-
watchTermination(of: romeo) { terminatedIdentity in
67-
probe.tell("Received terminated: \(terminatedIdentity)")
68-
}
70+
watchTermination(of: romeo)
6971
if doUnwatch {
70-
unwatch(romeo)
72+
unwatchTermination(of: romeo)
7173
}
7274
}
7375

74-
distributed func meetWatchAsyncCallback(
75-
_ romeo: Romeo,
76-
unwatch doUnwatch: Bool
77-
) async throws {
78-
@Sendable
79-
func asyncTerminated(_ terminatedIdentity: ClusterSystem.ActorID) async {
80-
await self.probe.tell("Received terminated: \(terminatedIdentity)")
81-
}
82-
83-
watchTermination(of: romeo, whenTerminated: asyncTerminated)
84-
85-
if doUnwatch {
86-
unwatch(romeo)
87-
}
76+
// public nonisolated func terminated(actor id: ActorID) async {
77+
// await self.whenLocal { __secretlyKnownToBeLocal in
78+
// await __secretlyKnownToBeLocal.probe.tell("Received terminated: \(id)")
79+
// }
80+
// }
81+
82+
83+
public distributed func terminated(actor id: ActorID) async { // not REALLY distributed...
84+
self.probe.tell("Received terminated: \(id)")
8885
}
89-
86+
87+
9088
nonisolated var description: String {
9189
"\(Self.self)(\(id))"
9290
}
@@ -115,25 +113,6 @@ final class LifecycleWatchTests: ClusterSystemXCTestCase, @unchecked Sendable {
115113
try pj.expectMessage("Received terminated: /user/Romeo-b")
116114
}
117115

118-
func test_watch_shouldTriggerTerminatedWhenWatchedActorDeinits_async() async throws {
119-
let pj = self.testKit.makeTestProbe(expecting: String.self)
120-
let pr = self.testKit.makeTestProbe(expecting: String.self)
121-
let juliet = Juliet(probe: pj, actorSystem: system)
122-
123-
func meet() async throws {
124-
var romeo: Romeo? = Romeo(probe: pr, actorSystem: system)
125-
126-
try await juliet.meetWatchAsyncCallback(romeo!, unwatch: false)
127-
romeo = nil
128-
}
129-
try await meet()
130-
131-
try pj.expectMessage("Juliet init")
132-
try pr.expectMessage("Romeo init")
133-
try pr.expectMessage("Romeo deinit")
134-
try pj.expectMessage("Received terminated: /user/Romeo-b")
135-
}
136-
137116
func test_watchThenUnwatch_shouldTriggerTerminatedWhenWatchedActorDeinits() async throws {
138117
let pj = self.testKit.makeTestProbe(expecting: String.self)
139118
let pr = self.testKit.makeTestProbe(expecting: String.self)

0 commit comments

Comments
 (0)