Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions Sources/DistributedActors/ClusterSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -876,13 +876,12 @@ extension ClusterSystem {
return nil
}

return self.namingLock.withLock {
return try self.namingLock.withLock {
guard let managed = self._managedDistributedActors.get(identifiedBy: id) else {
Copy link
Member Author

@yim-lee yim-lee Jul 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, only local resolve would reach here? In other words, it's not part of remote call code path?

log.trace("Resolved as remote reference", metadata: [
"actor/id": "\(id)",
])
// TODO(distributed): throw here, this should be a dead letter
return nil
throw DeadLetterError(recipient: id)
}

if let resolved = managed as? Act {
Expand Down Expand Up @@ -1130,11 +1129,6 @@ extension ClusterSystem {
return
}

guard let actor = self.resolve(id: recipient) else {
self.log.error("Unable to resolve recipient \(recipient). Message will be dropped: \(invocation)")
return
}

Task {
var decoder = ClusterInvocationDecoder(system: self, message: invocation)

Expand All @@ -1148,6 +1142,12 @@ extension ClusterSystem {
)

do {
guard let actor = self.resolve(id: recipient) else {
self.log.error("Unable to resolve recipient \(recipient). Message will be dropped: \(invocation)")
self.deadLetters.tell(DeadLetter(invocation, recipient: recipient))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the dead letter happen at sender, after getting back DeadLetterError? Or both sender and recipient? 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the recipient I believe, since the recipient will has no function that will throw, so this dead letter informs that system that "someone is calling things on you, but they can't get through to the recipient actor".

And the caller has the function that they called that resulted in a dead letter, that function should throw as it received s the error reply from the dead-letter-ed remoteCall. This way the caller knows it happened, we don't need to log it there.

throw DeadLetterError(recipient: recipient)
}

try await executeDistributedTarget(
on: actor,
target: target,
Expand Down
9 changes: 9 additions & 0 deletions Sources/DistributedActors/DeadLetters.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
//
//===----------------------------------------------------------------------===//

import Distributed
import Foundation
import Logging

// ==== ----------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -255,3 +257,10 @@ extension ActorPath {
static let _dead: ActorPath = try! ActorPath(root: "dead")
static let _deadLetters: ActorPath = try! ActorPath._dead.appending("letters")
}

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Errors

public struct DeadLetterError: DistributedActorSystemError, Codable {
public let recipient: ClusterSystem.ActorID
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

33 changes: 16 additions & 17 deletions Tests/DistributedActorsTests/ClusterSystemTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,10 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
local.cluster.join(node: remote.cluster.uniqueNode)

let greeter = Greeter(actorSystem: local)
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
let localGreeter = try Greeter.resolve(id: greeter.id, using: remote)

let value = try await shouldNotThrow {
try await remoteGreeter.hello()
try await localGreeter.hello()
}
value.shouldEqual("hello")
}
Expand All @@ -233,10 +233,10 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
local.cluster.join(node: remote.cluster.uniqueNode)

let greeter = Greeter(actorSystem: local)
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
let localGreeter = try Greeter.resolve(id: greeter.id, using: remote)

let error = try await shouldThrow {
_ = try await remoteGreeter.helloThrow(codable: true)
_ = try await localGreeter.helloThrow(codable: true)
}
guard error is GreeterCodableError else {
throw testKit.fail("Expected GreeterCodableError, got \(error)")
Expand All @@ -255,10 +255,10 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
local.cluster.join(node: remote.cluster.uniqueNode)

let greeter = Greeter(actorSystem: local)
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
let localGreeter = try Greeter.resolve(id: greeter.id, using: remote)

let error = try await shouldThrow {
_ = try await remoteGreeter.helloThrow(codable: false)
_ = try await localGreeter.helloThrow(codable: false)
}
guard let remoteCallError = error as? GenericRemoteCallError else {
throw testKit.fail("Expected GenericRemoteCallError, got \(error)")
Expand All @@ -277,10 +277,10 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
local.cluster.join(node: remote.cluster.uniqueNode)

let greeter = Greeter(actorSystem: local)
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
let localGreeter = try Greeter.resolve(id: greeter.id, using: remote)

try await shouldNotThrow {
try await remoteGreeter.muted()
try await localGreeter.muted()
}
}

Expand All @@ -295,10 +295,10 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
local.cluster.join(node: remote.cluster.uniqueNode)

let greeter = Greeter(actorSystem: local)
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
let localGreeter = try Greeter.resolve(id: greeter.id, using: remote)

let error = try await shouldThrow {
try await remoteGreeter.mutedThrow(codable: true)
try await localGreeter.mutedThrow(codable: true)
}
guard error is GreeterCodableError else {
throw testKit.fail("Expected GreeterCodableError, got \(error)")
Expand All @@ -316,10 +316,10 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
local.cluster.join(node: remote.cluster.uniqueNode)

let greeter = Greeter(actorSystem: local)
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
let localGreeter = try Greeter.resolve(id: greeter.id, using: remote)

let error = try await shouldThrow {
try await remoteGreeter.mutedThrow(codable: false)
try await localGreeter.mutedThrow(codable: false)
}
guard let remoteCallError = error as? GenericRemoteCallError else {
throw testKit.fail("Expected GenericRemoteCallError, got \(error)")
Expand All @@ -337,11 +337,11 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
local.cluster.join(node: remote.cluster.uniqueNode)

let greeter = Greeter(actorSystem: local)
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
let localGreeter = try Greeter.resolve(id: greeter.id, using: remote)

let error = try await shouldThrow {
try await RemoteCall.with(timeout: .milliseconds(200)) {
_ = try await remoteGreeter.hello(delayNanos: 3_000_000_000)
_ = try await localGreeter.hello(delayNanos: 3_000_000_000)
}
}

Expand All @@ -363,11 +363,11 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
local.cluster.join(node: remote.cluster.uniqueNode)

let greeter = Greeter(actorSystem: local)
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
let localGreeter = try Greeter.resolve(id: greeter.id, using: remote)

let error = try await shouldThrow {
try await RemoteCall.with(timeout: .milliseconds(200)) {
try await remoteGreeter.muted(delayNanos: 3_000_000_000)
try await localGreeter.muted(delayNanos: 3_000_000_000)
}
}

Expand All @@ -381,7 +381,6 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
}

private distributed actor Greeter {
typealias ID = ClusterSystem.ActorID
typealias ActorSystem = ClusterSystem

distributed func hello() async throws -> String {
Expand Down
79 changes: 79 additions & 0 deletions Tests/DistributedActorsTests/DeadLetterTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//
//===----------------------------------------------------------------------===//

import Distributed
@testable import DistributedActors
import DistributedActorsTestKit
@testable import Logging
Expand Down Expand Up @@ -80,4 +81,82 @@ final class DeadLetterTests: ClusterSystemXCTestCase {
try self.logCapture.awaitLogContaining(self.testKit, text: "This is a question")
try self.logCapture.awaitLogContaining(self.testKit, text: "/user/ludwig")
}

func test_remoteCallTerminatedTarget_shouldResultInDeadLetter() async throws {
let local = await setUpNode("local") { settings in
settings.enabled = true
}
let remote = await setUpNode("remote") { settings in
settings.enabled = true
}
local.cluster.join(node: remote.cluster.uniqueNode)

var greeter: Greeter? = Greeter(actorSystem: local)
let localGreeter = try Greeter.resolve(id: greeter!.id, using: remote)

let p = self.testKit.makeTestProbe(expecting: String.self)
let watcher = GreeterWatcher(probe: p, actorSystem: local)
try await watcher.watch(greeter!)

greeter = nil
try p.expectMessage(prefix: "Received terminated: /user/Greeter")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh we're missing API to p.watch a DA from a TestProbe it seems? If you want that'd be a great thing to implement...

We'd want to be able to p.watch(watcher); p.expectTerminated()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Created #990

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!


let error = try await shouldThrow {
_ = try await localGreeter.greet(name: "world")
}

guard error is DeadLetterError else {
throw self.testKit.fail("Expected DeadLetterError, got \(error)")
}

try self.capturedLogs(of: local).awaitLogContaining(self.testKit, text: "was not delivered to")
}

func test_resolveTerminatedTarget_shouldResultInDeadLetter() async throws {
var greeter: Greeter? = Greeter(actorSystem: self.system)
let greeterID = greeter!.id

let p = self.testKit.makeTestProbe(expecting: String.self)
let watcher = GreeterWatcher(probe: p, actorSystem: self.system)
try await watcher.watch(greeter!)

greeter = nil
try p.expectMessage(prefix: "Received terminated: /user/Greeter")

let error = try shouldThrow {
_ = try self.system.resolve(id: greeterID, as: Greeter.self)
}

guard error is DeadLetterError else {
throw self.testKit.fail("Expected DeadLetterError, got \(error)")
}
}
}

private distributed actor Greeter {
typealias ActorSystem = ClusterSystem

distributed func greet(name: String) -> String {
"hello \(name)!"
}
}

private distributed actor GreeterWatcher: LifecycleWatch {
typealias ActorSystem = ClusterSystem

let probe: ActorTestProbe<String>

init(probe: ActorTestProbe<String>, actorSystem: ActorSystem) {
self.actorSystem = actorSystem
self.probe = probe
}

distributed func watch(_ greeter: Greeter) {
watchTermination(of: greeter)
}

// FIXME(distributed): Should not need to be distributed: https://github.com/apple/swift/pull/59397
public distributed func terminated(actor id: ActorID) async { // not REALLY distributed...
self.probe.tell("Received terminated: \(id)")
}
}