Implement ActorSingleton for DistributedActor#980
Conversation
Sources/DistributedActors/Serialization/Serialization+Manifest.swift
Outdated
Show resolved
Hide resolved
Sources/DistributedActors/Serialization/Serialization+Invocation.swift
Outdated
Show resolved
Hide resolved
| self.singletonsLock.withLock { | ||
| for (_, singleton) in self.singletons { | ||
| singleton.stop(system) | ||
| public nonisolated func stop(_ system: ClusterSystem) { |
There was a problem hiding this comment.
Arguably should probably become async instead?
There was a problem hiding this comment.
Yes, though TBH not sure how to handle async in ClusterSystem.shutdown.
|
Okey I think I have some work to be done here to make it nicer to implement! I'll follow up as soon as I'm out of compiler land! |
|
I'll check this out and see how to improve use of actor context and metadata now 👍 |
| // MARK: Cluster singleton boss | ||
|
|
||
| internal protocol ClusterSingletonProtocol { | ||
| internal protocol _ClusterSingletonBoss { |
There was a problem hiding this comment.
If internal no need to underscore it :)
The underscoring is to hide from docc docs, but internal types are not included in those docs anyway so we're good 👍
Love that we're running with the boss name, much more fun 👍
There was a problem hiding this comment.
Yeah I will rename it to ClusterSingletonBossProtocol. There was a name conflict. :)
| public protocol ClusterSingletonProtocol: DistributedActor where ActorSystem == ClusterSystem { | ||
| /// Must be implemented using a `@ActorID.Metadata(\.clusterSingletonID)` annotated property. | ||
| var singletonName: String { get } | ||
| } |
There was a problem hiding this comment.
yup right, we wont be using this after all
Sources/DistributedActors/Plugins/ClusterSingleton/ClusterSingleton.swift
Show resolved
Hide resolved
| private func updateTargetNode(node: UniqueNode?) async throws { | ||
| guard self.targetNode != node else { | ||
| self.log.debug("Skip updating target node. New node is already the same as current targetNode.", metadata: self.metadata()) | ||
| let metadata = await self.metadata() |
There was a problem hiding this comment.
oh wow this is bad, definitely not async logger metadata access. (Why it's bad: this will always render all the metadata, AND causes suspensions even if we're not even logging -- the logging must be done inside the debug(...) call because it is lazily evaluated (metadata is an auto closure)
It's because the move of the singleton in to the other actor -- if let singleton = await self.singleton {
but why did we do this in a new actor to begin with...? This actor should be able to do everything necessary?
There was a problem hiding this comment.
over-engineered it. wanted to group the logic but got carried away. amended 3b882d6.
|
Make this PR mergeable to prepare for #1001 |
| defer { self.memberTimerTasks.removeValue(forKey: member) } | ||
|
|
||
| try await Task.sleep(until: .now + delay, clock: .continuous) | ||
|
|
| await self.settings.plugins.stopAll(self) | ||
| pluginsSemaphore.signal() | ||
| } | ||
| pluginsSemaphore.wait() |
There was a problem hiding this comment.
another case of "we need send self settings.plugins.stopAll(self)" strikes again huh...
There was a problem hiding this comment.
Ah arguably we could make the shutdown async now 🤔
| /// determine the node that the singleton runs on. If the singleton falls on *this* node, `ClusterSingletonBoss` | ||
| /// will spawn the actual singleton actor. Otherwise, `ClusterSingletonBoss` will hand over the singleton | ||
| /// whenever the node changes. | ||
| internal distributed actor ClusterSingletonBoss<Act: ClusterSingletonProtocol>: ClusterSingletonBossProtocol where Act.ActorSystem == ClusterSystem { |
There was a problem hiding this comment.
(love the boss naming, makes me smile each time lol) Enough manager objects already! 😆
| } | ||
| } else { | ||
| // Run singleton on this node if clustering is not enabled | ||
| self.log.debug("Clustering not enabled. Taking over singleton.") |
| case .some(let node) where node == self.actorSystem.cluster.uniqueNode: | ||
| () | ||
| case .some(let node): | ||
| let singleton = try Act.resolve(id: .singleton(Act.self, settings: self.settings, remote: node), using: self.actorSystem) |
There was a problem hiding this comment.
Just FYI, That's one of the places I have new impl based on well-known metadata for :)
| } | ||
|
|
||
| nonisolated func stop() async { | ||
| Task { |
There was a problem hiding this comment.
Could do without this outer task I think, will try locally
There was a problem hiding this comment.
ah yes, because the method is async now.
| enum BufferError: Error { | ||
| case full | ||
| } | ||
| } |
There was a problem hiding this comment.
👍 Basically our new "async" stash replacement, looking good
| reply.shouldStartWith(prefix: "Hello Charlie!") | ||
|
|
||
| // singleton.ref (proxy-only) | ||
| let proxyRef = try await test.singleton.proxy(of: TheSingleton.self, name: TheSingleton.name) |
There was a problem hiding this comment.
Impl wise I actually realized that we don't handle this well, we might just do the host version for now.
ktoso
left a comment
There was a problem hiding this comment.
LGTM, looking good! I'll merge and rebase my changes onto this and get to finishing all the tests unlocking 👍
Resolves #824