Skip to content

Commit 82bb9fc

Browse files
committed
fix: better handling of timed out jobs
1 parent d9deac6 commit 82bb9fc

4 files changed

Lines changed: 107 additions & 49 deletions

File tree

shared/packages/api/src/methods.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ export namespace ExpectationManagerWorkerAgent {
7272
exp: Expectation.Any,
7373
wasFullfilled: boolean
7474
) => Promise<ReturnTypeIsExpectationFullfilled>
75-
workOnExpectation: (exp: Expectation.Any, cost: ExpectationCost) => Promise<WorkInProgressInfo>
75+
workOnExpectation: (exp: Expectation.Any, cost: ExpectationCost, timeout: number) => Promise<WorkInProgressInfo>
7676
removeExpectation: (exp: Expectation.Any) => Promise<ReturnTypeRemoveExpectation>
7777

7878
cancelWorkInProgress: (wipId: number) => Promise<void>

shared/packages/expectationManager/src/expectationManager.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -928,7 +928,8 @@ export class ExpectationManager {
928928
// Start working on the Expectation:
929929
const wipInfo = await assignedWorker.worker.workOnExpectation(
930930
trackedExp.exp,
931-
assignedWorker.cost
931+
assignedWorker.cost,
932+
this.constants.WORK_TIMEOUT_TIME
932933
)
933934

934935
trackedExp.status.workInProgressCancel = async () => {

shared/packages/expectationManager/src/workerAgentApi.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,11 @@ export class WorkerAgentAPI
5252
}
5353
async workOnExpectation(
5454
exp: Expectation.Any,
55-
cost: ExpectationManagerWorkerAgent.ExpectationCost
55+
cost: ExpectationManagerWorkerAgent.ExpectationCost,
56+
timeout: number
5657
): Promise<ExpectationManagerWorkerAgent.WorkInProgressInfo> {
5758
// Note: This call is ultimately received in shared/packages/worker/src/workerAgent.ts
58-
return this._sendMessage('workOnExpectation', exp, cost)
59+
return this._sendMessage('workOnExpectation', exp, cost, timeout)
5960
}
6061
async removeExpectation(exp: Expectation.Any): Promise<ReturnTypeRemoveExpectation> {
6162
// Note: This call is ultimately received in shared/packages/worker/src/workerAgent.ts

shared/packages/worker/src/workerAgent.ts

Lines changed: 101 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,14 @@ import { WorkforceAPI } from './workforceApi'
3333
/** The WorkerAgent is a front for a Worker (@see GenericWorker).
3434
* It is intended to be the main class in its worker-process, and handles things like communication with the WorkForce or the Expectation-Manager
3535
*/
36+
3637
export class WorkerAgent {
3738
private _worker: GenericWorker
3839
// private _busyMethodCount = 0
39-
private currentJobs: { cost: ExpectationManagerWorkerAgent.ExpectationCost; progress: number }[] = []
4040
private workforceAPI: WorkforceAPI
4141
private appContainerAPI: AppContainerAPI
4242
private wipI = 0
43-
44-
private worksInProgress: { [wipId: string]: IWorkInProgress } = {}
43+
private currentJobs: CurrentJob[] = []
4544
public readonly id: string
4645
private workForceConnectionOptions: ClientConnectionOptions
4746
private appContainerConnectionOptions: ClientConnectionOptions | null
@@ -131,11 +130,13 @@ export class WorkerAgent {
131130
for (const expectationManager of Object.values(this.expectationManagers)) {
132131
expectationManager.api.terminate()
133132
}
134-
for (const wipId of Object.keys(this.worksInProgress)) {
135-
this.cancelWorkInProgress(wipId).catch((error) => {
136-
this.logger.error('WorkerAgent.terminate: Error in cancelWorkInProgress')
137-
this.logger.error(error)
138-
})
133+
for (const currentJob of this.currentJobs) {
134+
if (currentJob.wipId) {
135+
this.cancelJob(currentJob.wipId).catch((error) => {
136+
this.logger.error('WorkerAgent.terminate: Error in cancelJob')
137+
this.logger.error(error)
138+
})
139+
}
139140
}
140141
if (this.intervalCheckTimer) clearInterval(this.intervalCheckTimer)
141142
this._worker.terminate()
@@ -239,44 +240,77 @@ export class WorkerAgent {
239240
},
240241
workOnExpectation: async (
241242
exp: Expectation.Any,
242-
cost: ExpectationManagerWorkerAgent.ExpectationCost
243+
cost: ExpectationManagerWorkerAgent.ExpectationCost,
244+
/** Timeout, cancels the job if no updates are received in this time [ms] */
245+
timeout: number
243246
): Promise<ExpectationManagerWorkerAgent.WorkInProgressInfo> => {
244247
this.IDidSomeWork()
245-
const currentjob = {
248+
const currentJob: CurrentJob = {
246249
cost: cost,
250+
cancelled: false,
251+
lastUpdated: Date.now(),
247252
progress: 0,
248-
// callbacksOnDone: [],
253+
wipId: this.wipI++,
254+
workInProgress: null,
255+
timeoutInterval: setInterval(() => {
256+
if (currentJob.cancelled && currentJob.timeoutInterval) {
257+
clearInterval(currentJob.timeoutInterval)
258+
currentJob.timeoutInterval = null
259+
return
260+
}
261+
262+
if (Date.now() - currentJob.lastUpdated > timeout) {
263+
// The job seems to have timed out.
264+
// Expectation Manager will clean up on it's side, we have to do the same here.
265+
266+
this.logger.info(`WorkerAgent: Cancelling job ${currentJob.wipId} due to timeout`)
267+
268+
this.cancelJob(currentJob.wipId).catch((error) => {
269+
// Not much we can do about that error..
270+
this.logger.error('WorkerAgent timeout watch: Error in cancelJob')
271+
this.logger.error(error)
272+
273+
// Ensure that the jop is removed, so that it wont block others:
274+
this.removeJob(currentJob)
275+
})
276+
}
277+
}, 1000),
249278
}
250-
const wipId = this.wipI++
279+
this.currentJobs.push(currentJob)
251280
this.logger.debug(
252-
`Worker "${this.id}" starting job ${wipId}, (${exp.id}). (${this.currentJobs.length})`
281+
`Worker "${this.id}" starting job ${currentJob.wipId}, (${exp.id}). (${this.currentJobs.length})`
253282
)
254-
this.currentJobs.push(currentjob)
255283

256284
try {
257285
const workInProgress = await this._worker.workOnExpectation(exp)
258286

259-
this.worksInProgress[`${wipId}`] = workInProgress
287+
currentJob.workInProgress = workInProgress
260288

261289
workInProgress.on('progress', (actualVersionHash, progress: number) => {
262290
this.IDidSomeWork()
263-
currentjob.progress = progress
264-
expectedManager.api.wipEventProgress(wipId, actualVersionHash, progress).catch((err) => {
265-
if (!this.terminated) {
266-
this.logger.error('Error in wipEventProgress')
267-
this.logger.error(err)
268-
}
269-
})
291+
if (currentJob.cancelled) return // Don't send updates on cancelled work
292+
currentJob.lastUpdated = Date.now()
293+
currentJob.progress = progress
294+
expectedManager.api
295+
.wipEventProgress(currentJob.wipId, actualVersionHash, progress)
296+
.catch((err) => {
297+
if (!this.terminated) {
298+
this.logger.error('Error in wipEventProgress')
299+
this.logger.error(err)
300+
}
301+
})
270302
})
271303
workInProgress.on('error', (error: string) => {
272304
this.IDidSomeWork()
273-
this.currentJobs = this.currentJobs.filter((job) => job !== currentjob)
305+
if (currentJob.cancelled) return // Don't send updates on cancelled work
306+
currentJob.lastUpdated = Date.now()
307+
this.currentJobs = this.currentJobs.filter((job) => job !== currentJob)
274308
this.logger.debug(
275-
`Worker "${this.id}" stopped job ${wipId}, (${exp.id}), due to error. (${this.currentJobs.length})`
309+
`Worker "${this.id}" stopped job ${currentJob.wipId}, (${exp.id}), due to error. (${this.currentJobs.length})`
276310
)
277311

278312
expectedManager.api
279-
.wipEventError(wipId, {
313+
.wipEventError(currentJob.wipId, {
280314
user: 'Work aborted due to an error',
281315
tech: error,
282316
})
@@ -286,34 +320,39 @@ export class WorkerAgent {
286320
this.logger.error(err)
287321
}
288322
})
289-
delete this.worksInProgress[`${wipId}`]
323+
324+
this.removeJob(currentJob)
290325
})
291326
workInProgress.on('done', (actualVersionHash, reason, result) => {
292327
this.IDidSomeWork()
293-
this.currentJobs = this.currentJobs.filter((job) => job !== currentjob)
328+
if (currentJob.cancelled) return // Don't send updates on cancelled work
329+
currentJob.lastUpdated = Date.now()
330+
this.currentJobs = this.currentJobs.filter((job) => job !== currentJob)
294331
this.logger.debug(
295-
`Worker "${this.id}" stopped job ${wipId}, (${exp.id}), done. (${this.currentJobs.length})`
332+
`Worker "${this.id}" stopped job ${currentJob.wipId}, (${exp.id}), done. (${this.currentJobs.length})`
296333
)
297334

298-
expectedManager.api.wipEventDone(wipId, actualVersionHash, reason, result).catch((err) => {
299-
if (!this.terminated) {
300-
this.logger.error('Error in wipEventDone')
301-
this.logger.error(err)
302-
}
303-
})
304-
delete this.worksInProgress[`${wipId}`]
335+
expectedManager.api
336+
.wipEventDone(currentJob.wipId, actualVersionHash, reason, result)
337+
.catch((err) => {
338+
if (!this.terminated) {
339+
this.logger.error('Error in wipEventDone')
340+
this.logger.error(err)
341+
}
342+
})
343+
this.removeJob(currentJob)
305344
})
306345

307346
return {
308-
wipId: wipId,
347+
wipId: currentJob.wipId,
309348
properties: workInProgress.properties,
310349
}
311350
} catch (err) {
312-
// The workOnExpectation failed.
351+
// worker.workOnExpectation() failed.
313352

314-
this.currentJobs = this.currentJobs.filter((job) => job !== currentjob)
353+
this.removeJob(currentJob)
315354
this.logger.debug(
316-
`Worker "${this.id}" stopped job ${wipId}, (${exp.id}), due to initial error. (${this.currentJobs.length})`
355+
`Worker "${this.id}" stopped job ${currentJob.wipId}, (${exp.id}), due to initial error. (${this.currentJobs.length})`
317356
)
318357

319358
throw err
@@ -325,7 +364,7 @@ export class WorkerAgent {
325364
},
326365
cancelWorkInProgress: async (wipId: number): Promise<void> => {
327366
this.IDidSomeWork()
328-
return this.cancelWorkInProgress(wipId)
367+
return this.cancelJob(wipId)
329368
},
330369
doYouSupportPackageContainer: (
331370
packageContainer: PackageContainerExpectation
@@ -449,12 +488,20 @@ export class WorkerAgent {
449488
}
450489
}
451490
}
452-
private async cancelWorkInProgress(wipId: string | number): Promise<void> {
453-
const wip = this.worksInProgress[`${wipId}`]
454-
if (wip) {
455-
await wip.cancel()
491+
private async cancelJob(wipId: number): Promise<void> {
492+
const currentJob = this.currentJobs.find((job) => job.wipId === wipId)
493+
494+
if (currentJob) {
495+
if (currentJob.workInProgress) {
496+
await currentJob.workInProgress.cancel()
497+
currentJob.workInProgress = null
498+
}
499+
this.removeJob(currentJob)
456500
}
457-
delete this.worksInProgress[`${wipId}`]
501+
}
502+
private removeJob(currentJob: CurrentJob): void {
503+
currentJob.cancelled = true
504+
this.currentJobs = this.currentJobs.filter((job) => job !== currentJob)
458505
}
459506
private setupIntervalCheck() {
460507
if (!this.intervalCheckTimer) {
@@ -483,3 +530,12 @@ export class WorkerAgent {
483530
this.lastWorkTime = Date.now()
484531
}
485532
}
533+
interface CurrentJob {
534+
cost: ExpectationManagerWorkerAgent.ExpectationCost
535+
cancelled: boolean
536+
lastUpdated: number
537+
progress: number
538+
timeoutInterval: NodeJS.Timeout | null
539+
wipId: number
540+
workInProgress: IWorkInProgress | null
541+
}

0 commit comments

Comments
 (0)