@@ -5,6 +5,7 @@ import FoundationEssentials
55import Foundation
66#endif
77import Logging
8+ import ServiceLifecycle
89
910/**
1011 When we update Penny, AWS waits a few minutes before taking down the old Penny instance to
@@ -25,97 +26,138 @@ import Logging
2526 * If the old instance is too slow to make the process happen, the process is aborted and
2627 the new instance will start handling events without waiting more for the old instance.
2728 */
28- actor BotStateManager {
29-
29+ actor BotStateManager : Service {
3030 let id = Int ( Date ( ) . timeIntervalSince1970)
31- let services : HandlerContext . Services
31+ let context : HandlerContext
3232 let disableDuration : Duration
3333 let logger : Logger
34+ private var cachesPopulationContinuations : [ CheckedContinuation < Void , Never > ] = [ ]
3435
3536 var canRespond = false
36- var onStarted : ( ( ) async -> Void ) ?
3737
3838 init (
39- services : HandlerContext . Services ,
39+ context : HandlerContext ,
4040 disabledDuration: Duration = . seconds( 3 * 60 )
4141 ) {
42- self . services = services
42+ self . context = context
4343 self . disableDuration = disabledDuration
4444 var logger = Logger ( label: " BotStateManager " )
4545 logger [ metadataKey: " id " ] = " \( self . id) "
4646 self . logger = logger
4747 }
4848
49- func start( onStarted: @Sendable @escaping ( ) async -> Void ) async {
50- self . onStarted = onStarted
51- Task { await send ( . shutdown) }
52- cancelIfCachePopulationTakesTooLong ( )
49+ func run( ) async {
50+ switch Constants . deploymentEnvironment {
51+ case . local:
52+ break
53+ case . testing, . prod:
54+ self . context. backgroundProcessor. process {
55+ await self . cancelIfCachePopulationTakesTooLong ( )
56+ }
57+ self . context. backgroundProcessor. process {
58+ await self . send ( . shutdown)
59+ }
60+ }
61+
62+ /// Wait indefinitely
63+ let ( stream, _) = AsyncStream . makeStream ( of: Void . self)
64+ await stream. first ( where: { _ in true } )
5365 }
5466
55- private func cancelIfCachePopulationTakesTooLong( ) {
56- Task {
57- try await Task . sleep ( for: . seconds( 120 ) )
58- if !canRespond {
59- await startAllowingResponses ( )
60- logger. error ( " No CachesStorage-population was done in-time " )
61- }
67+ func addCachesPopulationContinuation( _ cont: CheckedContinuation < Void , Never > ) {
68+ switch Constants . deploymentEnvironment {
69+ case . local: break
70+ case . testing, . prod:
71+ cont. resume ( )
72+ return
73+ }
74+ switch self . canRespond {
75+ case true :
76+ cont. resume ( )
77+ case false :
78+ self . cachesPopulationContinuations. append ( cont)
79+ }
80+ }
81+
82+ private func cancelIfCachePopulationTakesTooLong( ) async {
83+ guard ( try ? await Task . sleep ( for: . seconds( 120 ) ) ) != nil else {
84+ return /// Somewhere else cancelled the Task
85+ }
86+ if !canRespond {
87+ await startAllowingResponses ( )
88+ logger. error ( " No CachesStorage-population was done in-time " )
6289 }
6390 }
6491
65- func canRespond( to event: Gateway . Event ) -> Bool {
66- checkIfItsASignal ( event: event)
92+ func canRespond( to event: Gateway . Event ) async -> Bool {
93+ switch Constants . deploymentEnvironment {
94+ case . local: break
95+ case . testing, . prod:
96+ await checkIfItsASignal ( event: event)
97+ }
6798 return canRespond
6899 }
69100
70- private func checkIfItsASignal( event: Gateway . Event ) {
101+ private func checkIfItsASignal( event: Gateway . Event ) async {
71102 guard case let . messageCreate( message) = event. data,
72103 message. channel_id == Constants . Channels. botLogs. id,
73104 let author = message. author,
74105 author. id == Constants . botId,
75- let otherId = message. content. split ( whereSeparator: \. isWhitespace) . last
106+ let otherId = message. content. split ( whereSeparator: \. isWhitespace) . last,
107+ otherId != " \( self . id) "
76108 else { return }
77- if otherId == " \( self . id) " { return }
78109
79110 if StateManagerSignal . shutdown. isInMessage ( message. content) {
80111 logger. trace ( " Received 'shutdown' signal " )
81- shutdown ( )
112+ self . context. backgroundProcessor. process {
113+ await self . shutdown ( )
114+ }
82115 } else if StateManagerSignal . didShutdown. isInMessage ( message. content) {
83116 logger. trace ( " Received 'didShutdown' signal " )
84- populateCache ( )
117+ self . context. backgroundProcessor. process {
118+ await self . populateCache ( )
119+ }
85120 }
86121 }
87122
88- private func shutdown( ) {
89- Task {
90- await services. cachesService. gatherCachedInfoAndSaveToRepository ( )
91- await send ( . didShutdown)
92- self . canRespond = false
123+ private func shutdown( ) async {
124+ await context. cachesService. gatherCachedInfoAndSaveToRepository ( )
125+ await send ( . didShutdown)
126+ self . canRespond = false
93127
94- try await Task . sleep ( for: disableDuration)
95- await startAllowingResponses ( )
96- logger. critical ( " AWS has not yet shutdown this instance of Penny! Why?! " )
128+ guard ( try ? await Task . sleep ( for: disableDuration) ) != nil else {
129+ return /// Somewhere else cancelled the Task
97130 }
131+
132+ await startAllowingResponses ( )
133+ logger. critical ( " AWS has not yet shutdown this instance of Penny! Why?! " )
98134 }
99135
100- private func populateCache( ) {
101- Task {
102- if canRespond {
103- logger. warning ( " Received a did-shutdown signal but Cache is already populated " )
104- } else {
105- await services. cachesService. getCachedInfoFromRepositoryAndPopulateServices ( )
106- await startAllowingResponses ( )
107- }
136+ private func populateCache( ) async {
137+ if canRespond {
138+ logger. warning ( " Received a did-shutdown signal but Cache is already populated " )
139+ } else {
140+ await context. cachesService. getCachedInfoFromRepositoryAndPopulateServices ( )
141+ await startAllowingResponses ( )
108142 }
109143 }
110144
111145 private func startAllowingResponses( ) async {
112- canRespond = true
113- await onStarted ? ( )
146+ self . canRespond = true
147+ for continuation in self . cachesPopulationContinuations {
148+ continuation. resume ( )
149+ }
150+ self . cachesPopulationContinuations. removeAll ( )
114151 }
115152
116153 private func send( _ signal: StateManagerSignal ) async {
154+ switch Constants . deploymentEnvironment {
155+ case . local: return
156+ case . testing, . prod: break
157+ }
158+
117159 let content = makeSignalMessage ( text: signal. rawValue, id: self . id)
118- await services . discordService. sendMessage (
160+ await context . discordService. sendMessage (
119161 channelId: Constants . Channels. botLogs. id,
120162 payload: . init( content: content)
121163 )
0 commit comments