@@ -52,14 +52,12 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingletonProtocol>:
5252 /// The node that the singleton runs on
5353 private var targetNode : UniqueNode ?
5454
55- /// Keeps track of singleton allocation status
56- private let allocationTracker : AllocationTracker
55+ /// The concrete distributed actor instance (the "singleton") if this node is indeed hosting it,
56+ /// or nil otherwise - meaning that the singleton instance is actually located on another member.
57+ private var singleton : Act ?
5758
58- private var singleton : Act ? {
59- get async {
60- await self . allocationTracker. singleton
61- }
62- }
59+ private var allocationStatus : AllocationStatus = . pending
60+ private var allocationTimeoutTask : Task < Void , Error > ?
6361
6462 /// Remote call "buffer" in case `singleton` is `nil`
6563 private var remoteCallContinuations : [ ( CallID , CheckedContinuation < Act , Never > ) ] = [ ]
@@ -78,7 +76,6 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingletonProtocol>:
7876 self . settings = settings
7977 self . allocationStrategy = settings. allocationStrategy. makeAllocationStrategy ( system. settings, settings)
8078 self . singletonFactory = singletonFactory
81- self . allocationTracker = await AllocationTracker ( timeout: settings. allocationTimeout)
8279
8380 if system. settings. enabled {
8481 self . clusterEventsSubscribeTask = Task {
@@ -110,8 +107,7 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingletonProtocol>:
110107
111108 private func updateTargetNode( node: UniqueNode ? ) async throws {
112109 guard self . targetNode != node else {
113- let metadata = await self . metadata ( )
114- self . log. debug ( " Skip updating target node. New node is already the same as current targetNode. " , metadata: metadata)
110+ self . log. debug ( " Skip updating target node. New node is already the same as current targetNode. " , metadata: self . metadata ( ) )
115111 return
116112 }
117113
@@ -127,11 +123,11 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingletonProtocol>:
127123 default :
128124 if previousTargetNode == selfNode {
129125 self . log. debug ( " Node \( selfNode) handing over singleton \( self . settings. name) " )
130- try await self . handOver ( to: node)
126+ self . handOver ( to: node)
131127 }
132128
133129 // Update `singleton` regardless
134- try await self . updateSingleton ( node: node)
130+ try self . updateSingleton ( node: node)
135131 }
136132 }
137133
@@ -140,48 +136,51 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingletonProtocol>:
140136 preconditionFailure ( " Cluster singleton [ \( self . settings. name) ] cannot run on this node. Please review AllocationStrategySettings and/or cluster singleton usage. " )
141137 }
142138
143- let metadata = await self . metadata ( )
144- self . log. debug ( " Take over singleton [ \( self . settings. name) ] from [ \( String ( describing: from) ) ] " , metadata: metadata)
139+ self . log. debug ( " Take over singleton [ \( self . settings. name) ] from [ \( String ( describing: from) ) ] " , metadata: self . metadata ( ) )
145140
146141 // TODO: (optimization) tell `from` node that this node is taking over (https://github.com/apple/swift-distributed-actors/issues/329)
147142 let singleton = try await _Props. $forSpawn. withValue ( _Props. singleton ( settings: self . settings) ) {
148143 try await singletonFactory ( self . actorSystem)
149144 }
150- await self . updateSingleton ( singleton)
145+ self . updateSingleton ( singleton)
151146 }
152147
153- private func handOver( to: UniqueNode ? ) async throws {
154- let metadata = await self . metadata ( )
155- self . log. debug ( " Hand over singleton [ \( self . settings. name) ] to [ \( String ( describing: to) ) ] " , metadata: metadata)
148+ private func handOver( to: UniqueNode ? ) {
149+ self . log. debug ( " Hand over singleton [ \( self . settings. name) ] to [ \( String ( describing: to) ) ] " , metadata: self . metadata ( ) )
156150
157151 // TODO: (optimization) tell `to` node that this node is handing off (https://github.com/apple/swift-distributed-actors/issues/329)
158- await self . updateSingleton ( nil )
152+ self . updateSingleton ( nil )
159153 }
160154
161- private func updateSingleton( node: UniqueNode ? ) async throws {
155+ private func updateSingleton( node: UniqueNode ? ) throws {
162156 switch node {
163157 case . some( let node) where node == self . actorSystem. cluster. uniqueNode:
164158 ( )
165159 case . some( let node) :
166160 let singleton = try Act . resolve ( id: . singleton( Act . self, settings: self . settings, remote: node) , using: self . actorSystem)
167- await self . updateSingleton ( singleton)
161+ self . updateSingleton ( singleton)
168162 case . none:
169- await self . updateSingleton ( nil )
163+ self . updateSingleton ( nil )
170164 }
171165 }
172166
173- private func updateSingleton( _ newSingleton: Act ? ) async {
174- let currentSingleton = await self . singleton
175- self . log. debug ( " Update singleton from [ \( String ( describing: currentSingleton) ) ] to [ \( String ( describing: newSingleton) ) ], with \( self . remoteCallContinuations. count) remote calls pending " )
176- await self . allocationTracker. updateSingleton ( newSingleton)
167+ private func updateSingleton( _ newSingleton: Act ? ) {
168+ self . log. debug ( " Update singleton from [ \( String ( describing: self . singleton) ) ] to [ \( String ( describing: newSingleton) ) ], with \( self . remoteCallContinuations. count) remote calls pending " , metadata: self . metadata ( ) )
169+ self . singleton = newSingleton
177170
178171 // Unstash messages if we have the singleton
179172 guard let singleton = newSingleton else {
173+ self . allocationStatus = . pending
174+ self . startTimeoutTask ( )
180175 return
181176 }
182177
178+ self . allocationStatus = . allocated
179+ self . allocationTimeoutTask? . cancel ( )
180+ self . allocationTimeoutTask = nil
181+
183182 self . remoteCallContinuations. forEach { ( callID, continuation) in
184- self . log. debug ( " Flushing remote call [ \( callID) ] to [ \( singleton) ] " )
183+ self . log. debug ( " Flushing remote call [ \( callID) ] to [ \( singleton) ] " , metadata : self . metadata ( ) )
185184 continuation. resume ( returning: singleton)
186185 }
187186 }
@@ -196,8 +195,7 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingletonProtocol>:
196195 Res: Codable
197196 {
198197 let singleton = try await self . findSingleton ( )
199- let metadata = await self . metadata ( )
200- self . log. trace ( " Forwarding invocation [ \( invocation) ] to [ \( singleton) ] " , metadata: metadata)
198+ self . log. trace ( " Forwarding invocation [ \( invocation) ] to [ \( singleton) ] " , metadata: self . metadata ( ) )
201199
202200 var invocation = invocation // FIXME: should be inout param
203201 return try await singleton. actorSystem. remoteCall (
@@ -215,8 +213,7 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingletonProtocol>:
215213 throwing: Err . Type
216214 ) async throws where Err: Error {
217215 let singleton = try await self . findSingleton ( )
218- let metadata = await self . metadata ( )
219- self . log. trace ( " Forwarding invocation [ \( invocation) ] to [ \( singleton) ] " , metadata: metadata)
216+ self . log. trace ( " Forwarding invocation [ \( invocation) ] to [ \( singleton) ] " , metadata: self . metadata ( ) )
220217
221218 var invocation = invocation // FIXME: should be inout param
222219 return try await singleton. actorSystem. remoteCallVoid (
@@ -228,21 +225,20 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingletonProtocol>:
228225 }
229226
230227 private func findSingleton( ) async throws -> Act {
231- let allocationStatus = await self . allocationTracker. status
232- guard allocationStatus != . timedOut else {
228+ guard self . allocationStatus != . timedOut else {
233229 throw ClusterSingletonError . allocationTimeout
234230 }
235231
236232 // If singleton is available, forward remote call to it right away.
237- if let singleton = await self . singleton {
233+ if let singleton = self . singleton {
238234 return singleton
239235 }
240236
241237 return await withCheckedContinuation { continuation in
242238 // Otherwise, we "stash" the remote call until singleton becomes available.
243239 Task {
244240 let callID = UUID ( )
245- self . log. debug ( " Stashing remote call [ \( callID) ] " )
241+ self . log. debug ( " Stashing remote call [ \( callID) ] " , metadata : self . metadata ( ) )
246242 self . remoteCallContinuations. append ( ( callID, continuation) )
247243 // FIXME: honor settings.bufferCapacity
248244 }
@@ -276,71 +272,29 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingletonProtocol>:
276272
277273 nonisolated func stop( ) {
278274 Task {
279- try await self . whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
275+ await self . whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
280276 // TODO: perhaps we can figure out where `to` is next and hand over gracefully?
281- try await __secretlyKnownToBeLocal. handOver ( to: nil )
277+ __secretlyKnownToBeLocal. handOver ( to: nil )
282278 }
283279 }
284280 }
285281
286- actor AllocationTracker {
287- /// The concrete distributed actor instance (the "singleton") if this node is indeed hosting it,
288- /// or nil otherwise - meaning that the singleton instance is actually located on another member.
289- var singleton : Act ? {
290- didSet {
291- switch self . singleton {
292- case . some:
293- self . status = . allocated
294- self . timeoutTask? . cancel ( )
295- self . timeoutTask = nil
296- case . none:
297- self . status = . pending
298- if self . timeoutTask == nil {
299- Task {
300- await self . startTimeoutTask ( )
301- }
302- }
303- }
304- }
305- }
306-
307- var status : Status = . pending
308-
309- var timeoutTask : Task < Void , Error > ?
310- let timeout : Duration
311-
312- init ( timeout: Duration ) async {
313- self . timeout = timeout
314- self . singleton = nil
315- self . status = . pending
316- await self . startTimeoutTask ( )
317- }
318-
319- func updateSingleton( _ singleton: Act ? ) async {
320- self . singleton = singleton
321- }
282+ private func startTimeoutTask( ) {
283+ self . allocationTimeoutTask = Task {
284+ try await Task . sleep ( until: . now + self . settings. allocationTimeout, clock: . continuous)
322285
323- private func startTimeoutTask( ) async {
324- self . timeoutTask = Task {
325- try await Task . sleep ( until: . now + self . timeout, clock: . continuous)
326-
327- guard !Task. isCancelled else {
328- return
329- }
330-
331- self . onTimeout ( )
286+ guard !Task. isCancelled else {
287+ return
332288 }
333- }
334289
335- private func onTimeout( ) {
336- self . status = . timedOut
290+ self . allocationStatus = . timedOut
337291 }
292+ }
338293
339- enum Status {
340- case allocated
341- case pending
342- case timedOut
343- }
294+ enum AllocationStatus {
295+ case allocated
296+ case pending
297+ case timedOut
344298 }
345299}
346300
@@ -352,15 +306,15 @@ enum ClusterSingletonError: Error, Codable {
352306// MARK: Logging
353307
354308extension ClusterSingletonBoss {
355- func metadata( ) async -> Logger . Metadata {
309+ func metadata( ) -> Logger . Metadata {
356310 var metadata : Logger . Metadata = [
357311 " tag " : " singleton " ,
358312 " singleton/name " : " \( self . settings. name) " ,
359313 " singleton/buffer " : " \( self . remoteCallContinuations. count) / \( self . settings. bufferCapacity) " ,
360314 ]
361315
362316 metadata [ " targetNode " ] = " \( String ( describing: self . targetNode? . debugDescription) ) "
363- if let singleton = await self . singleton {
317+ if let singleton = self . singleton {
364318 metadata [ " singleton " ] = " \( singleton. id) "
365319 }
366320
0 commit comments