Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,12 @@ internal distributed actor DowningStrategyShell {
self.memberTimerTasks[member] = Task {
defer { self.memberTimerTasks.removeValue(forKey: member) }

try await Task.sleep(until: .now + delay, clock: .continuous)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah nice thx

guard !Task.isCancelled else {
return
}

try await Task.sleep(until: .now + delay, clock: .continuous)
self.onTimeout(member: member)
}
case .cancelTimer(let member):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import Logging
// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Cluster singleton boss

internal protocol _ClusterSingletonBoss {
internal protocol ClusterSingletonBossProtocol {
func stop()
}

Expand All @@ -37,7 +37,7 @@ internal protocol _ClusterSingletonBoss {
/// determine the node that the singleton runs on. If the singleton falls on *this* node, `ClusterSingletonBoss`
/// will spawn the actual singleton actor. Otherwise, `ClusterSingletonBoss` will hand over the singleton
/// whenever the node changes.
internal distributed actor ClusterSingletonBoss<Act: DistributedActor>: _ClusterSingletonBoss where Act.ActorSystem == ClusterSystem {
internal distributed actor ClusterSingletonBoss<Act: ClusterSingletonProtocol>: ClusterSingletonBossProtocol where Act.ActorSystem == ClusterSystem {
Copy link
Member

@ktoso ktoso Jul 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(love the boss naming, makes me smile each time lol) Enough manager objects already! 😆

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3

typealias ActorSystem = ClusterSystem
typealias CallID = UUID

Expand All @@ -46,18 +46,20 @@ internal distributed actor ClusterSingletonBoss<Act: DistributedActor>: _Cluster
/// The strategy that determines which node the singleton will be allocated.
private let allocationStrategy: ClusterSingletonAllocationStrategy

// FIXME: not needed?
let singletonProps: _Props?

/// If `nil`, then this instance will be proxy-only and it will never run the actual actor.
let singletonFactory: ((ClusterSystem) async throws -> Act)?

/// The node that the singleton runs on
private var targetNode: UniqueNode?

/// The concrete distributed actor instance (the "singleton") if this node is indeed hosting it,
/// or nil otherwise - meaning that the singleton instance is actually located on another member.
private var singleton: Act?
/// Keeps track of singleton allocation status
private let allocationTracker: AllocationTracker

private var singleton: Act? {
get async {
await self.allocationTracker.singleton
}
}

/// Remote call "buffer" in case `singleton` is `nil`
private var remoteCallContinuations: [(CallID, CheckedContinuation<Act, Never>)] = []
Expand All @@ -70,14 +72,13 @@ internal distributed actor ClusterSingletonBoss<Act: DistributedActor>: _Cluster
init(
settings: ClusterSingletonSettings,
system: ActorSystem,
singletonProps: _Props?,
_ singletonFactory: ((ClusterSystem) async throws -> Act)?
) async throws {
self.actorSystem = system
self.settings = settings
self.allocationStrategy = settings.allocationStrategy.makeAllocationStrategy(system.settings, settings)
self.singletonProps = singletonProps
self.singletonFactory = singletonFactory
self.allocationTracker = await AllocationTracker(timeout: settings.allocationTimeout)

if system.settings.enabled {
self.clusterEventsSubscribeTask = Task {
Expand Down Expand Up @@ -109,7 +110,8 @@ internal distributed actor ClusterSingletonBoss<Act: DistributedActor>: _Cluster

private func updateTargetNode(node: UniqueNode?) async throws {
guard self.targetNode != node else {
self.log.debug("Skip updating target node. New node is already the same as current targetNode.", metadata: self.metadata())
let metadata = await self.metadata()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh wow this is bad, definitely not async logger metadata access. (Why it's bad: this will always render all the metadata, AND causes suspensions even if we're not even logging -- the logging must be done inside the debug(...) call because it is lazily evaluated (metadata is an auto closure)

It's because the move of the singleton in to the other actor -- if let singleton = await self.singleton {
but why did we do this in a new actor to begin with...? This actor should be able to do everything necessary?

Copy link
Member Author

@yim-lee yim-lee Jul 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

over-engineered it. wanted to group the logic but got carried away. amended 3b882d6.

self.log.debug("Skip updating target node. New node is already the same as current targetNode.", metadata: metadata)
return
}

Expand All @@ -129,7 +131,7 @@ internal distributed actor ClusterSingletonBoss<Act: DistributedActor>: _Cluster
}

// Update `singleton` regardless
try self.updateSingleton(node: node)
try await self.updateSingleton(node: node)
}
}

Expand All @@ -138,41 +140,43 @@ internal distributed actor ClusterSingletonBoss<Act: DistributedActor>: _Cluster
preconditionFailure("Cluster singleton [\(self.settings.name)] cannot run on this node. Please review AllocationStrategySettings and/or cluster singleton usage.")
}

self.log.debug("Take over singleton [\(self.settings.name)] from [\(String(describing: from))]", metadata: self.metadata())
let metadata = await self.metadata()
self.log.debug("Take over singleton [\(self.settings.name)] from [\(String(describing: from))]", metadata: metadata)

let props = self.singletonProps ?? _Props()
// TODO: (optimization) tell `from` node that this node is taking over (https://github.com/apple/swift-distributed-actors/issues/329)
let singleton = try await _Props.$forSpawn.withValue(props.singleton(settings: self.settings)) {
let singleton = try await _Props.$forSpawn.withValue(_Props.singleton(settings: self.settings)) {
try await singletonFactory(self.actorSystem)
}
self.singleton = singleton
self.updateSingleton(singleton)
await self.updateSingleton(singleton)
}

private func handOver(to: UniqueNode?) async throws {
self.log.debug("Hand over singleton [\(self.settings.name)] to [\(String(describing: to))]", metadata: self.metadata())
let metadata = await self.metadata()
self.log.debug("Hand over singleton [\(self.settings.name)] to [\(String(describing: to))]", metadata: metadata)

// TODO: (optimization) tell `to` node that this node is handing off (https://github.com/apple/swift-distributed-actors/issues/329)
self.singleton = nil
await self.updateSingleton(nil)
}

private func updateSingleton(node: UniqueNode?) throws {
private func updateSingleton(node: UniqueNode?) async throws {
switch node {
case .some(let node) where node == self.actorSystem.cluster.uniqueNode:
()
case .some(let node):
self.singleton = try Act.resolve(id: .singleton(Act.self, settings: self.settings, remote: node), using: self.actorSystem)
let singleton = try Act.resolve(id: .singleton(Act.self, settings: self.settings, remote: node), using: self.actorSystem)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just FYI, That's one of the places I have new impl based on well-known metadata for :)

await self.updateSingleton(singleton)
case .none:
self.singleton = nil
await self.updateSingleton(nil)
}
}

private func updateSingleton(_ newAct: Act?) {
self.log.debug("Update singleton from [\(String(describing: self.singleton))] to [\(String(describing: newAct))], flushing \(self.remoteCallContinuations.count) remote calls")
self.singleton = newAct
private func updateSingleton(_ newSingleton: Act?) async {
let currentSingleton = await self.singleton
self.log.debug("Update singleton from [\(String(describing: currentSingleton))] to [\(String(describing: newSingleton))], with \(self.remoteCallContinuations.count) remote calls pending")
await self.allocationTracker.updateSingleton(newSingleton)

// Unstash messages if we have the singleton
guard let singleton = self.singleton else {
guard let singleton = newSingleton else {
return
}

Expand All @@ -191,8 +195,9 @@ internal distributed actor ClusterSingletonBoss<Act: DistributedActor>: _Cluster
where Err: Error,
Res: Codable
{
let singleton = await self.findSingleton()
self.log.trace("Forwarding invocation [\(invocation)] to [\(singleton)]", metadata: self.metadata())
let singleton = try await self.findSingleton()
let metadata = await self.metadata()
self.log.trace("Forwarding invocation [\(invocation)] to [\(singleton)]", metadata: metadata)

var invocation = invocation // FIXME: should be inout param
return try await singleton.actorSystem.remoteCall(
Expand All @@ -209,8 +214,9 @@ internal distributed actor ClusterSingletonBoss<Act: DistributedActor>: _Cluster
invocation: ActorSystem.InvocationEncoder,
throwing: Err.Type
) async throws where Err: Error {
let singleton = await self.findSingleton()
self.log.trace("Forwarding invocation [\(invocation)] to [\(singleton)]", metadata: self.metadata())
let singleton = try await self.findSingleton()
let metadata = await self.metadata()
self.log.trace("Forwarding invocation [\(invocation)] to [\(singleton)]", metadata: metadata)

var invocation = invocation // FIXME: should be inout param
return try await singleton.actorSystem.remoteCallVoid(
Expand All @@ -221,13 +227,18 @@ internal distributed actor ClusterSingletonBoss<Act: DistributedActor>: _Cluster
)
}

private func findSingleton() async -> Act {
await withCheckedContinuation { continuation in
// If singleton is available, forward remote call to it.
if let singleton = self.singleton {
continuation.resume(returning: singleton)
return
}
private func findSingleton() async throws -> Act {
let allocationStatus = await self.allocationTracker.status
guard allocationStatus != .timedOut else {
throw ClusterSingletonError.allocationTimeout
}

// If singleton is available, forward remote call to it right away.
if let singleton = await self.singleton {
return singleton
}

return await withCheckedContinuation { continuation in
// Otherwise, we "stash" the remote call until singleton becomes available.
Task {
let callID = UUID()
Expand Down Expand Up @@ -271,21 +282,85 @@ internal distributed actor ClusterSingletonBoss<Act: DistributedActor>: _Cluster
}
}
}

actor AllocationTracker {
/// The concrete distributed actor instance (the "singleton") if this node is indeed hosting it,
/// or nil otherwise - meaning that the singleton instance is actually located on another member.
var singleton: Act? {
didSet {
switch self.singleton {
case .some:
self.status = .allocated
self.timeoutTask?.cancel()
self.timeoutTask = nil
case .none:
self.status = .pending
if self.timeoutTask == nil {
Task {
await self.startTimeoutTask()
}
}
}
}
}

var status: Status = .pending

var timeoutTask: Task<Void, Error>?
let timeout: Duration

init(timeout: Duration) async {
self.timeout = timeout
self.singleton = nil
self.status = .pending
await self.startTimeoutTask()
}

func updateSingleton(_ singleton: Act?) async {
self.singleton = singleton
}

private func startTimeoutTask() async {
self.timeoutTask = Task {
try await Task.sleep(until: .now + self.timeout, clock: .continuous)

guard !Task.isCancelled else {
return
}

self.onTimeout()
}
}

private func onTimeout() {
self.status = .timedOut
}

enum Status {
case allocated
case pending
case timedOut
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Basically our new "async" stash replacement, looking good


enum ClusterSingletonError: Error, Codable {
case allocationTimeout
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Logging

extension ClusterSingletonBoss {
func metadata() -> Logger.Metadata {
func metadata() async -> Logger.Metadata {
var metadata: Logger.Metadata = [
"tag": "singleton",
"singleton/name": "\(self.settings.name)",
"singleton/buffer": "\(self.remoteCallContinuations.count)/\(self.settings.bufferCapacity)",
]

metadata["targetNode"] = "\(String(describing: self.targetNode?.debugDescription))"
if let singleton = self.singleton {
if let singleton = await self.singleton {
metadata["singleton"] = "\(singleton.id)"
}

Expand All @@ -302,7 +377,7 @@ extension ActorID {
settings: ClusterSingletonSettings,
remote node: UniqueNode
) throws -> ActorID
where Act: DistributedActor,
where Act: ClusterSingletonProtocol,
Act.ActorSystem == ClusterSystem
{
var id = ActorID(remote: node, type: type, incarnation: .wellKnown)
Expand Down Expand Up @@ -332,7 +407,7 @@ extension ClusterSingletonSettings {
// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Remote call interceptor

struct ClusterSingletonRemoteCallInterceptor<Singleton: DistributedActor>: RemoteCallInterceptor where Singleton.ActorSystem == ClusterSystem {
struct ClusterSingletonRemoteCallInterceptor<Singleton: ClusterSingletonProtocol>: RemoteCallInterceptor where Singleton.ActorSystem == ClusterSystem {
let system: ClusterSystem
let singletonBoss: ClusterSingletonBoss<Singleton>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public protocol ClusterSingletonProtocol: DistributedActor where ActorSystem ==
/// - SeeAlso: The `ClusterSingleton` mechanism is conceptually similar to Erlang/OTP's <a href="http://erlang.org/doc/design_principles/distributed_applications.html">`DistributedApplication`</a>,
/// and <a href="https://doc.akka.io/docs/akka/current/cluster-singleton.html">`ClusterSingleton` in Akka</a>.
public actor ClusterSingletonPlugin {
private var singletons: [String: (proxyID: ActorID, boss: any _ClusterSingletonBoss)] = [:]
private var singletons: [String: (proxyID: ActorID, boss: any ClusterSingletonBossProtocol)] = [:]

private var system: ClusterSystem!

Expand All @@ -50,7 +50,7 @@ public actor ClusterSingletonPlugin {
settings: ClusterSingletonSettings,
makeInstance factory: ((ClusterSystem) async throws -> Act)? = nil
) async throws -> Act
where Act: DistributedActor,
where Act: ClusterSingletonProtocol,
Act.ActorSystem == ClusterSystem
{
let known = self.singletons[settings.name]
Expand All @@ -62,7 +62,6 @@ public actor ClusterSingletonPlugin {
let boss = try await ClusterSingletonBoss(
settings: settings,
system: self.system,
singletonProps: .init(),
factory
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public struct ClusterSingletonSettings {
/// Controls allocation of the node on which the singleton runs.
public var allocationStrategy: AllocationStrategySettings = .byLeadership

/// Time to wait for the singleton, whether allocated on this node or another, before
/// we stop stashing calls and throw error.
public var allocationTimeout: Duration = .seconds(30)

public init(name: String) {
self.name = name
}
Expand Down