Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ final class DistributedDiningPhilosophers {
systemC.cluster.join(node: systemB.settings.node)

print("waiting for cluster to form...")
while !(
systemA.cluster.membershipSnapshot.count(withStatus: .up) == systems.count &&
systemB.cluster.membershipSnapshot.count(withStatus: .up) == systems.count &&
systemC.cluster.membershipSnapshot.count(withStatus: .up) == systems.count)
{
while !(try await self.isClusterFormed(systems)) {
let nanosInSecond: UInt64 = 1_000_000_000
try await Task.sleep(nanoseconds: 1 * nanosInSecond)
}
Expand Down Expand Up @@ -76,4 +72,14 @@ final class DistributedDiningPhilosophers {

try systemA.park(atMost: duration)
}

/// Returns `true` only when every system's membership snapshot reports
/// all `systems.count` members with status `.up`, i.e. the cluster is fully formed.
private func isClusterFormed(_ systems: [ClusterSystem]) async throws -> Bool {
    let expectedUpMembers = systems.count
    for system in systems {
        let snapshot = try await system.cluster.membershipSnapshot
        guard snapshot.count(withStatus: .up) == expectedUpMembers else {
            return false
        }
    }
    return true
}
}
54 changes: 40 additions & 14 deletions Sources/DistributedActors/Cluster/ClusterControl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//
//===----------------------------------------------------------------------===//

import Distributed
import DistributedActorsConcurrencyHelpers
import Logging
import NIO
Expand Down Expand Up @@ -42,37 +43,62 @@ public struct ClusterControl {
/// of obtaining the information to act on rather than mixing the two. Use events if state transitions should trigger
/// something, and use the snapshot for ad-hoc "one time" membership inspections.
public var membershipSnapshot: Cluster.Membership {
self.membershipSnapshotLock.lock()
defer { self.membershipSnapshotLock.unlock() }
return self._membershipSnapshotHolder.membership
get async throws {
try await self._membershipSnapshotHolder.get()
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sweet :)


internal func updateMembershipSnapshot(_ snapshot: Cluster.Membership) {
self.membershipSnapshotLock.lock()
defer { self.membershipSnapshotLock.unlock() }
self._membershipSnapshotHolder.membership = snapshot
Task {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about usages of Task.

try await self._membershipSnapshotHolder.update(snapshot)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's probably fine to be honest

}

private let membershipSnapshotLock: Lock
private let _membershipSnapshotHolder: MembershipHolder
private class MembershipHolder {
internal distributed actor MembershipHolder {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe as just an actor, no need to for distribution in this one after all?

Copy link
Member Author

@yim-lee yim-lee Jun 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that simplifies things

Amended 3efd08f

typealias ActorSystem = ClusterSystem

static let path: ActorPath = try! ActorPath._system.appending(segment: ActorPathSegment("clusterMembership"))

static var props: _Props {
var ps = _Props()
ps._knownActorName = Self.path.name
ps._systemActor = true
ps._wellKnown = true
return ps
}

var membership: Cluster.Membership
init(membership: Cluster.Membership) {

init(membership: Cluster.Membership, actorSystem: ActorSystem) {
self.actorSystem = actorSystem
self.membership = membership
}

distributed func get() -> Cluster.Membership {
self.membership
}

distributed func update(_ membership: Cluster.Membership) {
self.membership = membership
}

distributed func join(_ node: UniqueNode) {
_ = self.membership.join(node)
}
}

internal let ref: ClusterShell.Ref

init(_ settings: ClusterSystemSettings, clusterRef: ClusterShell.Ref, eventStream: EventStream<Cluster.Event>) {
init(_ settings: ClusterSystemSettings, clusterRef: ClusterShell.Ref, membership: MembershipHolder, eventStream: EventStream<Cluster.Event>) {
self.settings = settings
self.ref = clusterRef
self.events = eventStream

let membershipSnapshotLock = Lock()
self.membershipSnapshotLock = membershipSnapshotLock
self._membershipSnapshotHolder = MembershipHolder(membership: .empty)
_ = self._membershipSnapshotHolder.membership.join(settings.uniqueBindNode)
self._membershipSnapshotHolder = membership
Task {
try await membership.join(settings.uniqueBindNode)
}
}

/// The node value representing _this_ node in the cluster.
Expand Down
16 changes: 12 additions & 4 deletions Sources/DistributedActors/ClusterSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,10 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
)

_ = self._clusterStore.storeIfNilThenLoad(Box(nil))
_ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, clusterRef: self.deadLetters.adapted(), eventStream: clusterEvents)))
await _Props.$forSpawn.withValue(ClusterControl.MembershipHolder.props) {
let membership = ClusterControl.MembershipHolder(membership: .empty, actorSystem: self)
_ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, clusterRef: self.deadLetters.adapted(), membership: membership, eventStream: clusterEvents)))
}
}

