Skip to content

Commit d0d6b60

Browse files
committed
fix: batch writes to json file, to avoid timeouts when scheduling many writes at the same time.
1 parent 87b90d7 commit d0d6b60

3 files changed

Lines changed: 437 additions & 75 deletions

File tree

shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts

Lines changed: 17 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import path from 'path'
22
import { promisify } from 'util'
33
import fs from 'fs'
4-
import * as LockFile from 'proper-lockfile'
5-
import _ from 'underscore'
64
import {
75
ExpectedPackage,
86
StatusCode,
@@ -23,6 +21,7 @@ import { GenericAccessorHandle } from '../genericHandle'
2321
import { MonitorInProgress } from '../../lib/monitorInProgress'
2422
import { removeBasePath } from './pathJoin'
2523
import { FileEvent, FileWatcher, IFileWatcher } from './FileWatcher'
24+
import { updateJSONFileBatch } from './json-write-file'
2625

2726
export const LocalFolderAccessorHandleType = 'localFolder'
2827
export const FileShareAccessorHandleType = 'fileShare'
@@ -32,7 +31,6 @@ const fsReadFile = promisify(fs.readFile)
3231
const fsReaddir = promisify(fs.readdir)
3332
const fsRmDir = promisify(fs.rmdir)
3433
const fsStat = promisify(fs.stat)
35-
const fsWriteFile = promisify(fs.writeFile)
3634
const fsUnlink = promisify(fs.unlink)
3735
const fsLstat = promisify(fs.lstat)
3836

@@ -423,73 +421,23 @@ export abstract class GenericFileAccessorHandle<Metadata> extends GenericAccesso
423421
const LOCK_ATTEMPTS_COUNT = 10
424422
const RETRY_TIMEOUT = 100 // ms
425423

426-
let lockCompromisedError: Error | null = null
427-
428-
// Retry up to 10 times at locking and writing the file:
429-
for (let i = 0; i < LOCK_ATTEMPTS_COUNT; i++) {
430-
lockCompromisedError = null
431-
432-
// Get file lock
433-
let releaseLock: (() => Promise<void>) | undefined = undefined
434-
try {
435-
releaseLock = await LockFile.lock(this.deferRemovePackagesPath, {
436-
onCompromised: (err) => {
437-
// This is called if the lock somehow gets compromised
438-
this.worker.logger.warn(`updatePackagesToRemove: Lock compromised: ${err}`)
439-
lockCompromisedError = err
440-
},
441-
})
442-
} catch (e) {
443-
if (e instanceof Error && (e as any).code === 'ENOENT') {
444-
// The file does not exist. Create an empty file and try again:
445-
await fsWriteFile(this.deferRemovePackagesPath, '')
446-
continue
447-
} else if (e instanceof Error && (e as any).code === 'ELOCKED') {
448-
// Already locked, try again later:
449-
await sleep(RETRY_TIMEOUT)
450-
continue
451-
} else {
452-
// Unknown error. Log and exit:
453-
this.worker.logger.error(e)
454-
return
455-
}
456-
}
457-
// At this point, we have acquired the lock.
458-
try {
459-
// Read and write to the file:
460-
const oldList = await this.getPackagesToRemove()
461-
const newList = cbManipulateList(clone(oldList))
462-
if (!_.isEqual(oldList, newList)) {
463-
if (lockCompromisedError) {
464-
// The lock was compromised. Try again:
465-
continue
466-
}
467-
await fsWriteFile(this.deferRemovePackagesPath, JSON.stringify(newList))
468-
}
469-
470-
// Release the lock:
471-
if (!lockCompromisedError) await releaseLock()
472-
// Done, exit the function:
473-
return
474-
} catch (e) {
475-
if (e instanceof Error && (e as any).code === 'ERELEASED') {
476-
// Lock was already released. Something must have gone wrong (eg. someone deleted a folder),
477-
// Log and try again:
478-
this.worker.logger.warn(`updatePackagesToRemove: Lock was already released`)
479-
continue
480-
} else {
481-
// Release the lock:
482-
if (!lockCompromisedError) await releaseLock()
483-
throw e
424+
try {
425+
await updateJSONFileBatch<DelayPackageRemovalEntry[]>(
426+
this.deferRemovePackagesPath,
427+
(list) => {
428+
return cbManipulateList(list ?? [])
429+
},
430+
{
431+
retryCount: LOCK_ATTEMPTS_COUNT,
432+
retryTimeout: RETRY_TIMEOUT,
433+
logError: (error) => this.worker.logger.error(error),
434+
logWarning: (message) => this.worker.logger.warn(message),
484435
}
485-
}
486-
}
487-
// At this point, the lock failed
488-
this.worker.logger.error(
489-
`updatePackagesToRemove: Failed to lock file "${this.deferRemovePackagesPath}" after ${LOCK_ATTEMPTS_COUNT} attempts`
490-
)
491-
if (lockCompromisedError) {
492-
this.worker.logger.error(`updatePackagesToRemove: lockCompromisedError: ${lockCompromisedError}`)
436+
)
437+
} catch (e) {
438+
// Not much we can do about it..
439+
// Log and continue:
440+
this.worker.logger.error(e)
493441
}
494442
}
495443
}
@@ -507,9 +455,3 @@ enum StatusCategory {
507455
WATCHER = 'watcher',
508456
FILE = 'file_',
509457
}
510-
function clone<T>(o: T): T {
511-
return JSON.parse(JSON.stringify(o))
512-
}
513-
async function sleep(duration: number): Promise<void> {
514-
return new Promise((r) => setTimeout(r, duration))
515-
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
import { getTmpPath, updateJSONFile, updateJSONFileBatch } from '../json-write-file'
2+
import { promises as fs } from 'fs'
3+
4+
const FILE_A = 'file_a.json'
5+
async function cleanup() {
6+
await Promise.all([unlinkIfExists(FILE_A), unlinkIfExists(getLockPath(FILE_A)), unlinkIfExists(getTmpPath(FILE_A))])
7+
}
8+
9+
beforeEach(cleanup)
10+
afterEach(cleanup)
11+
12+
test('updateJSONFile: single write', async () => {
13+
const cbManipulate = jest.fn(() => {
14+
return {
15+
a: 1,
16+
}
17+
})
18+
await updateJSONFile(FILE_A, cbManipulate)
19+
20+
expect(cbManipulate).toBeCalledTimes(1)
21+
expect(await readIfExists(FILE_A)).toBe(
22+
JSON.stringify({
23+
a: 1,
24+
})
25+
)
26+
})
27+
28+
test('updateJSONFile: 2 writes', async () => {
29+
const cbManipulate = jest.fn((o) => {
30+
o = o || []
31+
o.push('a')
32+
return o
33+
})
34+
35+
const p0 = updateJSONFile(FILE_A, cbManipulate)
36+
await sleep(5)
37+
38+
const p1 = updateJSONFile(FILE_A, cbManipulate)
39+
40+
await Promise.all([p0, p1])
41+
42+
expect(cbManipulate).toBeCalledTimes(2)
43+
expect(await readIfExists(FILE_A)).toBe(JSON.stringify(['a', 'a']))
44+
})
45+
test('updateJSONFile: 10 writes', async () => {
46+
const cbManipulate = jest.fn((o) => {
47+
o = o || []
48+
o.push('b')
49+
return o
50+
})
51+
52+
const config = {
53+
retryTimeout: 30,
54+
retryCount: 3,
55+
}
56+
57+
// This should be an impossible tasks, because there will be too many locks, and not enough time to resolve them:
58+
59+
let error: any
60+
try {
61+
await Promise.all([
62+
updateJSONFile(FILE_A, cbManipulate, config),
63+
updateJSONFile(FILE_A, cbManipulate, config),
64+
updateJSONFile(FILE_A, cbManipulate, config),
65+
updateJSONFile(FILE_A, cbManipulate, config),
66+
updateJSONFile(FILE_A, cbManipulate, config),
67+
updateJSONFile(FILE_A, cbManipulate, config),
68+
updateJSONFile(FILE_A, cbManipulate, config),
69+
updateJSONFile(FILE_A, cbManipulate, config),
70+
updateJSONFile(FILE_A, cbManipulate, config),
71+
updateJSONFile(FILE_A, cbManipulate, config),
72+
])
73+
} catch (e) {
74+
error = e
75+
}
76+
expect(error + '').toMatch(/Failed to lock file/)
77+
78+
// Wait for the lock functions to finish retrying:
79+
await sleep(config.retryTimeout * config.retryCount)
80+
})
81+
82+
test('updateJSONFileBatch: single write', async () => {
83+
const cbManipulate = jest.fn(() => {
84+
return {
85+
b: 1,
86+
}
87+
})
88+
await updateJSONFileBatch(FILE_A, cbManipulate)
89+
90+
expect(cbManipulate).toBeCalledTimes(1)
91+
expect(await readIfExists(FILE_A)).toBe(
92+
JSON.stringify({
93+
b: 1,
94+
})
95+
)
96+
})
97+
98+
test('updateJSONFileBatch: 3 writes', async () => {
99+
const v = await readIfExists(FILE_A)
100+
expect(v).toBe(undefined)
101+
102+
const cbManipulate = jest.fn((o) => {
103+
o = o || []
104+
o.push('a')
105+
return o
106+
})
107+
108+
const p0 = updateJSONFileBatch(FILE_A, cbManipulate)
109+
await sleep(5)
110+
111+
const p1 = updateJSONFileBatch(FILE_A, cbManipulate)
112+
const p2 = updateJSONFileBatch(FILE_A, cbManipulate)
113+
114+
await Promise.all([p0, p1, p2])
115+
116+
expect(cbManipulate).toBeCalledTimes(3)
117+
expect(await readIfExists(FILE_A)).toBe(JSON.stringify(['a', 'a', 'a']))
118+
})
119+
test('updateJSONFileBatch: 20 writes', async () => {
120+
const cbManipulate = jest.fn((o) => {
121+
o = o || []
122+
o.push('a')
123+
return o
124+
})
125+
126+
const config = {
127+
retryTimeout: 30,
128+
retryCount: 3,
129+
}
130+
131+
const ps: Promise<void>[] = []
132+
let expectResult: string[] = []
133+
for (let i = 0; i < 20; i++) {
134+
ps.push(updateJSONFileBatch(FILE_A, cbManipulate, config))
135+
expectResult.push('a')
136+
}
137+
138+
await Promise.all(ps)
139+
140+
expect(cbManipulate).toBeCalledTimes(20)
141+
expect(await readIfExists(FILE_A)).toBe(JSON.stringify(expectResult))
142+
})
143+
144+
async function readIfExists(filePath: string): Promise<string | undefined> {
145+
try {
146+
return await fs.readFile(filePath, 'utf-8')
147+
} catch (e) {
148+
if ((e as any)?.code === 'ENOENT') {
149+
// not found
150+
return undefined
151+
} else throw e
152+
}
153+
}
154+
async function unlinkIfExists(filePath: string): Promise<void> {
155+
try {
156+
await fs.unlink(filePath)
157+
} catch (e) {
158+
if ((e as any)?.code === 'ENOENT') {
159+
// not found, that's okay
160+
} else throw e
161+
}
162+
}
163+
function getLockPath(filePath: string): string {
164+
return filePath + '.lock'
165+
}
166+
function sleep(duration: number): Promise<void> {
167+
return new Promise((r) => setTimeout(r, duration))
168+
}

0 commit comments

Comments
 (0)