Skip to content

Commit 60c74fb

Browse files
committed
fix: improve how loss-of-connections are handled
1 parent c03059f commit 60c74fb

5 files changed

Lines changed: 61 additions & 23 deletions

File tree

apps/appcontainer-node/packages/generic/src/appContainer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ export class AppContainer {
108108
}
109109

110110
await this.workforceAPI.init(this.id, this.workForceConnectionOptions, this)
111+
if (!this.workforceAPI.connected) throw new Error('AppContainer: Workforce not connected')
111112

112113
this.logger.info(`AppContainer: Connected to Workforce"`)
113114

shared/packages/api/src/adapterClient.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { EventEmitter } from 'events'
12
import { LoggerInstance } from './logger'
23
import { WebsocketClient } from './websocketClient'
34
import { Hook, MessageBase, MessageIdentifyClient } from './websocketConnection'
@@ -7,19 +8,23 @@ import { Hook, MessageBase, MessageIdentifyClient } from './websocketConnection'
78
* or (in the case where they run in the same process) hook directly into the AdapterServer, to call the methods directly.
89
* @see {@link ./adapterServer.ts}
910
*/
10-
export abstract class AdapterClient<ME, OTHER> {
11+
export abstract class AdapterClient<ME, OTHER> extends EventEmitter {
1112
/** Used for internal connections */
1213
private serverHook?: Hook<OTHER, ME>
1314

1415
protected _sendMessage: (type: keyof OTHER, ...args: any[]) => Promise<any> = () => {
1516
throw new Error('.init() must be called first!')
1617
}
1718

18-
constructor(protected logger: LoggerInstance, private clientType: MessageIdentifyClient['clientType']) {}
19+
constructor(protected logger: LoggerInstance, private clientType: MessageIdentifyClient['clientType']) {
20+
super()
21+
}
1922

2023
private conn?: WebsocketClient
2124
private terminated = false
2225

26+
private _connected = false
27+
2328
async init(id: string, connectionOptions: ClientConnectionOptions, clientMethods: ME): Promise<void> {
2429
if (connectionOptions.type === 'websocket') {
2530
const conn = new WebsocketClient(
@@ -40,10 +45,14 @@ export abstract class AdapterClient<ME, OTHER> {
4045
this.conn = conn
4146

4247
conn.on('connected', () => {
43-
this.logger.debug('Websocket client connected')
48+
this.logger.debug(`Websocket client connected ("${id}", ${this.clientType})`)
49+
this.emit('connected')
50+
this._connected = true
4451
})
4552
conn.on('disconnected', () => {
46-
this.logger.debug('Websocket client disconnected')
53+
this.logger.debug(`Websocket client disconnected ("${id}", ${this.clientType})`)
54+
this.emit('disconnected')
55+
this._connected = false
4756
})
4857
this._sendMessage = ((type: string, ...args: any[]) => conn.send(type, ...args)) as any
4958

@@ -74,6 +83,9 @@ export abstract class AdapterClient<ME, OTHER> {
7483
this.conn?.close()
7584
delete this.serverHook
7685
}
86+
get connected(): boolean {
87+
return this._connected
88+
}
7789
}
7890
/** Options for an AdepterClient */
7991
export type ClientConnectionOptions =

shared/packages/api/src/websocketClient.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ export class WebsocketClient extends WebsocketConnection {
5454

5555
resolve()
5656
})
57-
setTimeout(reject, 3000) // connection timeout
57+
setTimeout(() => {
58+
reject('Connection timeout')
59+
}, 3000) // connection timeout
5860
})
5961
}
6062
close(): void {

shared/packages/expectationManager/src/expectationManager.ts

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,14 @@ export class ExpectationManager {
135135
type: 'websocket',
136136
clientConnection: client,
137137
})
138-
this.workerAgents[client.clientId] = { api }
138+
this.workerAgents[client.clientId] = { api, connected: true }
139139
client.on('close', () => {
140+
this.logger.debug(`ExpectationManager: Connection to Worker "${client.clientId}" closed`)
141+
142+
this.workerAgents[client.clientId].connected = false
140143
delete this.workerAgents[client.clientId]
141144
})
142-
145+
this.logger.debug(`ExpectationManager: Connection to Worker "${client.clientId}" established`)
143146
this._triggerEvaluateExpectations(true)
144147
break
145148
}
@@ -226,7 +229,7 @@ export class ExpectationManager {
226229
type: 'internal',
227230
hookMethods: clientMethods,
228231
})
229-
this.workerAgents[clientId] = { api }
232+
this.workerAgents[clientId] = { api, connected: true }
230233