// node watcher MUST be prepared before receptionist (or any other actor) because it (and all actors) need it if we're running clustered
Expand All @@ -331,7 +334,10 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
customBehavior: ClusterEventStream.Shell.behavior
)
let clusterRef = try! cluster.start(system: self, clusterEvents: clusterEvents) // only spawns when cluster is initialized
_ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, clusterRef: clusterRef, eventStream: clusterEvents)))
await _Props.$forSpawn.withValue(ClusterControl.MembershipHolder.props) {
let membership = ClusterControl.MembershipHolder(membership: .empty, actorSystem: self)
_ = self._clusterControlStore.storeIfNilThenLoad(Box(ClusterControl(settings, clusterRef: clusterRef, membership: membership, eventStream: clusterEvents)))
}

self._associationTombstoneCleanupTask = eventLoopGroup.next().scheduleRepeatedTask(
initialDelay: settings.associationTombstoneCleanupInterval.toNIO,
Expand Down Expand Up @@ -491,8 +497,10 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
self.shutdownSemaphore.wait()

/// Down this member as part of shutting down; it may have enough time to notify other nodes on an best effort basis.
if let myselfMember = self.cluster.membershipSnapshot.uniqueMember(self.cluster.uniqueNode) {
self.cluster.down(member: myselfMember)
Task {
if let myselfMember = try await self.cluster.membershipSnapshot.uniqueMember(self.cluster.uniqueNode) {
self.cluster.down(member: myselfMember)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably is enough to use the cluster.down(node:) API, so then we don't need the task and async await dance 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

amended 3efd08f


self.settings.plugins.stopAll(self)
Expand Down
41 changes: 41 additions & 0 deletions Sources/DistributedActorsTestKit/ActorTestKit.swift
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,47 @@ extension ActorTestKit {
}
throw error
}

/// Executes the passed-in block numerous times, until the expected value is obtained or the `within` time limit expires,
/// in which case an `EventuallyError` is thrown, along with the last encountered error thrown by the block.
///
/// `eventually` is designed to be used with the `expectX` functions on `ActorTestProbe`.
///
/// **CAUTION**: Using `shouldX` matchers in an `eventually` block will fail the test on the first failure.
///
// TODO: does not handle blocking longer than `within` well
// TODO: should use default `within` from TestKit
@discardableResult
public func eventually<T>(
    within duration: Duration, interval: Duration = .milliseconds(100),
    file: StaticString = #file, line: UInt = #line, column: UInt = #column,
    _ block: () async throws -> T
) async throws -> T {
    let callSite = CallSiteInfo(file: file, line: line, column: column, function: #function)
    let deadline = ContinuousClock.Instant.fromNow(duration)

    var lastError: Error?
    var polledTimes = 0

    ActorTestKit.enterRepeatableContext()
    while deadline.hasTimeLeft() {
        do {
            polledTimes += 1
            let res = try await block()
            // Fix: balance enterRepeatableContext() on the success path as well;
            // previously the early return leaked the repeatable context.
            ActorTestKit.leaveRepeatableContext()
            return res
        } catch {
            lastError = error
            // Suspend instead of blocking: `usleep` would stall a cooperative-pool
            // thread inside this async function. Sleep errors (task cancellation)
            // are not fatal here; we detect cancellation and stop polling below.
            try? await Task.sleep(nanoseconds: UInt64(interval.microseconds) * 1_000)
            if Task.isCancelled {
                break
            }
        }
    }
    // Leave before checking isInRepeatableContext() below: only an *outer*
    // (nested) eventually-context should suppress the XCTFail reporting.
    ActorTestKit.leaveRepeatableContext()

    let error = EventuallyError(callSite, duration, polledTimes, lastError: lastError)
    if !ActorTestKit.isInRepeatableContext() {
        XCTFail("\(error)", file: callSite.file, line: callSite.line)
    }
    throw error
}
}

/// Thrown by `ActorTestKit.eventually` when the encapsulated assertion fails enough times that the eventually rethrows it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,48 +126,48 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
node: ClusterSystem, with other: ClusterSystem,
ensureWithin: Duration? = nil, ensureMembers maybeExpectedStatus: Cluster.MemberStatus? = nil,
file: StaticString = #file, line: UInt = #line
) throws {
) async throws {
node.cluster.join(node: other.cluster.uniqueNode.node)

try assertAssociated(node, withAtLeast: other.settings.uniqueBindNode)
try assertAssociated(other, withAtLeast: node.settings.uniqueBindNode)

if let expectedStatus = maybeExpectedStatus {
if let specificTimeout = ensureWithin {
try self.ensureNodes(expectedStatus, on: node, within: specificTimeout, nodes: other.cluster.uniqueNode, file: file, line: line)
try await self.ensureNodes(expectedStatus, on: node, within: specificTimeout, nodes: other.cluster.uniqueNode, file: file, line: line)
} else {
try self.ensureNodes(expectedStatus, on: node, nodes: other.cluster.uniqueNode, file: file, line: line)
try await self.ensureNodes(expectedStatus, on: node, nodes: other.cluster.uniqueNode, file: file, line: line)
}
}
}

public func ensureNodes(
_ status: Cluster.MemberStatus, on system: ClusterSystem? = nil, within: Duration = .seconds(20), nodes: UniqueNode...,
file: StaticString = #file, line: UInt = #line
) throws {
try self.ensureNodes(status, on: system, within: within, nodes: nodes, file: file, line: line)
) async throws {
try await self.ensureNodes(status, on: system, within: within, nodes: nodes, file: file, line: line)
}

public func ensureNodes(
atLeast status: Cluster.MemberStatus, on system: ClusterSystem? = nil, within: Duration = .seconds(20), nodes: UniqueNode...,
file: StaticString = #file, line: UInt = #line
) throws {
try self.ensureNodes(atLeast: status, on: system, within: within, nodes: nodes, file: file, line: line)
) async throws {
try await self.ensureNodes(atLeast: status, on: system, within: within, nodes: nodes, file: file, line: line)
}

public func ensureNodes(
_ status: Cluster.MemberStatus, on system: ClusterSystem? = nil, within: Duration = .seconds(20), nodes: [UniqueNode],
file: StaticString = #file, line: UInt = #line
) throws {
) async throws {
guard let onSystem = system ?? self._nodes.first(where: { !$0.isShuttingDown }) else {
fatalError("Must at least have 1 system present to use [\(#function)]")
}

try self.testKit(onSystem).eventually(within: within, file: file, line: line) {
try await self.testKit(onSystem).eventually(within: within, file: file, line: line) {
do {
// all members on onMember should have reached this status (e.g. up)
for node in nodes {
try self.assertMemberStatus(on: onSystem, node: node, is: status, file: file, line: line)
try await self.assertMemberStatus(on: onSystem, node: node, is: status, file: file, line: line)
}
} catch {
throw error
Expand All @@ -178,16 +178,16 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
public func ensureNodes(
atLeast status: Cluster.MemberStatus, on system: ClusterSystem? = nil, within: Duration = .seconds(20), nodes: [UniqueNode],
file: StaticString = #file, line: UInt = #line
) throws {
) async throws {
guard let onSystem = system ?? self._nodes.first(where: { !$0.isShuttingDown }) else {
fatalError("Must at least have 1 system present to use [\(#function)]")
}

try self.testKit(onSystem).eventually(within: within, file: file, line: line) {
try await self.testKit(onSystem).eventually(within: within, file: file, line: line) {
do {
// all members on onMember should have reached this status (e.g. up)
for node in nodes {
_ = try self.assertMemberStatus(on: onSystem, node: node, atLeast: status, file: file, line: line)
_ = try await self.assertMemberStatus(on: onSystem, node: node, atLeast: status, file: file, line: line)
}
} catch {
throw error
Expand Down Expand Up @@ -361,9 +361,9 @@ extension ClusteredActorSystemsXCTestCase {
on system: ClusterSystem, node: UniqueNode,
is expectedStatus: Cluster.MemberStatus,
file: StaticString = #file, line: UInt = #line
) throws {
) async throws {
let testKit = self.testKit(system)
let membership = system.cluster.membershipSnapshot
let membership = try await system.cluster.membershipSnapshot
guard let foundMember = membership.uniqueMember(node) else {
throw testKit.error("Expected [\(system.cluster.uniqueNode)] to know about [\(node)] member", file: file, line: line)
}
Expand All @@ -384,9 +384,9 @@ extension ClusteredActorSystemsXCTestCase {
on system: ClusterSystem, node: UniqueNode,
atLeast expectedAtLeastStatus: Cluster.MemberStatus,
file: StaticString = #file, line: UInt = #line
) throws -> Cluster.MemberStatus? {
) async throws -> Cluster.MemberStatus? {
let testKit = self.testKit(system)
let membership = system.cluster.membershipSnapshot
let membership = try await system.cluster.membershipSnapshot
guard let foundMember = membership.uniqueMember(node) else {
if expectedAtLeastStatus == .down || expectedAtLeastStatus == .removed {
// so we're seeing an already removed member, this can indeed happen and is okey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ final class ClusterAssociationTests: ClusteredActorSystemsXCTestCase {
alone.cluster.join(node: alone.cluster.uniqueNode.node) // "self join", should simply be ignored

let testKit = self.testKit(alone)
try testKit.eventually(within: .seconds(3)) {
let snapshot: Cluster.Membership = alone.cluster.membershipSnapshot
try await testKit.eventually(within: .seconds(3)) {
let snapshot: Cluster.Membership = try await alone.cluster.membershipSnapshot
if snapshot.count != 1 {
throw TestError("Expected membership to include self node, was: \(snapshot)")
}
Expand Down Expand Up @@ -274,8 +274,8 @@ final class ClusterAssociationTests: ClusteredActorSystemsXCTestCase {
first.cluster.down(node: first.cluster.uniqueNode.node)

let testKit = self.testKit(first)
try testKit.eventually(within: .seconds(3)) {
let snapshot: Cluster.Membership = first.cluster.membershipSnapshot
try await testKit.eventually(within: .seconds(3)) {
let snapshot: Cluster.Membership = try await first.cluster.membershipSnapshot
if let selfMember = snapshot.uniqueMember(first.cluster.uniqueNode) {
if selfMember.status == .down {
() // good
Expand Down Expand Up @@ -372,15 +372,15 @@ final class ClusterAssociationTests: ClusteredActorSystemsXCTestCase {
let secondProbe = self.testKit(second).makeTestProbe(expecting: Cluster.Membership.self)

// when we down `first` on `first`, it should become down there:
try self.testKit(first).eventually(within: .seconds(3)) {
try await self.testKit(first).eventually(within: .seconds(3)) {
first.cluster.ref.tell(.query(.currentMembership(firstProbe.ref)))
let firstMembership = try firstProbe.expectMessage()

guard let selfMember = firstMembership.uniqueMember(first.cluster.uniqueNode) else {
throw self.testKit(second).error("No self member in membership! Wanted: \(first.cluster.uniqueNode)", line: #line - 1)
}

try self.assertMemberStatus(on: first, node: first.cluster.uniqueNode, is: .down)
try await self.assertMemberStatus(on: first, node: first.cluster.uniqueNode, is: .down)
guard selfMember.status == .down else {
throw self.testKit(first).error("Wanted self member to be DOWN, but was: \(selfMember)", line: #line - 1)
}
Expand Down
Loading