Skip to content

Commit 364e3dd

Browse files
yim-leektoso
andauthored
Dead letter when remote call target is dead (#989)
Resolves #943 * Remove logging Co-authored-by: Konrad `ktoso` Malawski <konrad.malawski@project13.pl> * Remove unneeded import Co-authored-by: Konrad `ktoso` Malawski <konrad.malawski@project13.pl> * Rename variable Co-authored-by: Konrad `ktoso` Malawski <konrad.malawski@project13.pl>
1 parent 028e87b commit 364e3dd

File tree

4 files changed

+110
-25
lines changed

4 files changed

+110
-25
lines changed

Sources/DistributedActors/ClusterSystem.swift

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -868,13 +868,12 @@ extension ClusterSystem {
868868
return nil
869869
}
870870

871-
return self.namingLock.withLock {
871+
return try self.namingLock.withLock {
872872
guard let managed = self._managedDistributedActors.get(identifiedBy: id) else {
873873
log.trace("Resolved as remote reference", metadata: [
874874
"actor/id": "\(id)",
875875
])
876-
// TODO(distributed): throw here, this should be a dead letter
877-
return nil
876+
throw DeadLetterError(recipient: id)
878877
}
879878

880879
if let resolved = managed as? Act {
@@ -1116,11 +1115,6 @@ extension ClusterSystem {
11161115
return
11171116
}
11181117

1119-
guard let actor = self.resolve(id: recipient) else {
1120-
self.log.error("Unable to resolve recipient \(recipient). Message will be dropped: \(invocation)")
1121-
return
1122-
}
1123-
11241118
Task {
11251119
var decoder = ClusterInvocationDecoder(system: self, message: invocation)
11261120

@@ -1134,6 +1128,11 @@ extension ClusterSystem {
11341128
)
11351129

11361130
do {
1131+
guard let actor = self.resolve(id: recipient) else {
1132+
self.deadLetters.tell(DeadLetter(invocation, recipient: recipient))
1133+
throw DeadLetterError(recipient: recipient)
1134+
}
1135+
11371136
try await executeDistributedTarget(
11381137
on: actor,
11391138
target: target,

Sources/DistributedActors/DeadLetters.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import Distributed
1516
import Logging
1617

1718
// ==== ----------------------------------------------------------------------------------------------------------------
@@ -255,3 +256,10 @@ extension ActorPath {
255256
static let _dead: ActorPath = try! ActorPath(root: "dead")
256257
static let _deadLetters: ActorPath = try! ActorPath._dead.appending("letters")
257258
}
259+
260+
// ==== ----------------------------------------------------------------------------------------------------------------
261+
// MARK: Errors
262+
263+
public struct DeadLetterError: DistributedActorSystemError, Codable {
264+
public let recipient: ClusterSystem.ActorID
265+
}

Tests/DistributedActorsTests/ClusterSystemTests.swift

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,10 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
213213
local.cluster.join(node: remote.cluster.uniqueNode)
214214

215215
let greeter = Greeter(actorSystem: local)
216-
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
216+
let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote)
217217

218218
let value = try await shouldNotThrow {
219-
try await remoteGreeter.hello()
219+
try await remoteGreeterRef.hello()
220220
}
221221
value.shouldEqual("hello")
222222
}
@@ -233,10 +233,10 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
233233
local.cluster.join(node: remote.cluster.uniqueNode)
234234

235235
let greeter = Greeter(actorSystem: local)
236-
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
236+
let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote)
237237

238238
let error = try await shouldThrow {
239-
_ = try await remoteGreeter.helloThrow(codable: true)
239+
_ = try await remoteGreeterRef.helloThrow(codable: true)
240240
}
241241
guard error is GreeterCodableError else {
242242
throw testKit.fail("Expected GreeterCodableError, got \(error)")
@@ -255,10 +255,10 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
255255
local.cluster.join(node: remote.cluster.uniqueNode)
256256

257257
let greeter = Greeter(actorSystem: local)
258-
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
258+
let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote)
259259

260260
let error = try await shouldThrow {
261-
_ = try await remoteGreeter.helloThrow(codable: false)
261+
_ = try await remoteGreeterRef.helloThrow(codable: false)
262262
}
263263
guard let remoteCallError = error as? GenericRemoteCallError else {
264264
throw testKit.fail("Expected GenericRemoteCallError, got \(error)")
@@ -277,10 +277,10 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
277277
local.cluster.join(node: remote.cluster.uniqueNode)
278278

279279
let greeter = Greeter(actorSystem: local)
280-
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
280+
let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote)
281281

282282
try await shouldNotThrow {
283-
try await remoteGreeter.muted()
283+
try await remoteGreeterRef.muted()
284284
}
285285
}
286286

@@ -295,10 +295,10 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
295295
local.cluster.join(node: remote.cluster.uniqueNode)
296296

297297
let greeter = Greeter(actorSystem: local)
298-
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
298+
let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote)
299299

300300
let error = try await shouldThrow {
301-
try await remoteGreeter.mutedThrow(codable: true)
301+
try await remoteGreeterRef.mutedThrow(codable: true)
302302
}
303303
guard error is GreeterCodableError else {
304304
throw testKit.fail("Expected GreeterCodableError, got \(error)")
@@ -316,10 +316,10 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
316316
local.cluster.join(node: remote.cluster.uniqueNode)
317317

318318
let greeter = Greeter(actorSystem: local)
319-
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
319+
let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote)
320320

321321
let error = try await shouldThrow {
322-
try await remoteGreeter.mutedThrow(codable: false)
322+
try await remoteGreeterRef.mutedThrow(codable: false)
323323
}
324324
guard let remoteCallError = error as? GenericRemoteCallError else {
325325
throw testKit.fail("Expected GenericRemoteCallError, got \(error)")
@@ -337,11 +337,11 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
337337
local.cluster.join(node: remote.cluster.uniqueNode)
338338

339339
let greeter = Greeter(actorSystem: local)
340-
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
340+
let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote)
341341

342342
let error = try await shouldThrow {
343343
try await RemoteCall.with(timeout: .milliseconds(200)) {
344-
_ = try await remoteGreeter.hello(delayNanos: 3_000_000_000)
344+
_ = try await remoteGreeterRef.hello(delayNanos: 3_000_000_000)
345345
}
346346
}
347347

@@ -363,11 +363,11 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
363363
local.cluster.join(node: remote.cluster.uniqueNode)
364364

365365
let greeter = Greeter(actorSystem: local)
366-
let remoteGreeter = try Greeter.resolve(id: greeter.id, using: remote)
366+
let remoteGreeterRef = try Greeter.resolve(id: greeter.id, using: remote)
367367

368368
let error = try await shouldThrow {
369369
try await RemoteCall.with(timeout: .milliseconds(200)) {
370-
try await remoteGreeter.muted(delayNanos: 3_000_000_000)
370+
try await remoteGreeterRef.muted(delayNanos: 3_000_000_000)
371371
}
372372
}
373373

@@ -381,7 +381,6 @@ final class ClusterSystemTests: ClusterSystemXCTestCase {
381381
}
382382

383383
private distributed actor Greeter {
384-
typealias ID = ClusterSystem.ActorID
385384
typealias ActorSystem = ClusterSystem
386385

387386
distributed func hello() async throws -> String {

Tests/DistributedActorsTests/DeadLetterTests.swift

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import Distributed
1516
@testable import DistributedActors
1617
import DistributedActorsTestKit
1718
@testable import Logging
@@ -80,4 +81,82 @@ final class DeadLetterTests: ClusterSystemXCTestCase {
8081
try self.logCapture.awaitLogContaining(self.testKit, text: "This is a question")
8182
try self.logCapture.awaitLogContaining(self.testKit, text: "/user/ludwig")
8283
}
84+
85+
func test_remoteCallTerminatedTarget_shouldResultInDeadLetter() async throws {
86+
let local = await setUpNode("local") { settings in
87+
settings.enabled = true
88+
}
89+
let remote = await setUpNode("remote") { settings in
90+
settings.enabled = true
91+
}
92+
local.cluster.join(node: remote.cluster.uniqueNode)
93+
94+
var greeter: Greeter? = Greeter(actorSystem: local)
95+
let remoteGreeterRef = try Greeter.resolve(id: greeter!.id, using: remote)
96+
97+
let p = self.testKit.makeTestProbe(expecting: String.self)
98+
let watcher = GreeterWatcher(probe: p, actorSystem: local)
99+
try await watcher.watch(greeter!)
100+
101+
greeter = nil
102+
try p.expectMessage(prefix: "Received terminated: /user/Greeter")
103+
104+
let error = try await shouldThrow {
105+
_ = try await remoteGreeterRef.greet(name: "world")
106+
}
107+
108+
guard error is DeadLetterError else {
109+
throw self.testKit.fail("Expected DeadLetterError, got \(error)")
110+
}
111+
112+
try self.capturedLogs(of: local).awaitLogContaining(self.testKit, text: "was not delivered to")
113+
}
114+
115+
func test_resolveTerminatedTarget_shouldResultInDeadLetter() async throws {
116+
var greeter: Greeter? = Greeter(actorSystem: self.system)
117+
let greeterID = greeter!.id
118+
119+
let p = self.testKit.makeTestProbe(expecting: String.self)
120+
let watcher = GreeterWatcher(probe: p, actorSystem: self.system)
121+
try await watcher.watch(greeter!)
122+
123+
greeter = nil
124+
try p.expectMessage(prefix: "Received terminated: /user/Greeter")
125+
126+
let error = try shouldThrow {
127+
_ = try self.system.resolve(id: greeterID, as: Greeter.self)
128+
}
129+
130+
guard error is DeadLetterError else {
131+
throw self.testKit.fail("Expected DeadLetterError, got \(error)")
132+
}
133+
}
134+
}
135+
136+
private distributed actor Greeter {
137+
typealias ActorSystem = ClusterSystem
138+
139+
distributed func greet(name: String) -> String {
140+
"hello \(name)!"
141+
}
142+
}
143+
144+
private distributed actor GreeterWatcher: LifecycleWatch {
145+
typealias ActorSystem = ClusterSystem
146+
147+
let probe: ActorTestProbe<String>
148+
149+
init(probe: ActorTestProbe<String>, actorSystem: ActorSystem) {
150+
self.actorSystem = actorSystem
151+
self.probe = probe
152+
}
153+
154+
distributed func watch(_ greeter: Greeter) {
155+
watchTermination(of: greeter)
156+
}
157+
158+
// FIXME(distributed): Should not need to be distributed: https://github.com/apple/swift/pull/59397
159+
public distributed func terminated(actor id: ActorID) async { // not REALLY distributed...
160+
self.probe.tell("Received terminated: \(id)")
161+
}
83162
}

0 commit comments

Comments
 (0)