Skip to content

Commit cfe389c

Browse files
committed
fix: FileShare: fast-path to avoid a timeout issue when many read/write-calls are queued at the same time
1 parent 6925b75 commit cfe389c

8 files changed

Lines changed: 321 additions & 88 deletions

File tree

apps/appcontainer-node/packages/generic/src/appContainer.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ export class AppContainer {
105105
client.once('close', () => {
106106
this.logger.warn(`Connection to Worker "${client.clientId}" closed`)
107107
app.workerAgentApi = null
108+
109+
this.workerStorage.releaseLockForTag(client.clientId)
108110
})
109111
this.logger.info(`Connection to Worker "${client.clientId}" established`)
110112
app.workerAgentApi = api
@@ -236,7 +238,7 @@ export class AppContainer {
236238
dataId: string,
237239
customTimeout?: number
238240
): Promise<{ lockId: string; current: any | undefined }> => {
239-
return this.workerStorage.getWriteLock(dataId, customTimeout)
241+
return this.workerStorage.getWriteLock(dataId, customTimeout, clientId)
240242
},
241243
workerStorageReleaseLock: async (dataId: string, lockId: string): Promise<void> => {
242244
return this.workerStorage.releaseLock(dataId, lockId)

shared/packages/api/src/__tests__/lib.spec.ts

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { diff, promiseTimeout, stringifyError, waitTime } from '../lib'
1+
import { deferGets, diff, promiseTimeout, stringifyError, waitTime } from '../lib'
22

33
describe('lib', () => {
44
test('diff', () => {
@@ -136,5 +136,46 @@ describe('lib', () => {
136136

137137
expect(stringifyError(errWithContext, true)).toEqual('Error: errr, Context: a context')
138138
})
139+
test('deferGets', async () => {
140+
let i = 0
141+
const deferred = deferGets(async (val: string) => {
142+
await waitTime(10)
143+
return `${val}_${i++}`
144+
})
145+
146+
const values = await Promise.all([
147+
deferred('a', 'a1'), // will be executed
148+
deferred('b', 'b2'), // will be executed
149+
deferred('a', 'a3'), // will not be executed
150+
deferred('a', 'a4'), // will not be executed
151+
deferred('c', 'c5'), // will be executed
152+
deferred('a', 'a6'), // will not be executed
153+
deferred('b', 'b7'), // will not be executed
154+
])
155+
156+
const values2 = await Promise.all([
157+
deferred('a', 'a8'), // will be executed
158+
deferred('a', 'a9'), // will not be executed
159+
deferred('b', 'b10'), // will be executed
160+
deferred('a', 'a11'), // will not be executed
161+
])
162+
163+
expect(values).toEqual([
164+
'a1_0', // was executed
165+
'b2_1', // was executed
166+
'a1_0', // was not executed
167+
'a1_0', // was not executed
168+
'c5_2', // was executed
169+
'a1_0', // was not executed
170+
'b2_1', // was not executed
171+
])
172+
expect(values2).toEqual([
173+
'a8_3', // was executed
174+
'a8_3', // was not executed
175+
'b10_4', // was executed
176+
'a8_3', // was not executed
177+
])
178+
expect(i).toBe(5)
179+
})
139180
})
140181
export {}

shared/packages/api/src/dataStorage.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ export class DataStore {
1616
ttl: number
1717
timeout?: NodeJS.Timeout
1818
} | null
19+
tag: string | undefined
1920
/** The data */
2021
data: any
2122
}
@@ -54,14 +55,19 @@ export class DataStore {
5455
}
5556

5657
/** Request to aquire a write lock */
57-
async getWriteLock(dataId: string, customTimeout?: number): Promise<{ lockId: string; current: any | undefined }> {
58+
async getWriteLock(
59+
dataId: string,
60+
customTimeout?: number,
61+
tag?: string
62+
): Promise<{ lockId: string; current: any | undefined }> {
5863
// Wait for getting access to the data:
5964
await this._waitForAccess(dataId)
6065
// Set a write lock:
6166
if (!this.storage.has(dataId)) {
6267
this.storage.set(dataId, {
6368
accessLock: null,
6469
data: undefined,
70+
tag: tag,
6571
})
6672
}
6773
const data = this.storage.get(dataId)
@@ -93,6 +99,14 @@ export class DataStore {
9399
this._triggerHandleClaims(true)
94100
}
95101
}
102+
/** Release all locks for a certain tag */
103+
releaseLockForTag(tag: string): void {
104+
this.storage.forEach((value, dataId) => {
105+
if (value.accessLock && value.tag === tag) {
106+
this.releaseLock(dataId, value.accessLock.lockId)
107+
}
108+
})
109+
}
96110
write(dataId: string, lockId: string, writeData: string): void {
97111
const data = this.storage.get(dataId)
98112
if (!data) throw new Error(`DataStorage: Error when trying to write data: "${dataId}" not found`)
@@ -184,7 +198,7 @@ export class DataStore {
184198
this.logger.warn(
185199
`AccessLock timed out after ${Date.now() - data.accessLock.created} ms "${
186200
data.accessLock.lockId
187-
}"`
201+
}", claim count: ${claims.length}`
188202
)
189203
isWaitingForLock = false
190204
}

shared/packages/api/src/lib.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ export function waitTime(duration: number): Promise<void> {
4949
setTimeout(resolve, duration)
5050
})
5151
}
52+
/** Intercepts a promise and rejects if the promise doesn't resolve in time. */
5253
export function promiseTimeout<T>(
5354
p: Promise<T>,
5455
timeoutTime: number,
@@ -214,3 +215,47 @@ export function isNodeRunningInDebugMode(): boolean {
214215
typeof v8debug === 'object' || /--debug|--inspect/.test(process.execArgv.join(' ') + process.env.NODE_OPTIONS)
215216
)
216217
}
218+
219+
/**
220+
* Wraps a function, so that multiple calls to it will be grouped together,
221+
* if the calls are close enough in time so that the resulting promise havent resolved yet.
222+
* The subsequent calls will resolve with the same result as the first call.
223+
*/
224+
export function deferGets<Args extends any[], Result>(
225+
fcn: (...args: Args) => Promise<Result>
226+
): (groupId: string, ...args: Args) => Promise<Result> {
227+
const defers = new Map<
228+
string,
229+
{
230+
resolve: (value: Result) => void
231+
reject: (err: any) => void
232+
}[]
233+
>()
234+
235+
return (groupId: string, ...args: Args) => {
236+
return new Promise<Result>((resolve, reject) => {
237+
// Check if there already is a call waiting:
238+
const waiting = defers.get(groupId)
239+
if (waiting) {
240+
waiting.push({ resolve, reject })
241+
} else {
242+
const newWaiting = [{ resolve, reject }]
243+
defers.set(groupId, newWaiting)
244+
245+
fcn(...args)
246+
.then((result) => {
247+
defers.delete(groupId)
248+
for (const w of newWaiting) {
249+
w.resolve(result)
250+
}
251+
})
252+
.catch((err) => {
253+
defers.delete(groupId)
254+
for (const w of newWaiting) {
255+
w.reject(err)
256+
}
257+
})
258+
}
259+
})
260+
}
261+
}

shared/packages/worker/src/worker/accessorHandlers/fileShare.ts

Lines changed: 111 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -388,102 +388,134 @@ export class FileShareAccessorHandle<Metadata> extends GenericFileAccessorHandle
388388
// On windows, we can assign the share to a drive letter, as that increases performance quite a lot:
389389
const windowsWorker = this.worker as WindowsWorker
390390

391+
const STORE_DRIVELETTERS = `fileShare_driveLetters_${this.worker.agentAPI.location.localComputerId}`
391392
// Note: Use the mappedDriveLetters as a WorkerStorage, to avoid a potential issue where other workers
392393
// mess with the drive letter at the same time that we do, and we all end up to be unsynced with reality.
393-
await this.worker.agentAPI.workerStorageWrite<MappedDriveLetters>(
394-
`fileShare_driveLetters_${this.worker.agentAPI.location.localComputerId}`,
395-
PREPARE_FILE_ACCESS_TIMEOUT,
396-
async (mappedDriveLetters0): Promise<MappedDriveLetters> => {
397-
const mappedDriveLetters: MappedDriveLetters = mappedDriveLetters0 ?? {}
398-
// First we check if the drive letter has already been assigned in our cache:
399-
let foundMappedDriveLetter: string | null = null
400-
for (const [driveLetter, mountedPath] of Object.entries(mappedDriveLetters)) {
401-
if (mountedPath === folderPath) {
402-
foundMappedDriveLetter = driveLetter
403-
}
404-
}
405394

406-
if (foundMappedDriveLetter && forceRemount) {
407-
// Force a re-mount of the drive letter:
408-
delete mappedDriveLetters[foundMappedDriveLetter]
409-
await networkDrive.unmount(foundMappedDriveLetter)
410-
foundMappedDriveLetter = null
411-
}
395+
if (!forceRemount) {
396+
// Fast-path, just read the drive letter from the store:
397+
// Note: This is a fast path in the case of many jobs fired at several Workers simultaneously,
398+
// they can all read in parallel from the same store. When doing a .workerStorageWrite(), that is a single-threaded process.
412399

413-
if (foundMappedDriveLetter) {
414-
// It seems a drive letter is already mapped up.
415-
this.actualFolderPath = `${foundMappedDriveLetter}:\\`
416-
handlingDone = true
400+
const mappedDriveLetters: MappedDriveLetters =
401+
(await this.worker.agentAPI.workerStorageRead<MappedDriveLetters>(STORE_DRIVELETTERS)) ?? {}
402+
403+
// Check if the drive letter has already been assigned in our cache:
404+
let foundMappedDriveLetter: string | null = null
405+
for (const [driveLetter, mountedPath] of Object.entries(mappedDriveLetters)) {
406+
if (mountedPath === folderPath) {
407+
foundMappedDriveLetter = driveLetter
417408
}
418-
if (!handlingDone) {
419-
// Update our cache of mounted drive letters:
420-
for (const [driveLetter, mountedPath] of Object.entries(await this.getMountedDriveLetters())) {
421-
mappedDriveLetters[driveLetter] = mountedPath
422-
// If the mounted path is the one we want, we don't have to mount a new one:
409+
}
410+
if (foundMappedDriveLetter) {
411+
// It seems a drive letter is already mapped up.
412+
this.actualFolderPath = `${foundMappedDriveLetter}:\\`
413+
handlingDone = true
414+
}
415+
}
416+
417+
if (!handlingDone) {
418+
await this.worker.agentAPI.workerStorageWrite<MappedDriveLetters>(
419+
STORE_DRIVELETTERS,
420+
PREPARE_FILE_ACCESS_TIMEOUT,
421+
async (mappedDriveLetters0): Promise<MappedDriveLetters> => {
422+
const mappedDriveLetters: MappedDriveLetters = mappedDriveLetters0 ?? {}
423+
// First we check if the drive letter has already been assigned in our cache:
424+
let foundMappedDriveLetter: string | null = null
425+
for (const [driveLetter, mountedPath] of Object.entries(mappedDriveLetters)) {
423426
if (mountedPath === folderPath) {
424427
foundMappedDriveLetter = driveLetter
425428
}
426429
}
430+
431+
if (foundMappedDriveLetter && forceRemount) {
432+
// Force a re-mount of the drive letter:
433+
delete mappedDriveLetters[foundMappedDriveLetter]
434+
await networkDrive.unmount(foundMappedDriveLetter)
435+
foundMappedDriveLetter = null
436+
}
437+
427438
if (foundMappedDriveLetter) {
439+
// It seems a drive letter is already mapped up.
428440
this.actualFolderPath = `${foundMappedDriveLetter}:\\`
429441
handlingDone = true
430442
}
431-
}
432-
433-
if (!handlingDone) {
434-
// Find next free drive letter:
435-
const freeDriveLetter = windowsWorker.agentAPI.config.windowsDriveLetters?.find(
436-
(driveLetter) => !mappedDriveLetters[driveLetter]
437-
)
438-
439-
if (freeDriveLetter) {
440-
// Try to map the remote share onto a drive:
441-
442-
try {
443-
await networkDrive.mount(
444-
folderPath,
445-
freeDriveLetter,
446-
this.accessor.userName,
447-
this.accessor.password
448-
)
449-
} catch (e) {
450-
const errStr = `${e}`
451-
if (
452-
errStr.match(/invalid response/i) ||
453-
errStr.match(/Ugyldig svar/i) // "Invalid response" in Norvegian
454-
) {
455-
// Temporary handling of the error
456-
457-
const mappedDrives = await this.getMountedDriveLetters()
458-
459-
if (mappedDrives[freeDriveLetter] === folderPath) {
460-
this.worker.logger.warn(`Supressed error: ${errStr}`)
461-
462-
this.worker.logger.warn(`Mapped drives: ${Object.keys(mappedDrives).join(',')}`)
463-
this.worker.logger.warn(
464-
`${freeDriveLetter} is currently mapped to ${mappedDrives[freeDriveLetter]}`
465-
)
466-
} else {
467-
this.worker.logger.warn(`Mapped drives: ${Object.keys(mappedDrives).join(',')}`)
468-
this.worker.logger.warn(
469-
`${freeDriveLetter} is currently mapped to ${mappedDrives[freeDriveLetter]}`
470-
)
471-
throw e
472-
}
473-
} else throw e
443+
if (!handlingDone) {
444+
// Update our cache of mounted drive letters:
445+
for (const [driveLetter, mountedPath] of Object.entries(
446+
await this.getMountedDriveLetters()
447+
)) {
448+
mappedDriveLetters[driveLetter] = mountedPath
449+
// If the mounted path is the one we want, we don't have to mount a new one:
450+
if (mountedPath === folderPath) {
451+
foundMappedDriveLetter = driveLetter
452+
}
474453
}
454+
if (foundMappedDriveLetter) {
455+
this.actualFolderPath = `${foundMappedDriveLetter}:\\`
456+
handlingDone = true
457+
}
458+
}
475459

476-
mappedDriveLetters[freeDriveLetter] = folderPath
477-
this.actualFolderPath = `${freeDriveLetter}:\\`
478-
handlingDone = true
479-
} else {
480-
// Not able to find any free drive letters.
481-
// Revert to direct access then
460+
if (!handlingDone) {
461+
// Find next free drive letter:
462+
const freeDriveLetter = windowsWorker.agentAPI.config.windowsDriveLetters?.find(
463+
(driveLetter) => !mappedDriveLetters[driveLetter]
464+
)
465+
466+
if (freeDriveLetter) {
467+
// Try to map the remote share onto a drive:
468+
469+
try {
470+
await networkDrive.mount(
471+
folderPath,
472+
freeDriveLetter,
473+
this.accessor.userName,
474+
this.accessor.password
475+
)
476+
} catch (e) {
477+
const errStr = `${e}`
478+
if (
479+
errStr.match(/invalid response/i) ||
480+
errStr.match(/Ugyldig svar/i) // "Invalid response" in Norvegian
481+
) {
482+
// Temporary handling of the error
483+
484+
const mappedDrives = await this.getMountedDriveLetters()
485+
486+
if (mappedDrives[freeDriveLetter] === folderPath) {
487+
this.worker.logger.warn(`Supressed error: ${errStr}`)
488+
489+
this.worker.logger.warn(
490+
`Mapped drives: ${Object.keys(mappedDrives).join(',')}`
491+
)
492+
this.worker.logger.warn(
493+
`${freeDriveLetter} is currently mapped to ${mappedDrives[freeDriveLetter]}`
494+
)
495+
} else {
496+
this.worker.logger.warn(
497+
`Mapped drives: ${Object.keys(mappedDrives).join(',')}`
498+
)
499+
this.worker.logger.warn(
500+
`${freeDriveLetter} is currently mapped to ${mappedDrives[freeDriveLetter]}`
501+
)
502+
throw e
503+
}
504+
} else throw e
505+
}
506+
507+
mappedDriveLetters[freeDriveLetter] = folderPath
508+
this.actualFolderPath = `${freeDriveLetter}:\\`
509+
handlingDone = true
510+
} else {
511+
// Not able to find any free drive letters.
512+
// Revert to direct access then
513+
}
482514
}
515+
return mappedDriveLetters
483516
}
484-
return mappedDriveLetters
485-
}
486-
)
517+
)
518+
}
487519
}
488520

489521
if (!handlingDone) {

0 commit comments

Comments
 (0)