Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,8 @@ final class DistributedDiningPhilosophers {
systemC.cluster.join(node: systemB.settings.node)

print("waiting for cluster to form...")
while !(
systemA.cluster.membershipSnapshot.count(withStatus: .up) == systems.count &&
systemB.cluster.membershipSnapshot.count(withStatus: .up) == systems.count &&
systemC.cluster.membershipSnapshot.count(withStatus: .up) == systems.count)
{
// TODO: implement this using "await join cluster" API [#948](https://github.com/apple/swift-distributed-actors/issues/948)
while !(try await self.isClusterFormed(systems)) {
let nanosInSecond: UInt64 = 1_000_000_000
try await Task.sleep(nanoseconds: 1 * nanosInSecond)
}
Expand Down Expand Up @@ -76,4 +73,14 @@ final class DistributedDiningPhilosophers {

try systemA.park(atMost: duration)
}

/// Checks whether every one of the passed in systems already sees the fully formed cluster.
///
/// A system is considered to see the formed cluster when the number of `.up` members in its
/// membership snapshot equals the number of systems passed in.
///
/// - Parameter systems: the systems that are expected to make up the cluster.
/// - Returns: `true` when each system reports exactly `systems.count` members with `.up` status,
///   `false` as soon as one of them reports a different count.
private func isClusterFormed(_ systems: [ClusterSystem]) async throws -> Bool {
    let expectedUpCount = systems.count
    for member in systems {
        let snapshot = try await member.cluster.membershipSnapshot
        guard snapshot.count(withStatus: .up) == expectedUpCount else {
            return false
        }
    }
    return true
}
}
34 changes: 22 additions & 12 deletions Sources/DistributedActors/Cluster/ClusterControl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//
//===----------------------------------------------------------------------===//

import Distributed
import DistributedActorsConcurrencyHelpers
import Logging
import NIO
Expand Down Expand Up @@ -42,24 +43,32 @@ public struct ClusterControl {
/// of obtaining the information to act on rather than mixing the two. Use events if state transitions should trigger
/// something, and use the snapshot for ad-hoc "one time" membership inspections.
public var membershipSnapshot: Cluster.Membership {
self.membershipSnapshotLock.lock()
defer { self.membershipSnapshotLock.unlock() }
return self._membershipSnapshotHolder.membership
get async {
await self._membershipSnapshotHolder.membership
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sweet :)


internal func updateMembershipSnapshot(_ snapshot: Cluster.Membership) {
self.membershipSnapshotLock.lock()
defer { self.membershipSnapshotLock.unlock() }
self._membershipSnapshotHolder.membership = snapshot
Task {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about usages of Task.

await self._membershipSnapshotHolder.update(snapshot)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's probably fine to be honest

}

private let membershipSnapshotLock: Lock
private let _membershipSnapshotHolder: MembershipHolder
private class MembershipHolder {
private actor MembershipHolder {
var membership: Cluster.Membership

init(membership: Cluster.Membership) {
self.membership = membership
}

func update(_ membership: Cluster.Membership) {
self.membership = membership
}

func join(_ node: UniqueNode) {
_ = self.membership.join(node)
}
}

internal let ref: ClusterShell.Ref
Expand All @@ -69,10 +78,11 @@ public struct ClusterControl {
self.ref = clusterRef
self.events = eventStream

let membershipSnapshotLock = Lock()
self.membershipSnapshotLock = membershipSnapshotLock
self._membershipSnapshotHolder = MembershipHolder(membership: .empty)
_ = self._membershipSnapshotHolder.membership.join(settings.uniqueBindNode)
let membershipHolder = ClusterControl.MembershipHolder(membership: .empty)
self._membershipSnapshotHolder = membershipHolder
Task {
await membershipHolder.join(settings.uniqueBindNode)
}
}

/// The node value representing _this_ node in the cluster.
Expand Down
6 changes: 2 additions & 4 deletions Sources/DistributedActors/ClusterSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,8 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {

self.shutdownSemaphore.wait()

/// Down this member as part of shutting down; it may have enough time to notify other nodes on an best effort basis.
if let myselfMember = self.cluster.membershipSnapshot.uniqueMember(self.cluster.uniqueNode) {
self.cluster.down(member: myselfMember)
}
/// Down this node as part of shutting down; it may have enough time to notify other nodes on a best-effort basis.
self.cluster.down(node: self.settings.node)

self.settings.plugins.stopAll(self)

Expand Down
41 changes: 41 additions & 0 deletions Sources/DistributedActorsTestKit/ActorTestKit.swift
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,47 @@ extension ActorTestKit {
}
throw error
}

// TODO: does not handle blocking longer than `within` well
// TODO: should use default `within` from TestKit
/// Executes the passed in block repeatedly, until it returns a value or the `within` time limit expires,
/// in which case an `EventuallyError` is thrown, along with the last encountered error thrown by block.
///
/// `eventually` is designed to be used with the `expectX` functions on `ActorTestProbe`.
///
/// **CAUTION**: Using `shouldX` matchers in an `eventually` block will fail the test on the first failure.
///
/// - Parameters:
///   - duration: total amount of time during which the block may be retried.
///   - interval: pause between a failed attempt and the next one.
///   - file: call site file, used when reporting the failure.
///   - line: call site line, used when reporting the failure.
///   - column: call site column, used when reporting the failure.
///   - block: the operation to retry; any error it throws counts as a failed attempt.
/// - Returns: the first value successfully returned by `block`.
/// - Throws: `EventuallyError` when no attempt succeeded before the deadline, or the error thrown by
///   `Task.sleep` if the surrounding task is cancelled while waiting between attempts.
@discardableResult
public func eventually<T>(
    within duration: Duration, interval: Duration = .milliseconds(100),
    file: StaticString = #file, line: UInt = #line, column: UInt = #column,
    _ block: () async throws -> T
) async throws -> T {
    let callSite = CallSiteInfo(file: file, line: line, column: column, function: #function)
    let deadline = ContinuousClock.Instant.fromNow(duration)

    var lastError: Error?
    var polledTimes = 0

    do {
        ActorTestKit.enterRepeatableContext()
        // Leave the repeatable context on every exit from this scope: success, timeout, or a thrown sleep error.
        defer { ActorTestKit.leaveRepeatableContext() }

        while deadline.hasTimeLeft() {
            do {
                polledTimes += 1
                return try await block()
            } catch {
                lastError = error
                // Suspend instead of blocking the thread with `usleep`, so other tasks can make progress.
                try await Task.sleep(nanoseconds: UInt64(interval.microseconds) * 1_000)
            }
        }
    }

    let error = EventuallyError(callSite, duration, polledTimes, lastError: lastError)
    if !ActorTestKit.isInRepeatableContext() {
        XCTFail("\(error)", file: callSite.file, line: callSite.line)
    }
    throw error
}
}

/// Thrown by `ActorTestKit.eventually` when the encapsulated assertion fails enough times that the eventually rethrows it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,48 +126,48 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
node: ClusterSystem, with other: ClusterSystem,
ensureWithin: Duration? = nil, ensureMembers maybeExpectedStatus: Cluster.MemberStatus? = nil,
file: StaticString = #file, line: UInt = #line
) throws {
) async throws {
node.cluster.join(node: other.cluster.uniqueNode.node)

try assertAssociated(node, withAtLeast: other.settings.uniqueBindNode)
try assertAssociated(other, withAtLeast: node.settings.uniqueBindNode)

if let expectedStatus = maybeExpectedStatus {
if let specificTimeout = ensureWithin {
try self.ensureNodes(expectedStatus, on: node, within: specificTimeout, nodes: other.cluster.uniqueNode, file: file, line: line)
try await self.ensureNodes(expectedStatus, on: node, within: specificTimeout, nodes: other.cluster.uniqueNode, file: file, line: line)
} else {
try self.ensureNodes(expectedStatus, on: node, nodes: other.cluster.uniqueNode, file: file, line: line)
try await self.ensureNodes(expectedStatus, on: node, nodes: other.cluster.uniqueNode, file: file, line: line)
}
}
}

public func ensureNodes(
_ status: Cluster.MemberStatus, on system: ClusterSystem? = nil, within: Duration = .seconds(20), nodes: UniqueNode...,
file: StaticString = #file, line: UInt = #line
) throws {
try self.ensureNodes(status, on: system, within: within, nodes: nodes, file: file, line: line)
) async throws {
try await self.ensureNodes(status, on: system, within: within, nodes: nodes, file: file, line: line)
}

public func ensureNodes(
atLeast status: Cluster.MemberStatus, on system: ClusterSystem? = nil, within: Duration = .seconds(20), nodes: UniqueNode...,
file: StaticString = #file, line: UInt = #line
) throws {
try self.ensureNodes(atLeast: status, on: system, within: within, nodes: nodes, file: file, line: line)
) async throws {
try await self.ensureNodes(atLeast: status, on: system, within: within, nodes: nodes, file: file, line: line)
}

public func ensureNodes(
_ status: Cluster.MemberStatus, on system: ClusterSystem? = nil, within: Duration = .seconds(20), nodes: [UniqueNode],
file: StaticString = #file, line: UInt = #line
) throws {
) async throws {
guard let onSystem = system ?? self._nodes.first(where: { !$0.isShuttingDown }) else {
fatalError("Must at least have 1 system present to use [\(#function)]")
}

try self.testKit(onSystem).eventually(within: within, file: file, line: line) {
try await self.testKit(onSystem).eventually(within: within, file: file, line: line) {
do {
// all given nodes, as seen by onSystem, should have reached this status (e.g. up)
for node in nodes {
try self.assertMemberStatus(on: onSystem, node: node, is: status, file: file, line: line)
try await self.assertMemberStatus(on: onSystem, node: node, is: status, file: file, line: line)
}
} catch {
throw error
Expand All @@ -178,16 +178,16 @@ open class ClusteredActorSystemsXCTestCase: XCTestCase {
public func ensureNodes(
atLeast status: Cluster.MemberStatus, on system: ClusterSystem? = nil, within: Duration = .seconds(20), nodes: [UniqueNode],
file: StaticString = #file, line: UInt = #line
) throws {
) async throws {
guard let onSystem = system ?? self._nodes.first(where: { !$0.isShuttingDown }) else {
fatalError("Must at least have 1 system present to use [\(#function)]")
}

try self.testKit(onSystem).eventually(within: within, file: file, line: line) {
try await self.testKit(onSystem).eventually(within: within, file: file, line: line) {
do {
// all given nodes, as seen by onSystem, should have reached at least this status (e.g. up)
for node in nodes {
_ = try self.assertMemberStatus(on: onSystem, node: node, atLeast: status, file: file, line: line)
_ = try await self.assertMemberStatus(on: onSystem, node: node, atLeast: status, file: file, line: line)
}
} catch {
throw error
Expand Down Expand Up @@ -361,9 +361,9 @@ extension ClusteredActorSystemsXCTestCase {
on system: ClusterSystem, node: UniqueNode,
is expectedStatus: Cluster.MemberStatus,
file: StaticString = #file, line: UInt = #line
) throws {
) async throws {
let testKit = self.testKit(system)
let membership = system.cluster.membershipSnapshot
let membership = await system.cluster.membershipSnapshot
guard let foundMember = membership.uniqueMember(node) else {
throw testKit.error("Expected [\(system.cluster.uniqueNode)] to know about [\(node)] member", file: file, line: line)
}
Expand All @@ -384,9 +384,9 @@ extension ClusteredActorSystemsXCTestCase {
on system: ClusterSystem, node: UniqueNode,
atLeast expectedAtLeastStatus: Cluster.MemberStatus,
file: StaticString = #file, line: UInt = #line
) throws -> Cluster.MemberStatus? {
) async throws -> Cluster.MemberStatus? {
let testKit = self.testKit(system)
let membership = system.cluster.membershipSnapshot
let membership = await system.cluster.membershipSnapshot
guard let foundMember = membership.uniqueMember(node) else {
if expectedAtLeastStatus == .down || expectedAtLeastStatus == .removed {
// so we're seeing an already removed member, this can indeed happen and is okay
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ final class ClusterAssociationTests: ClusteredActorSystemsXCTestCase {
alone.cluster.join(node: alone.cluster.uniqueNode.node) // "self join", should simply be ignored

let testKit = self.testKit(alone)
try testKit.eventually(within: .seconds(3)) {
let snapshot: Cluster.Membership = alone.cluster.membershipSnapshot
try await testKit.eventually(within: .seconds(3)) {
let snapshot: Cluster.Membership = await alone.cluster.membershipSnapshot
if snapshot.count != 1 {
throw TestError("Expected membership to include self node, was: \(snapshot)")
}
Expand Down Expand Up @@ -274,8 +274,8 @@ final class ClusterAssociationTests: ClusteredActorSystemsXCTestCase {
first.cluster.down(node: first.cluster.uniqueNode.node)

let testKit = self.testKit(first)
try testKit.eventually(within: .seconds(3)) {
let snapshot: Cluster.Membership = first.cluster.membershipSnapshot
try await testKit.eventually(within: .seconds(3)) {
let snapshot: Cluster.Membership = await first.cluster.membershipSnapshot
if let selfMember = snapshot.uniqueMember(first.cluster.uniqueNode) {
if selfMember.status == .down {
() // good
Expand Down Expand Up @@ -372,15 +372,15 @@ final class ClusterAssociationTests: ClusteredActorSystemsXCTestCase {
let secondProbe = self.testKit(second).makeTestProbe(expecting: Cluster.Membership.self)

// when we down first on first, it should become down there:
try self.testKit(first).eventually(within: .seconds(3)) {
try await self.testKit(first).eventually(within: .seconds(3)) {
first.cluster.ref.tell(.query(.currentMembership(firstProbe.ref)))
let firstMembership = try firstProbe.expectMessage()

guard let selfMember = firstMembership.uniqueMember(first.cluster.uniqueNode) else {
throw self.testKit(second).error("No self member in membership! Wanted: \(first.cluster.uniqueNode)", line: #line - 1)
}

try self.assertMemberStatus(on: first, node: first.cluster.uniqueNode, is: .down)
try await self.assertMemberStatus(on: first, node: first.cluster.uniqueNode, is: .down)
guard selfMember.status == .down else {
throw self.testKit(first).error("Wanted self member to be DOWN, but was: \(selfMember)", line: #line - 1)
}
Expand Down
Loading