231234
return workerAgentMethods
232235
}
@@ -238,6 +241,7 @@ export class ExpectationManager {
238241
if (workerAgent.api.type !== 'internal')
239242
throw new Error(`Cannot remove WorkerAgent "${clientId}", due to the type being "${workerAgent.api.type}"`)
240243

244+
workerAgent.connected = false
241245
delete this.workerAgents[clientId]
242246
}
243247
/** Called when there is an updated set of PackageContainerExpectations. */
@@ -295,7 +299,7 @@ export class ExpectationManager {
295299
}
296300
async getStatus(): Promise<any> {
297301
return {
298-
workforce: await this.workforceAPI.getStatus(),
302+
workforce: this.workforceAPI.connected ? await this.workforceAPI.getStatus() : {},
299303
expectationManager: this.status,
300304
}
301305
}
@@ -824,6 +828,8 @@ export class ExpectationManager {
824828
let hasQueriedAnyone = false
825829
await Promise.all(
826830
Object.entries(this.workerAgents).map(async ([workerId, workerAgent]) => {
831+
if (!workerAgent.connected) return
832+
827833
// Only ask each worker once:
828834
if (
829835
!trackedExp.queriedWorkers[workerId] ||
@@ -1513,7 +1519,7 @@ export class ExpectationManager {
15131519
const trackedPackageContainer = this.trackedPackageContainers[containerId]
15141520
if (trackedPackageContainer.currentWorker) {
15151521
const workerAgent = this.workerAgents[trackedPackageContainer.currentWorker]
1516-
if (workerAgent) {
1522+
if (workerAgent && workerAgent.connected) {
15171523
try {
15181524
const result = await workerAgent.api.disposePackageContainerMonitors(containerId)
15191525
if (result.success) {
@@ -1553,16 +1559,20 @@ export class ExpectationManager {
15531559
// If the packageContainer was newly updated, reset and set up again:
15541560
if (trackedPackageContainer.currentWorker) {
15551561
const workerAgent = this.workerAgents[trackedPackageContainer.currentWorker]
1556-
const disposeMonitorResult = await workerAgent.api.disposePackageContainerMonitors(
1557-
trackedPackageContainer.id
1558-
)
1559-
if (!disposeMonitorResult.success) {
1560-
badStatus = true
1561-
this.updateTrackedPackageContainerStatus(trackedPackageContainer, StatusCode.BAD, {
1562-
user: `Unable to restart monitor, due to ${disposeMonitorResult.reason.user}`,
1563-
tech: `Unable to restart monitor: ${disposeMonitorResult.reason.tech}`,
1564-
})
1565-
continue // Break further execution for this PackageContainer
1562+
if (workerAgent && workerAgent.connected) {
1563+
const disposeMonitorResult = await workerAgent.api.disposePackageContainerMonitors(
1564+
trackedPackageContainer.id
1565+
)
1566+
if (!disposeMonitorResult.success) {
1567+
badStatus = true
1568+
this.updateTrackedPackageContainerStatus(trackedPackageContainer, StatusCode.BAD, {
1569+
user: `Unable to restart monitor, due to ${disposeMonitorResult.reason.user}`,
1570+
tech: `Unable to restart monitor: ${disposeMonitorResult.reason.tech}`,
1571+
})
1572+
continue // Break further execution for this PackageContainer
1573+
}
1574+
} else {
1575+
// Lost connecttion to the worker & monitor
15661576
}
15671577
trackedPackageContainer.currentWorker = null
15681578
}
@@ -1580,7 +1590,9 @@ export class ExpectationManager {
15801590

15811591
let notSupportReason: Reason | null = null
15821592
await Promise.all(
1583-
Object.entries(this.workerAgents).map(async ([workerId, workerAgent]) => {
1593+
Object.entries(this.workerAgents).map<Promise<void>>(async ([workerId, workerAgent]) => {
1594+
if (!workerAgent.connected) return
1595+
15841596
const support = await workerAgent.api.doYouSupportPackageContainer(
15851597
trackedPackageContainer.packageContainer
15861598
)
@@ -1934,6 +1946,7 @@ export type ExpectationManagerServerOptions =
19341946

19351947
interface TrackedWorkerAgent {
19361948
api: WorkerAgentAPI
1949+
connected: boolean
19371950
}
19381951

19391952
interface TrackedExpectation {

shared/packages/worker/src/workerAgent.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,12 +520,22 @@ export class WorkerAgent {
520520
// Don's spin down if a monitor is active
521521
if (!Object.keys(this.activeMonitors).length) {
522522
this.logger.info(`Worker: is idle, requesting spinning down`)
523-
this.appContainerAPI.requestSpinDown()
523+
524+
if (this.appContainerAPI.connected) {
525+
this.appContainerAPI.requestSpinDown()
526+
} else {
527+
// Huh, we're not connected to the appContainer.
528+
// Well, we want to spin down anyway, so we'll do it:
529+
// eslint-disable-next-line no-process-exit
530+
process.exit(54)
531+
}
524532
}
525533
}
526534
}
527535
// Also ping the AppContainer
528-
this.appContainerAPI.ping()
536+
if (this.appContainerAPI.connected) {
537+
this.appContainerAPI.ping()
538+
}
529539
}
530540
private IDidSomeWork() {
531541
this.lastWorkTime = Date.now()

0 commit comments

Comments
 (0)