Skip to content

Commit 8776cff

Browse files
committed
fix: ensure the same FTP client is not used for download & upload
1 parent 3359e54 commit 8776cff

2 files changed

Lines changed: 82 additions & 28 deletions

File tree

shared/packages/worker/src/worker/accessorHandlers/ftp.ts

Lines changed: 80 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ export class FTPAccessorHandle<Metadata> extends GenericAccessorHandle<Metadata>
203203
await this.fileHandler.handleRemovePackage(this.filePath, this.packageName, reason)
204204
}
205205
async getPackageReadStream(): Promise<PackageReadStream> {
206+
// important that this is a 'read', so that it doesn't go into a deadlock with putPackageStream() in case of an upload/download to the same accessorPackageContainer
206207
const ftp = await this.prepareFTPClient()
207208

208209
const response = await ftp.download(this.fullPath)
@@ -221,12 +222,13 @@ export class FTPAccessorHandle<Metadata> extends GenericAccessorHandle<Metadata>
221222
}
222223
}
223224
async putPackageStream(sourceStream: NodeJS.ReadableStream): Promise<PutPackageHandler> {
224-
const ftp = await this.prepareFTPClient()
225+
// important that this is a 'write', so that it doesn't go into a deadlock with getPackageReadStream() in case of an upload/download to the same accessorPackageContainer
226+
const ftp = await this.prepareFTPClient('write')
225227

226228
const fullPath = this.workOptions.useTemporaryFilePath ? this.temporaryFilePath : this.fullPath
227229

228230
// Remove the file if it exists:
229-
await this.unlinkIfExists(fullPath)
231+
await ftp.removeFileIfExists(fullPath)
230232

231233
const streamWrapper: PutPackageHandler = new PutPackageHandler(() => {
232234
// abort:
@@ -445,13 +447,22 @@ export class FTPAccessorHandle<Metadata> extends GenericAccessorHandle<Metadata>
445447
}
446448
}
447449

448-
private async prepareFTPClient(): Promise<FTPClientBase> {
449-
const cacheKey = `${this.accessor.serverType}-${this.accessor.host}-${this.accessor.port ?? 21}/${
450-
this.accessor.basePath ?? '/'
451-
}`
452-
450+
private async prepareFTPClient(
451+
/**
452+
* If set, ensures that a cached client is used NOT used for another (set) purpose.
453+
* If undefined, any cached client can be used.
454+
*/
455+
purpose: 'read' | 'write' | undefined = undefined
456+
): Promise<FTPClientBase> {
457+
type CachedClients = {
458+
clients: CachedClient[]
459+
options: FTPOptions
460+
}
461+
type CachedClient = {
462+
client: FTPClientBase
463+
purpose: 'read' | 'write' | undefined
464+
}
453465
const ftpOptions = this.ftpOptions
454-
455466
const options: FTPOptions = {
456467
type: Accessor.AccessType.FTP,
457468
serverType: ftpOptions.serverType,
@@ -462,33 +473,76 @@ export class FTPAccessorHandle<Metadata> extends GenericAccessorHandle<Metadata>
462473
allowAnyCertificate: ftpOptions.allowAnyCertificate,
463474
}
464475

465-
let cachedClient = this.worker.accessorCache[cacheKey] as FTPClientBase | undefined
466-
467-
if (cachedClient?.destroyed) {
468-
delete this.worker.accessorCache[cacheKey]
469-
cachedClient = undefined
476+
const accessorCache = this.worker.accessorCache as {
477+
[accessorType: string]: CachedClients | undefined
470478
}
471-
if (cachedClient) {
479+
480+
const cacheKey = JSON.stringify([
481+
this.accessorId,
482+
ftpOptions.serverType,
483+
ftpOptions.host,
484+
ftpOptions.port,
485+
this.accessor.basePath ?? '/',
486+
])
487+
488+
let cachedClients = accessorCache[cacheKey]
489+
if (cachedClients) {
472490
// Check that options matches:
473-
if (!isEqual(cachedClient.options, options)) {
474-
await cachedClient.destroy()
475-
delete this.worker.accessorCache[cacheKey]
476-
cachedClient = undefined
491+
if (!isEqual(cachedClients.options, options)) {
492+
for (const c of cachedClients.clients) {
493+
await c.client.destroy()
494+
}
495+
cachedClients.clients.splice(0, cachedClients.clients.length) // empty the array
496+
delete accessorCache[cacheKey]
497+
cachedClients = undefined
498+
}
499+
}
500+
501+
if (!cachedClients) {
502+
cachedClients = { clients: [], options }
503+
accessorCache[cacheKey] = cachedClients
504+
}
505+
506+
let cachedClient: CachedClient | undefined
507+
for (const client of cachedClients.clients) {
508+
let useThisClient: boolean
509+
510+
// If no purpose is set, we can use it for anything:
511+
if (client.purpose === undefined) useThisClient = true
512+
// If we don't have a purpose set, we can use any client:
513+
else if (purpose === undefined) useThisClient = true
514+
// If we have a matching purpose, we can use it:
515+
else if (purpose === client.purpose) useThisClient = true
516+
else useThisClient = false
517+
518+
if (useThisClient) {
519+
cachedClient = client
520+
break
477521
}
478522
}
523+
524+
if (cachedClient?.client.destroyed) {
525+
cachedClients.clients = cachedClients.clients.filter((c) => c !== cachedClient) // remove the client
526+
cachedClient = undefined
527+
}
528+
479529
if (!cachedClient) {
480530
// Set up a new FTP client:
481-
cachedClient = createFTPClient(ftpOptions.serverType, this.worker.logger, options)
531+
cachedClient = {
532+
client: createFTPClient(ftpOptions.serverType, this.worker.logger, options),
533+
purpose: purpose,
534+
}
535+
cachedClients.clients.push(cachedClient)
482536
}
483537

484-
if (cachedClient) {
485-
await cachedClient.init()
486-
487-
this.worker.accessorCache[cacheKey] = cachedClient
488-
return cachedClient
489-
} else {
490-
throw new Error(`FTPAccessorHandle: Could not create FTP client for ${ftpOptions.serverType}`)
538+
if (purpose && cachedClient.purpose === undefined) {
539+
// If we're using a generic client but for a specific purpose, set that purpose:
540+
cachedClient.purpose = purpose
491541
}
542+
543+
await cachedClient.client.init()
544+
545+
return cachedClient.client
492546
}
493547
/** Full path to the metadata file */
494548
private getMetadataPath(fullUrl: string) {

shared/packages/worker/src/worker/accessorHandlers/lib/FTPClient/FTPClient.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ export class FTPClient extends FTPClientBase {
1717
constructor(logger: LoggerInstance, options: FTPOptions) {
1818
super(options)
1919
this.logger = logger.category('FTPClient')
20-
if (options.serverType !== 'ftp' && options.serverType !== 'ftps')
21-
throw new Error('Internal Error: serverType must be "ftp" or "ftps"')
20+
if (!['ftp', 'ftps', 'ftp-ssl'].includes(options.serverType))
21+
throw new Error('Internal Error: serverType must be "ftp", "ftps" or "ftp-ssl"')
2222

2323
this.client = this.createFTPClient()
2424
}

0 commit comments

Comments
 (0)