Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,23 @@ let products: [PackageDescription.Product] = [
),
]

// This is a workaround since current published nightly docker images don't have the latest Swift availabilities yet
let platforms: [SupportedPlatform]?
#if os(Linux)
platforms = nil
#else
platforms = [
// we require the 'distributed actor' language and runtime feature:
.iOS(.v16),
.macOS(.v13),
.tvOS(.v16),
.watchOS(.v9),
]
#endif
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Workaround for the 5.7 nightly docker images being pre-WWDC (they lack the latest platform availabilities).


var package = Package(
name: "swift-distributed-actors",
platforms: [
// we require the 'distributed actor' language and runtime feature:
.iOS(.v16),
.macOS(.v13),
.tvOS(.v16),
.watchOS(.v9),
],
platforms: platforms,
products: products,

dependencies: dependencies,
Expand Down
2 changes: 1 addition & 1 deletion Sources/DistributedActors/ActorLogging.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ internal final class LoggingContext {
extension Logger {
/// Create a logger specific to this actor.
public init<Act: DistributedActor>(actor: Act) where Act.ActorSystem == ClusterSystem {
var log = Logger(label: "\(actor.id)")
var log = actor.actorSystem.settings.logging.baseLogger
log[metadataKey: "actor/path"] = "\(actor.id.path)"
log[metadataKey: "actor/id"] = "\(actor.id)"
self = log
Expand Down
4 changes: 2 additions & 2 deletions Sources/DistributedActors/ActorRefProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,15 @@ extension RemoteActorRefProvider {

return self.localProvider._resolve(context: context)
case .remote:
return self._resolveAsRemoteRef(context, remoteAddress: context.id)
return self._resolveAsRemoteRef(context, remoteAddress: context.id._asRemote)
}
}

public func _resolveUntyped(context: _ResolveContext<Never>) -> _AddressableActorRef {
if self.localNode == context.id.uniqueNode {
return self.localProvider._resolveUntyped(context: context)
} else {
return _AddressableActorRef(self._resolveAsRemoteRef(context, remoteAddress: context.id))
return _AddressableActorRef(self._resolveAsRemoteRef(context, remoteAddress: context.id._asRemote))
}
}

Expand Down
155 changes: 155 additions & 0 deletions Sources/DistributedActors/Cluster/DistributedNodeDeathWatcher.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Distributed
import Logging

/// Implements ``LifecycleWatch`` semantics in presence of ``Node`` failures.
///
/// Depends on a failure detector (e.g. SWIM) to actually detect a node failure, however once detected,
/// it handles notifying all _local_ actors which have watched at least one actor on the terminating node.
///
/// ### Implementation
/// In order to avoid every actor having to subscribe to cluster events and individually handle the relationship between those
/// and individually watched actors, the watcher handles subscribing for cluster events on behalf of actors which watch
/// other actors on remote nodes, and messages them upon a node becoming down.
///
/// Actor which is notified automatically when a remote actor is `context.watch()`-ed.
///
/// Allows manually mocking membership changes to trigger terminated notifications.
internal actor DistributedNodeDeathWatcher {
    // TODO(distributed): actually use this actor rather than the behavior

    typealias ActorSystem = ClusterSystem

    /// Logger for watcher diagnostics, taken from the owning actor system.
    private let log: Logger

    /// The node this watcher instance is running on.
    private let selfNode: UniqueNode

    /// Locally maintained view of cluster membership, updated from cluster events.
    private var membership: Cluster.Membership = .empty

    /// Members which have been `removed`
    // TODO: clear after a few days, or some max count of nodes, use sorted set for this
    private var nodeTombstones: Set<UniqueNode> = []

    /// Mapping between remote node, and actors which have watched some actors on given remote node.
    private var remoteWatchCallbacks: [UniqueNode: Set<WatcherAndCallback>] = [:]

    /// Long-running task consuming the system's cluster event stream; cancelled via `cancel()`.
    private var eventListenerTask: Task<Void, Error>?

    init(actorSystem: ActorSystem) async {
        // Assign directly rather than via a needless mutable local (`var log`),
        // the logger is never modified before being stored.
        self.log = actorSystem.log
        self.selfNode = actorSystem.cluster.uniqueNode

        // Subscribe to cluster events on behalf of all watchers, so individual
        // actors need not subscribe themselves.
        // NOTE(review): the closure calls actor-isolated methods without `await`;
        // presumably this relies on the Task inheriting this actor's isolation — confirm
        // under strict concurrency checking.
        let events = actorSystem.cluster.events
        self.eventListenerTask = Task {
            for try await event in events {
                switch event {
                case .membershipChange(let change):
                    self.membershipChanged(change)
                case .snapshot(let membership):
                    // Replay the snapshot as a series of changes from an empty membership,
                    // so snapshot and incremental updates share one code path.
                    let diff = Cluster.Membership._diff(from: .empty, to: membership)
                    for change in diff.changes {
                        self.membershipChanged(change)
                    }
                case .leadershipChange, .reachabilityChange:
                    break // ignore those, they don't affect downing
                }
            }
        }
    }

    /// Registers `watcher` to be notified (via `nodeTerminatedFn`) once `remoteNode` is declared down.
    ///
    /// If the node is already tombstoned, the callback fires immediately, as no further
    /// membership event for that node will ever arrive.
    func watchActor(
        on remoteNode: UniqueNode,
        by watcher: ClusterSystem.ActorID,
        whenTerminated nodeTerminatedFn: @escaping @Sendable (UniqueNode) async -> Void
    ) {
        guard !self.nodeTombstones.contains(remoteNode) else {
            // the system the watcher is attempting to watch has terminated before the watch has been processed,
            // thus we have to immediately reply with a termination system message, as otherwise it would never receive one
            Task {
                await nodeTerminatedFn(remoteNode)
            }
            return
        }

        let record = WatcherAndCallback(watcherID: watcher, callback: nodeTerminatedFn)
        self.remoteWatchCallbacks[remoteNode, default: []].insert(record)
    }

    /// Removes all watch registrations made by the actor with the given `id`.
    ///
    /// Relies on `WatcherAndCallback` equality/hashing being based on the watcher ID only,
    /// so a probe record with a no-op callback suffices for set removal.
    func removeWatcher(id: ClusterSystem.ActorID) {
        // TODO: this can be optimized a bit more I suppose, with a reverse lookup table
        let removeMe = WatcherAndCallback(watcherID: id, callback: { _ in () })
        for (node, var watcherAndCallbacks) in self.remoteWatchCallbacks {
            if watcherAndCallbacks.remove(removeMe) != nil {
                self.remoteWatchCallbacks[node] = watcherAndCallbacks
            }
        }
    }

    /// Drops the tombstone for `node`; after this, new watches on that node would register normally.
    func cleanupTombstone(node: UniqueNode) {
        _ = self.nodeTombstones.remove(node)
    }

    /// Applies a membership change to the local membership view, and if the member
    /// reached at least `.down`, triggers termination notifications for its watchers.
    func membershipChanged(_ change: Cluster.MembershipChange) {
        guard let change = self.membership.applyMembershipChange(change) else {
            return // no change, nothing to act on
        }

        // TODO: make sure we only handle ONCE?
        if change.status >= .down {
            // can be: down, leaving or removal.
            // on any of those we want to ensure we handle the "down"
            self.handleAddressDown(change)
        }
    }

    /// Notifies (asynchronously) every registered watcher of the terminated node,
    /// removes their registrations, and records a tombstone for the node.
    func handleAddressDown(_ change: Cluster.MembershipChange) {
        let terminatedNode = change.node

        if let watchers = self.remoteWatchCallbacks.removeValue(forKey: terminatedNode) {
            for watcher in watchers {
                Task {
                    await watcher.callback(terminatedNode)
                }
            }
        }

        // we need to keep a tombstone, so we can immediately reply with a terminated,
        // in case another watch was just in progress of being made
        self.nodeTombstones.insert(terminatedNode)
    }

    /// Stops consuming cluster events; safe to call multiple times.
    func cancel() {
        self.eventListenerTask?.cancel()
        self.eventListenerTask = nil
    }
}

extension DistributedNodeDeathWatcher {
    /// Pairs the ID of a local watcher with the callback to invoke once a watched
    /// remote node is declared terminated.
    ///
    /// Identity (equality and hashing) is derived from `watcherID` alone, which allows
    /// removing a record from a `Set` using a stand-in with any callback.
    struct WatcherAndCallback: Hashable {
        /// Address of the local watcher which had issued this watch
        let watcherID: ClusterSystem.ActorID

        /// Invoked when the watched remote node terminates.
        let callback: @Sendable (UniqueNode) async -> Void

        func hash(into hasher: inout Hasher) {
            self.watcherID.hash(into: &hasher)
        }

        static func == (left: WatcherAndCallback, right: WatcherAndCallback) -> Bool {
            left.watcherID == right.watcherID
        }
    }
}
22 changes: 22 additions & 0 deletions Sources/DistributedActors/Cluster/NodeDeathWatcher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,23 @@ internal final class NodeDeathWatcherInstance: NodeDeathWatcher {

func handleAddressDown(_ change: Cluster.MembershipChange) {
let terminatedNode = change.node

// ref
if let watchers = self.remoteWatchers.removeValue(forKey: terminatedNode) {
for ref in watchers {
// we notify each actor that was watching this remote address
ref._sendSystemMessage(.nodeTerminated(terminatedNode))
}
}

if let watchers = self.remoteWatchCallbacks.removeValue(forKey: terminatedNode) {
for watcher in watchers {
Task {
await watcher.callback(terminatedNode)
}
}
}

// we need to keep a tombstone, so we can immediately reply with a terminated,
// in case another watch was just in progress of being made
self.nodeTombstones.insert(terminatedNode)
Expand Down Expand Up @@ -186,8 +196,20 @@ enum NodeDeathWatcherShell {

context.system.cluster.events.subscribe(context.subReceive(Cluster.Event.self) { event in
switch event {
case .snapshot(let membership):
context.log.info("Membership snapshot: \(membership)")
let diff = Cluster.Membership._diff(from: .empty, to: membership)
for change in diff.changes {
instance.onMembershipChanged(change)
}

case .membershipChange(let change) where change.isAtLeast(.down):
context.log.info("Node down: \(change)!")
instance.handleAddressDown(change)
case .membershipChange(let change):
context.log.info("Node change: \(change)!")
instance.onMembershipChanged(change)

default:
() // ignore other changes, we only need to react on nodes becoming DOWN
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ extension OpLogDistributedReceptionist: LifecycleWatch {
if self.storage.addRegistration(sequenced: sequenced, key: key, guest: guest) {
// self.instrumentation.actorRegistered(key: key, id: id) // TODO(distributed): make the instrumentation calls compatible with distributed actor based types

watchTermination(of: guest) { onActorTerminated(identity: $0) }
watchTermination(of: guest)

self.log.debug(
"Registered [\(id)] for key [\(key)]",
Expand Down Expand Up @@ -562,9 +562,7 @@ extension OpLogDistributedReceptionist {
// We resolve a stub that we cannot really ever send messages to, but we can "watch" it
let resolved = try! actorSystem._resolveStub(identity: identity) // TODO(distributed): remove the throwing here?

watchTermination(of: resolved) {
onActorTerminated(identity: $0)
}
watchTermination(of: resolved)
if self.storage.addRegistration(sequenced: sequenced, key: key, guest: resolved) {
// self.instrumentation.actorRegistered(key: key, id: id) // TODO(distributed): make the instrumentation calls compatible with distributed actor based types
}
Expand Down Expand Up @@ -798,8 +796,7 @@ extension OpLogDistributedReceptionist {
// MARK: Termination handling

extension OpLogDistributedReceptionist {
// func onActorTerminated(terminated: Signals.Terminated) {
func onActorTerminated(identity id: ID) {
public distributed func terminated(actor id: ID) {
if id == ActorID._receptionist(on: id.uniqueNode, for: .distributedActors) {
self.log.debug("Watched receptionist terminated: \(id)")
self.receptionistTerminated(identity: id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ public final class _RemoteClusterActorPersonality<Message: Codable> {
private var _cachedAssociation: ManagedAtomicLazyReference<Association>

init(shell: ClusterShell, id: ActorID, system: ClusterSystem) {
precondition(id._isRemote, "RemoteActorRef MUST be remote. ActorID was: \(String(reflecting: id))")
if !id._isRemote {
let _: Void = fatalErrorBacktrace("RemoteActorRef MUST be remote. ActorID was: \(id.detailedDescription)")
}

self._cachedAssociation = ManagedAtomicLazyReference()

Expand Down
5 changes: 4 additions & 1 deletion Sources/DistributedActors/ClusterSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,14 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
/// Starts plugins after the system is fully initialized
await self.settings.plugins.startAll(self)

self.log.info("ClusterSystem [\(self.name)] initialized, listening on: \(self.settings.uniqueBindNode)")
if settings.enabled {
self.log.info("ClusterSystem [\(self.name)] initialized, listening on: \(self.settings.uniqueBindNode): \(self.cluster.ref)")

self.log.info("Setting in effect: .autoLeaderElection: \(self.settings.autoLeaderElection)")
self.log.info("Setting in effect: .downingStrategy: \(self.settings.downingStrategy)")
self.log.info("Setting in effect: .onDownAction: \(self.settings.onDownAction)")
} else {
self.log.info("ClusterSystem [\(self.name)] initialized; Cluster disabled, not listening for connections.")
}
}

Expand Down
24 changes: 15 additions & 9 deletions Sources/DistributedActors/DistributedActors.docc/Lifecycle.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ distributed actor Romeo: LifecycleWatch {
}

distributed func watch(_ romeo: Romeo) {
watchTermination(of: romeo) { terminatedID in
probe.tell("Oh no! \(terminatedID) is dead!")
// TODO: Drink poison
}
watchTermination(of: romeo)
}

distributed func terminated(actor id: ActorID) async {
print("Oh no! \(id) is dead!")
// *Drinks poison*
}
}

Expand All @@ -37,20 +39,24 @@ distributed actor Juliet: LifecycleWatch {
}

distributed func watch(_ romeo: Romeo) {
watchTermination(of: romeo) { terminatedID in
probe.tell("Oh no! \(terminatedID) is dead!")
// TODO: Stab through heart
}
watchTermination(of: romeo)
}

distributed func terminated(actor id: ActorID) async {
print("Oh no! \(id) is dead!")
// *Stabs through heart*
}
}
```

The ``LifecycleWatch/watchTermination(of:whenTerminated:file:line:)`` API purposefully does not use async/await because that would cause `romeo` to be retained as this function suspends. Instead, we allow it to complete and once the romeo actor is determined terminated, we get called back with its ``ActorID``.
The ``LifecycleWatch/watchTermination(of:file:line:)`` API purposefully does not use async/await because that would cause `romeo` to be retained as this function suspends. Instead, we allow it, and the function calling it (which keeps a reference to `Romeo`), to complete and once the romeo actor is determined terminated, we get called back with its ``ActorID`` in the separate ``terminated(actor:file:line:)`` method.

This API offers the same semantics regardless of where the actors are located, and always invokes the ``terminated(actor:)`` callback once the watched actor is considered to have terminated.

In case the watched actor is _local_, its termination is tied to Swift's ref-counting mechanisms, and an actor is terminated as soon as there are no more strong references to it in a system. It is then deinitialized, and the actor system's `resignID(actor.id)` is triggered, causing propagation to all the other actors which have been watching that actor.

You can also stop watching an actor by calling ``unwatchTermination(of:file:line:)``.

In case the watched actor is _remote_, termination may happen because of two reasons:
- either its reference count _on the remote system_ dropped to zero and it followed the same deinitialization steps as just described in the local case;
- or, the entire node the distributed actor was located on has been declared ``Cluster/MemberStatus/down`` and therefore the actor is assumed terminated (regardless if it really has deinitialized or not).
Expand Down
Loading