Skip to content

Commit e2f3041

Browse files
committed
+testkit improve testkit for lifecycle tests
1 parent f04f2ee commit e2f3041

File tree

8 files changed

+222
-76
lines changed

8 files changed

+222
-76
lines changed

Sources/DistributedActors/ActorLogging.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ internal final class LoggingContext {
7676
extension Logger {
7777
/// Create a logger specific to this actor.
7878
public init<Act: DistributedActor>(actor: Act) where Act.ActorSystem == ClusterSystem {
79-
var log = Logger(label: "\(actor.id)")
79+
var log = actor.actorSystem.settings.logging.baseLogger
8080
log[metadataKey: "actor/path"] = "\(actor.id.path)"
8181
log[metadataKey: "actor/id"] = "\(actor.id)"
8282
self = log

Sources/DistributedActors/Cluster/DistributedNodeDeathWatcher.swift

Lines changed: 132 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,141 @@
1515
import Distributed
1616
import Logging
1717

18+
/// Implements ``LifecycleWatch`` semantics in presence of ``Node`` failures.
///
/// Depends on a failure detector (e.g. SWIM) to actually detect a node failure, however once detected,
/// it handles notifying all _local_ actors which have watched at least one actor on the terminating node.
///
/// ### Implementation
/// In order to avoid every actor having to subscribe to cluster events and individually handle the relationship between those
/// and individually watched actors, the watcher handles subscribing for cluster events on behalf of actors which watch
/// other actors on remote nodes, and messages them upon a node becoming down.
///
/// Also allows manually feeding in membership changes (via ``membershipChanged(_:)``) to trigger
/// terminated notifications, e.g. for mocking node failures in tests.
internal actor DistributedNodeDeathWatcher {
    // TODO(distributed): actually use this actor rather than the behavior

    typealias ActorSystem = ClusterSystem

    private let log: Logger

    /// The unique node of the cluster system this watcher runs on.
    private let selfNode: UniqueNode

    /// Locally known membership view; kept up to date from the cluster event stream.
    private var membership: Cluster.Membership = .empty

    /// Members which have been `removed`
    // TODO: clear after a few days, or some max count of nodes, use sorted set for this
    private var nodeTombstones: Set<UniqueNode> = []

    /// Mapping between remote node, and actors which have watched some actors on given remote node.
    private var remoteWatchCallbacks: [UniqueNode: Set<WatcherAndCallback>] = [:]

    /// Consumes the cluster event stream for the lifetime of the watcher; cancelled via `cancel()`.
    private var eventListenerTask: Task<Void, Error>?

    init(actorSystem: ActorSystem) async {
        // Direct assignment; no metadata is added here, so no mutable copy is needed.
        self.log = actorSystem.log
        self.selfNode = actorSystem.cluster.uniqueNode

        let events = actorSystem.cluster.events
        // Note: an unstructured Task created in actor-isolated context inherits the actor's
        // isolation, thus the synchronous calls into actor state below are safe.
        self.eventListenerTask = Task {
            for try await event in events {
                switch event {
                case .membershipChange(let change):
                    self.membershipChanged(change)
                case .snapshot(let membership):
                    // Replay the snapshot as a series of changes against the empty membership.
                    let diff = Cluster.Membership._diff(from: .empty, to: membership)
                    for change in diff.changes {
                        self.membershipChanged(change)
                    }
                case .leadershipChange, .reachabilityChange:
                    break // ignore those, they don't affect downing
                }
            }
        }
    }

    /// Registers `watcher` to be notified via `nodeTerminatedFn` once `remoteNode` is declared down.
    ///
    /// If the node already has a tombstone (it terminated before this watch was processed),
    /// the termination callback is fired immediately.
    func watchActor(
        on remoteNode: UniqueNode,
        by watcher: ClusterSystem.ActorID,
        whenTerminated nodeTerminatedFn: @escaping @Sendable (UniqueNode) async -> Void
    ) {
        guard !self.nodeTombstones.contains(remoteNode) else {
            // the system the watcher is attempting to watch has terminated before the watch has been processed,
            // thus we have to immediately reply with a termination system message, as otherwise it would never receive one
            Task {
                await nodeTerminatedFn(remoteNode)
            }
            return
        }

        let record = WatcherAndCallback(watcherID: watcher, callback: nodeTerminatedFn)
        self.remoteWatchCallbacks[remoteNode, default: []].insert(record)
    }

    /// Removes all watch registrations issued by the given watcher, e.g. because it has terminated.
    func removeWatcher(id: ClusterSystem.ActorID) {
        // TODO: this can be optimized a bit more I suppose, with a reverse lookup table
        // Equality of WatcherAndCallback is based on the watcher id only, so a dummy callback suffices.
        let removeMe = WatcherAndCallback(watcherID: id, callback: { _ in () })
        for (node, var watcherAndCallbacks) in self.remoteWatchCallbacks {
            if watcherAndCallbacks.remove(removeMe) != nil {
                self.remoteWatchCallbacks[node] = watcherAndCallbacks
            }
        }
    }

    /// Forgets the tombstone kept for the given (previously removed) node.
    func cleanupTombstone(node: UniqueNode) {
        _ = self.nodeTombstones.remove(node)
    }

    /// Applies the change to the local membership view; if the member moved to `.down` or beyond,
    /// triggers termination notifications for all watchers of actors on that node.
    func membershipChanged(_ change: Cluster.MembershipChange) {
        guard let change = self.membership.applyMembershipChange(change) else {
            return // no change, nothing to act on
        }

        // TODO: make sure we only handle ONCE?
        if change.status >= .down {
            // can be: down, leaving or removal.
            // on any of those we want to ensure we handle the "down"
            self.handleAddressDown(change)
        }
    }

    /// Notifies all watchers of actors located on the terminated node, and records a tombstone for it.
    func handleAddressDown(_ change: Cluster.MembershipChange) {
        let terminatedNode = change.node

        if let watchers = self.remoteWatchCallbacks.removeValue(forKey: terminatedNode) {
            for watcher in watchers {
                Task {
                    await watcher.callback(terminatedNode)
                }
            }
        }

        // we need to keep a tombstone, so we can immediately reply with a terminated,
        // in case another watch was just in progress of being made
        self.nodeTombstones.insert(terminatedNode)
    }

    /// Stops consuming cluster events; no further terminated notifications will be emitted.
    func cancel() {
        self.eventListenerTask?.cancel()
        self.eventListenerTask = nil
    }
}
140+
141+
extension DistributedNodeDeathWatcher {
    /// Pairs a local watcher's identity with the callback to invoke once a watched remote node terminates.
    ///
    /// Equality and hashing are intentionally based solely on the watcher's identity, so that a
    /// registration can be found (and removed) without holding the original callback instance.
    struct WatcherAndCallback: Hashable {
        /// Address of the local watcher which had issued this watch
        let watcherID: ClusterSystem.ActorID
        /// Invoked with the terminated node once it has been declared down.
        let callback: @Sendable (UniqueNode) async -> Void

        static func == (lhs: Self, rhs: Self) -> Bool {
            lhs.watcherID == rhs.watcherID
        }

        func hash(into hasher: inout Hasher) {
            hasher.combine(self.watcherID)
        }
    }
}

Sources/DistributedActors/Cluster/NodeDeathWatcher.swift

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,23 +131,22 @@ internal final class NodeDeathWatcherInstance: NodeDeathWatcher {
131131

132132
func handleAddressDown(_ change: Cluster.MembershipChange) {
133133
let terminatedNode = change.node
134-
134+
135135
// ref
136136
if let watchers = self.remoteWatchers.removeValue(forKey: terminatedNode) {
137137
for ref in watchers {
138138
// we notify each actor that was watching this remote address
139139
ref._sendSystemMessage(.nodeTerminated(terminatedNode))
140140
}
141141
}
142-
142+
143143
if let watchers = self.remoteWatchCallbacks.removeValue(forKey: terminatedNode) {
144144
for watcher in watchers {
145145
Task {
146146
await watcher.callback(terminatedNode)
147147
}
148148
}
149149
}
150-
151150

152151
// we need to keep a tombstone, so we can immediately reply with a terminated,
153152
// in case another watch was just in progress of being made
@@ -203,7 +202,7 @@ enum NodeDeathWatcherShell {
203202
for change in diff.changes {
204203
instance.onMembershipChanged(change)
205204
}
206-
205+
207206
case .membershipChange(let change) where change.isAtLeast(.down):
208207
context.log.info("Node down: \(change)!")
209208
instance.handleAddressDown(change)

Sources/DistributedActors/LifecycleMonitoring/LifecycleWatch.swift

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ public protocol LifecycleWatch: DistributedActor where ActorSystem == ClusterSys
3535
// MARK: Lifecycle Watch API
3636

3737
extension LifecycleWatch {
38-
3938
/// Watch the `watchee` actor for termination, and trigger the `whenTerminated` callback when
4039
@available(*, deprecated, message: "Replaced with the much safer `watchTermination(of:)` paired with `actorTerminated(_:)`")
4140
public func watchTermination<Watchee>(
@@ -71,7 +70,7 @@ extension LifecycleWatch {
7170
guard let watch = self.actorSystem._getLifecycleWatch(watcher: self) else {
7271
return watchee
7372
}
74-
73+
7574
watch.termination(of: watchee, whenTerminated: { id in
7675
try? await self.terminated(actor: id)
7776
}, file: file, line: line)
@@ -385,31 +384,27 @@ extension LifecycleWatchContainer {
385384
watchedID: ActorID,
386385
file: String = #file, line: UInt = #line
387386
) {
388-
pprint("[>>>>> /system/nodeDeathWatcher] subscribe...")
389387
self.nodeDeathWatcher?.tell( // different actor
390388
.remoteDistributedActorWatched(
391389
remoteNode: watchedID.uniqueNode,
392390
watcherID: self.watcherID,
393391
nodeTerminated: { [weak self, system] uniqueNode in
394-
pprint("[>>>>> /system/nodeDeathWatcher] callback watched: \(watchedID)...")
395-
396392
guard let self else {
397393
return
398394
}
399-
395+
400396
Task {
401397
self.receiveNodeTerminated(uniqueNode)
402398
}
403-
399+
404400
guard let system = system else {
405401
return
406402
}
407-
403+
408404
guard let myselfRef = system._resolveUntyped(context: .init(id: self.watcherID, system: system)) else {
409405
return
410406
}
411407
myselfRef._sendSystemMessage(.nodeTerminated(uniqueNode), file: file, line: line)
412-
413408
}
414409
)
415410
)

Sources/DistributedActorsTestKit/Cluster/DistributedActor+Assertions.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15-
1615
import Distributed
1716
import DistributedActors
1817
import XCTest

Sources/DistributedActorsTestKit/ShouldMatchers.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,18 @@ extension CallSiteInfo {
559559

560560
return CallSiteError(callSite: self, explained: message)
561561
}
562+
563+
/// Prepares a detailed error, specialized for a prefix mismatch of a string
///
/// - Parameters:
///   - it: the actually received string
///   - expected: the prefix the received string was expected to start with
///   - failTest: whether the test should immediately be failed as well (passed through to `error`)
/// - Warning: Performs file IO in order to read source location line where failure happened
func notMatchingPrefixError(got it: any StringProtocol, expected: any StringProtocol, failTest: Bool = true) -> CallSiteError {
    let padding = String(repeating: " ", count: "[error]".count)
    // Fixed message: this error is produced when the string does NOT start with the prefix,
    // the previous wording ("does start with") asserted the opposite of the failure.
    return self.error("""
    [\(it)]
    does not start with expected prefix:
    \(padding)[\(expected)]\n
    """, failTest: failTest)
}
562574
}
563575

564576
/// An error type with additional ``CallSiteInfo`` which is able to pretty print failures.

Sources/DistributedActorsTestKit/TestProbes.swift

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,26 @@ extension ActorTestProbe where Message: Equatable {
377377
}
378378
}
379379

380+
extension ActorTestProbe where Message: StringProtocol {
    /// Expects a message to arrive within the probe's default `expectationTimeout`,
    /// and asserts that it starts with the given `prefix`.
    ///
    /// - Throws: a `CallSiteError` when no message arrives in time, or when the received
    ///   message does not begin with `prefix`.
    public func expectMessage(prefix: Message, file: StaticString = #file, line: UInt = #line, column: UInt = #column) throws {
        try self.expectMessage(prefix: prefix, within: self.expectationTimeout, file: file, line: line, column: column)
    }

    /// Expects a message to arrive within `timeout`, and asserts that it starts with the given `prefix`.
    ///
    /// - Throws: a `CallSiteError` when no message arrives in time, or when the received
    ///   message does not begin with `prefix`.
    public func expectMessage(prefix: Message, within timeout: Duration, file: StaticString = #file, line: UInt = #line, column: UInt = #column) throws {
        let callSite = CallSiteInfo(file: file, line: line, column: column, function: #function)

        // Only the receive itself is wrapped in do/catch: previously the prefix-mismatch error
        // was thrown inside the `do` block and thus caught and re-wrapped as a misleading
        // "Did not receive ... within timeout" failure, hiding the dedicated mismatch diagnostic.
        let receivedMessage: Message
        do {
            receivedMessage = try self.receiveMessage(within: timeout)
        } catch {
            let message = "Did not receive String message with prefix [\(prefix)] within [\(timeout.prettyDescription)], error: \(error)"
            throw callSite.error(message)
        }

        self.lastMessage = receivedMessage
        guard receivedMessage.starts(with: prefix) else {
            throw callSite.notMatchingPrefixError(got: receivedMessage, expected: prefix)
        }
    }
}
399+
380400
// ==== ----------------------------------------------------------------------------------------------------------------
381401
// MARK: Expecting multiple messages
382402

0 commit comments

Comments
 (0)