Skip to content

Commit 2fd7143

Browse files
jstarplnytamin
andcommitted
fix: move the critical worker functionality into appContainer/workerAgent
Co-authored-by: Johan Nyman <johan@nytamin.se>
1 parent eaef78b commit 2fd7143

12 files changed

Lines changed: 324 additions & 240 deletions

File tree

apps/appcontainer-node/packages/generic/src/appContainer.ts

Lines changed: 81 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,15 @@ export class AppContainer {
5555
private busyPorts = new Set<number>()
5656

5757
private apps: Map<
58+
// <- TODO: Needs to understand which apps are special and which aren't
5859
AppId,
5960
{
6061
process: cp.ChildProcess
6162
appType: AppType
63+
/** Set to true if app should be considered for scaling down */
64+
isAutoScaling: boolean
65+
/** Set to true if the app is only handling playout-critical expectations */
66+
isOnlyForCriticalExpectations: boolean
6267
/** Set to true when the process is about to be killed */
6368
toBeKilled: boolean
6469
restarts: number
@@ -71,7 +76,7 @@ export class AppContainer {
7176
start: number
7277
}
7378
> = new Map()
74-
private availableApps: Map<AppType, AvailableAppInfo> = new Map()
79+
private availableApps: Map<AppType, AvailableAppInfo> = new Map() // <- needs to be smarter
7580
private websocketServer?: WebsocketServer
7681

7782
private monitorAppsTimer: NodeJS.Timer | undefined
@@ -191,8 +196,8 @@ export class AppContainer {
191196
})
192197
}
193198
async init(): Promise<void> {
194-
await this.setupAvailableApps()
195-
// Note: if we later change this.setupAvailableApps to run on an interval
199+
await this.discoverAvailableApps()
200+
// Note: if we later change this.discoverAvailableApps to run on an interval
196201
// don't throw here:
197202
if (this.availableApps.size === 0) {
198203
throw new Error(`AppContainer found no apps upon init. (Check if there are any Worker executables?)`)
@@ -239,12 +244,11 @@ export class AppContainer {
239244
},
240245
requestSpinDown: async (): Promise<void> => {
241246
const app = this.apps.get(clientId)
242-
if (app) {
243-
if (this.getAppCount(app.appType) > this.config.appContainer.minRunningApps) {
244-
this.spinDown(clientId, `Requested by app`).catch((error) => {
245-
this.logger.error(`Error when spinning down app "${clientId}": ${stringifyError(error)}`)
246-
})
247-
}
247+
if (!app || !app.isAutoScaling) return
248+
if (this.getScalingAppCount(app.appType) > this.config.appContainer.minRunningApps) {
249+
this.spinDown(clientId, `Requested by app`).catch((error) => {
250+
this.logger.error(`Error when spinning down app "${clientId}": ${stringifyError(error)}`)
251+
})
248252
}
249253
},
250254
workerStorageWriteLock: async (
@@ -265,17 +269,26 @@ export class AppContainer {
265269
}
266270
}
267271

268-
private getAppCount(appType: AppType): number {
272+
private getScalingAppCount(appType: AppType): number {
273+
let count = 0
274+
for (const app of this.apps.values()) {
275+
if (app.appType === appType && app.isAutoScaling) count++
276+
}
277+
return count
278+
}
279+
private getCriticalExpectationAppCount(appType: AppType): number {
269280
let count = 0
270281
for (const app of this.apps.values()) {
271-
if (app.appType === appType) count++
282+
if (app.appType === appType && app.isOnlyForCriticalExpectations) count++
272283
}
273284
return count
274285
}
275-
private async setupAvailableApps() {
276-
const getWorkerArgs = (appId: AppId): string[] => {
286+
287+
private async discoverAvailableApps() {
288+
const getWorkerArgs = (appId: AppId, pickUpCriticalExpectationsOnly: boolean): string[] => {
277289
return [
278290
`--workerId=${appId}`,
291+
pickUpCriticalExpectationsOnly ? `--pickUpCriticalExpectationsOnly=1` : '',
279292
`--workforceURL=${this.config.appContainer.workforceURL}`,
280293
`--appContainerURL=${'ws://127.0.0.1:' + this.websocketServer?.port}`,
281294

@@ -291,7 +304,7 @@ export class AppContainer {
291304
? `--costMultiplier=${this.config.appContainer.worker.costMultiplier}`
292305
: '',
293306
this.config.appContainer.worker.considerCPULoad
294-
? `--costMultiplier=${this.config.appContainer.worker.considerCPULoad}`
307+
? `--considerCPULoad=${this.config.appContainer.worker.considerCPULoad}`
295308
: '',
296309
this.config.appContainer.worker.resourceId
297310
? `--resourceId=${this.config.appContainer.worker.resourceId}`
@@ -309,30 +322,31 @@ export class AppContainer {
309322
const appType = protectString<AppType>('worker')
310323
this.availableApps.set(appType, {
311324
file: process.execPath,
312-
args: (appId: AppId) => {
313-
return [path.resolve('.', '../../worker/app/dist/index.js'), ...getWorkerArgs(appId)]
325+
getExecArgs: (appId: AppId) => {
326+
return [path.resolve('.', '../../worker/app/dist/index.js'), ...getWorkerArgs(appId, false)]
314327
},
328+
canRunInCriticalExpectationsOnlyMode: true,
315329
cost: 0,
316330
})
317331
} else {
318332
// Process is a compiled executable
319333
// Look for the worker executable(s) in the same folder:
320334

321335
const dirPath = path.dirname(process.execPath)
322-
// Note: nexe causes issues with its virtual file system: https://github.com/nexe/nexe/issues/613#issuecomment-579107593
323336

324337
;(await fs.promises.readdir(dirPath)).forEach((fileName) => {
325-
if (fileName.match(/worker/i)) {
326-
// We use the filename to identify the appType:
327-
const appType: AppType = protectString<AppType>(fileName)
328-
this.availableApps.set(appType, {
329-
file: path.join(dirPath, fileName),
330-
args: (appId: AppId) => {
331-
return [...getWorkerArgs(appId)]
332-
},
333-
cost: 0,
334-
})
335-
}
338+
if (!fileName.match(/worker/i)) return
339+
340+
// We use the filename to identify the appType:
341+
const appType: AppType = protectString<AppType>(fileName)
342+
this.availableApps.set(appType, {
343+
file: path.join(dirPath, fileName),
344+
getExecArgs: (appId: AppId) => {
345+
return [...getWorkerArgs(appId, false)]
346+
},
347+
canRunInCriticalExpectationsOnlyMode: true,
348+
cost: 0,
349+
})
336350
})
337351
}
338352

@@ -522,22 +536,34 @@ export class AppContainer {
522536
async spinUp(appType: AppType, longSpinDownTime = false): Promise<AppId> {
523537
return this._spinUp(appType, longSpinDownTime)
524538
}
525-
private async _spinUp(appType: AppType, longSpinDownTime = false): Promise<AppId> {
539+
private async _spinUp(
540+
appType: AppType,
541+
longSpinDownTime = false,
542+
isOnlyForCriticalExpectations = false
543+
): Promise<AppId> {
526544
const availableApp = this.availableApps.get(appType)
527545
if (!availableApp) throw new Error(`Unknown appType "${appType}"`)
528546

529547
const appId = this.getNewAppId()
530548

531549
this.logger.debug(`Spinning up app "${appId}" of type "${appType}"`)
532550

533-
const child = this.setupChildProcess(appType, appId, availableApp)
551+
const child = this.setupChildProcess(appType, appId, availableApp, isOnlyForCriticalExpectations)
552+
553+
let isAutoScaling = true
554+
if (isOnlyForCriticalExpectations) {
555+
isAutoScaling = false
556+
}
557+
534558
this.apps.set(appId, {
535559
process: child,
536560
appType: appType,
537561
toBeKilled: false,
538562
restarts: 0,
539563
lastRestart: 0,
540564
monitorPing: false,
565+
isAutoScaling: isAutoScaling,
566+
isOnlyForCriticalExpectations: isOnlyForCriticalExpectations,
541567
lastPing: Date.now(),
542568
spinDownTime: this.config.appContainer.spinDownTime * (longSpinDownTime ? 10 : 1),
543569
workerAgentApi: null,
@@ -582,7 +608,12 @@ export class AppContainer {
582608
}
583609
})
584610
}
585-
private setupChildProcess(appType: AppType, appId: AppId, availableApp: AvailableAppInfo): cp.ChildProcess {
611+
private setupChildProcess(
612+
appType: AppType,
613+
appId: AppId,
614+
availableApp: AvailableAppInfo,
615+
useCriticalOnlyMode: boolean
616+
): cp.ChildProcess {
586617
const cwd = process.execPath.match(/node.exe$/)
587618
? undefined // Process runs as a node process, we're probably in development mode.
588619
: path.dirname(process.execPath) // Process runs as a node process, we're probably in development mode.
@@ -602,7 +633,7 @@ export class AppContainer {
602633
this.usedInspectPorts.add(inspectPort)
603634
}
604635

605-
const child = cp.spawn(availableApp.file, availableApp.args(appId), {
636+
const child = cp.spawn(availableApp.file, availableApp.getExecArgs(appId, useCriticalOnlyMode), {
606637
cwd: cwd,
607638
env: {
608639
...process.env,
@@ -613,10 +644,10 @@ export class AppContainer {
613644
this.logger.info(`Starting process "${appId}" (${appType}), pid=${child.pid}: "${availableApp.file}"`)
614645

615646
child.stdout.on('data', (message) => {
616-
this.logFromApp(appId, appType, message, this.logger.debug)
647+
this.onOutputFromApp(appId, appType, message, this.logger.debug)
617648
})
618649
child.stderr.on('data', (message) => {
619-
this.logFromApp(appId, appType, message, this.logger.error)
650+
this.onOutputFromApp(appId, appType, message, this.logger.error)
620651
// this.logger.debug(`${appId} stderr: ${message}`)
621652
})
622653
child.on('error', (err) => {
@@ -648,7 +679,7 @@ export class AppContainer {
648679

649680
app.process.removeAllListeners()
650681

651-
const newChild = this.setupChildProcess(appType, appId, availableApp)
682+
const newChild = this.setupChildProcess(appType, appId, availableApp, useCriticalOnlyMode)
652683

653684
app.process = newChild
654685
}, timeUntilRestart)
@@ -683,18 +714,29 @@ export class AppContainer {
683714
})
684715
}
685716
}
717+
686718
this.spinUpMinimumApps().catch((error) => {
687719
this.logger.error(`Error in spinUpMinimumApps: ${stringifyError(error)}`)
688720
})
689721
}
722+
690723
private async spinUpMinimumApps(): Promise<void> {
724+
if (this.config.appContainer.minCriticalWorkerApps !== null) {
725+
for (const [appType, appInfo] of this.availableApps.entries()) {
726+
if (!appInfo.canRunInCriticalExpectationsOnlyMode) continue
727+
while (this.getCriticalExpectationAppCount(appType) < this.config.appContainer.minCriticalWorkerApps) {
728+
await this._spinUp(appType, false, true)
729+
}
730+
}
731+
}
732+
691733
for (const appType of this.availableApps.keys()) {
692-
while (this.getAppCount(appType) < this.config.appContainer.minRunningApps) {
734+
while (this.getScalingAppCount(appType) < this.config.appContainer.minRunningApps) {
693735
await this._spinUp(appType)
694736
}
695737
}
696738
}
697-
private logFromApp(appId: AppId, appType: AppType, data: any, defaultLog: LeveledLogMethod): void {
739+
private onOutputFromApp(appId: AppId, appType: AppType, data: any, defaultLog: LeveledLogMethod): void {
698740
const messages = `${data}`.split('\n')
699741

700742
for (const message of messages) {
@@ -774,8 +816,9 @@ export class AppContainer {
774816
}
775817
interface AvailableAppInfo {
776818
file: string
777-
args: (appId: AppId) => string[]
819+
getExecArgs: (appId: AppId, useCriticalOnlyMode: boolean) => string[]
778820
/** Some kind of value, how much it costs to run it, per minute */
821+
canRunInCriticalExpectationsOnlyMode: boolean
779822
cost: number
780823
}
781824

apps/package-manager/packages/generic/src/connector.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ export class Connector {
8282
config.packageManager.accessUrl || undefined,
8383
workForceConnectionOptions,
8484
config.packageManager.concurrency,
85-
config.packageManager.chaosMonkey,
86-
config.packageManager.criticalWorkerPoolSize
85+
config.packageManager.chaosMonkey
8786
)
8887
}
8988

0 commit comments

Comments
 (0)