Skip to content

Commit 796d0d6

Browse files
authored
refactor listIncompleteUploads to TypeScript (#1228)
1 parent d8473f9 commit 796d0d6

File tree

6 files changed

+231
-190
lines changed

6 files changed

+231
-190
lines changed

src/internal/client.ts

Lines changed: 152 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import * as http from 'node:http'
22
import * as https from 'node:https'
3-
import type * as stream from 'node:stream'
3+
import * as stream from 'node:stream'
44

5+
import * as async from 'async'
56
import { isBrowser } from 'browser-or-node'
67
import _ from 'lodash'
78
import * as qs from 'query-string'
@@ -27,6 +28,7 @@ import {
2728
isValidEndpoint,
2829
isValidObjectName,
2930
isValidPort,
31+
isValidPrefix,
3032
isVirtualHostStyle,
3133
makeDateLong,
3234
sanitizeETag,
@@ -43,7 +45,9 @@ import type {
4345
Binary,
4446
BucketItemFromList,
4547
BucketItemStat,
48+
BucketStream,
4649
GetObjectLegalHoldOptions,
50+
IncompleteUploadedBucketItem,
4751
IRequest,
4852
ObjectLockConfigParam,
4953
ObjectLockInfo,
@@ -59,7 +63,7 @@ import type {
5963
Transport,
6064
VersionIdentificator,
6165
} from './type.ts'
62-
import type { UploadedPart } from './xml-parser.ts'
66+
import type { ListMultipartResult, UploadedPart } from './xml-parser.ts'
6367
import * as xmlParsers from './xml-parser.ts'
6468
import { parseInitiateMultipart, parseObjectLegalHoldConfig } from './xml-parser.ts'
6569

@@ -125,6 +129,11 @@ export interface RemoveOptions {
125129
forceDelete?: boolean
126130
}
127131

132+
type Part = {
133+
part: number
134+
etag: string
135+
}
136+
128137
export class TypedClient {
129138
protected transport: Transport
130139
protected host: string
@@ -329,8 +338,13 @@ export class TypedClient {
329338
* Takes care of constructing virtual-host-style or path-style hostname
330339
*/
331340
protected getRequestOptions(
332-
opts: RequestOption & { region: string },
333-
): IRequest & { host: string; headers: Record<string, string> } {
341+
opts: RequestOption & {
342+
region: string
343+
},
344+
): IRequest & {
345+
host: string
346+
headers: Record<string, string>
347+
} {
334348
const method = opts.method
335349
const region = opts.region
336350
const bucketName = opts.bucketName
@@ -955,6 +969,140 @@ export class TypedClient {
955969

956970
// Calls implemented below are related to multipart.
957971

972+
listIncompleteUploads(
973+
bucket: string,
974+
prefix: string,
975+
recursive: boolean,
976+
): BucketStream<IncompleteUploadedBucketItem> {
977+
if (prefix === undefined) {
978+
prefix = ''
979+
}
980+
if (recursive === undefined) {
981+
recursive = false
982+
}
983+
if (!isValidBucketName(bucket)) {
984+
throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucket)
985+
}
986+
if (!isValidPrefix(prefix)) {
987+
throw new errors.InvalidPrefixError(`Invalid prefix : ${prefix}`)
988+
}
989+
if (!isBoolean(recursive)) {
990+
throw new TypeError('recursive should be of type "boolean"')
991+
}
992+
const delimiter = recursive ? '' : '/'
993+
let keyMarker = ''
994+
let uploadIdMarker = ''
995+
const uploads: unknown[] = []
996+
let ended = false
997+
998+
// TODO: refactor this with async/await and `stream.Readable.from`
999+
const readStream = new stream.Readable({ objectMode: true })
1000+
readStream._read = () => {
1001+
// push one upload info per _read()
1002+
if (uploads.length) {
1003+
return readStream.push(uploads.shift())
1004+
}
1005+
if (ended) {
1006+
return readStream.push(null)
1007+
}
1008+
this.listIncompleteUploadsQuery(bucket, prefix, keyMarker, uploadIdMarker, delimiter).then(
1009+
(result) => {
1010+
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
1011+
// @ts-ignore
1012+
result.prefixes.forEach((prefix) => uploads.push(prefix))
1013+
async.eachSeries(
1014+
result.uploads,
1015+
(upload, cb) => {
1016+
// for each incomplete upload add the sizes of its uploaded parts
1017+
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
1018+
// @ts-ignore
1019+
this.listParts(bucket, upload.key, upload.uploadId).then(
1020+
(parts: Part[]) => {
1021+
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
1022+
// @ts-ignore
1023+
upload.size = parts.reduce((acc, item) => acc + item.size, 0)
1024+
uploads.push(upload)
1025+
cb()
1026+
},
1027+
(err: Error) => cb(err),
1028+
)
1029+
},
1030+
(err) => {
1031+
if (err) {
1032+
readStream.emit('error', err)
1033+
return
1034+
}
1035+
if (result.isTruncated) {
1036+
keyMarker = result.nextKeyMarker
1037+
uploadIdMarker = result.nextUploadIdMarker
1038+
} else {
1039+
ended = true
1040+
}
1041+
1042+
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
1043+
// @ts-ignore
1044+
readStream._read()
1045+
},
1046+
)
1047+
},
1048+
(e) => {
1049+
readStream.emit('error', e)
1050+
},
1051+
)
1052+
}
1053+
return readStream
1054+
}
1055+
1056+
/**
1057+
* Called by listIncompleteUploads to fetch a batch of incomplete uploads.
1058+
*/
1059+
async listIncompleteUploadsQuery(
1060+
bucketName: string,
1061+
prefix: string,
1062+
keyMarker: string,
1063+
uploadIdMarker: string,
1064+
delimiter: string,
1065+
): Promise<ListMultipartResult> {
1066+
if (!isValidBucketName(bucketName)) {
1067+
throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName)
1068+
}
1069+
if (!isString(prefix)) {
1070+
throw new TypeError('prefix should be of type "string"')
1071+
}
1072+
if (!isString(keyMarker)) {
1073+
throw new TypeError('keyMarker should be of type "string"')
1074+
}
1075+
if (!isString(uploadIdMarker)) {
1076+
throw new TypeError('uploadIdMarker should be of type "string"')
1077+
}
1078+
if (!isString(delimiter)) {
1079+
throw new TypeError('delimiter should be of type "string"')
1080+
}
1081+
const queries = []
1082+
queries.push(`prefix=${uriEscape(prefix)}`)
1083+
queries.push(`delimiter=${uriEscape(delimiter)}`)
1084+
1085+
if (keyMarker) {
1086+
queries.push(`key-marker=${uriEscape(keyMarker)}`)
1087+
}
1088+
if (uploadIdMarker) {
1089+
queries.push(`upload-id-marker=${uploadIdMarker}`)
1090+
}
1091+
1092+
const maxUploads = 1000
1093+
queries.push(`max-uploads=${maxUploads}`)
1094+
queries.sort()
1095+
queries.unshift('uploads')
1096+
let query = ''
1097+
if (queries.length > 0) {
1098+
query = `${queries.join('&')}`
1099+
}
1100+
const method = 'GET'
1101+
const res = await this.makeRequestAsync({ method, bucketName, query })
1102+
const body = await readAsString(res)
1103+
return xmlParsers.parseListMultipart(body)
1104+
}
1105+
9581106
/**
9591107
* Initiate a new multipart upload.
9601108
* @internal

src/internal/xml-parser.ts

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,9 @@ export type Multipart = {
147147
storageClass: unknown
148148
initiated: unknown
149149
}>
150-
prefixes: { prefix: string }[]
150+
prefixes: {
151+
prefix: string
152+
}[]
151153
isTruncated: boolean
152154
nextKeyMarker: undefined
153155
nextUploadIdMarker: undefined
@@ -167,7 +169,11 @@ export function parseListParts(xml: string): {
167169
parts: UploadedPart[]
168170
} {
169171
let xmlobj = parseXml(xml)
170-
const result: { isTruncated: boolean; marker: number; parts: UploadedPart[] } = {
172+
const result: {
173+
isTruncated: boolean
174+
marker: number
175+
parts: UploadedPart[]
176+
} = {
171177
isTruncated: false,
172178
parts: [],
173179
marker: 0,
@@ -263,6 +269,72 @@ export function parseTagging(xml: string) {
263269
return result
264270
}
265271

272+
type UploadID = unknown
273+
274+
export type ListMultipartResult = {
275+
uploads: {
276+
key: string
277+
uploadId: UploadID
278+
initiator: unknown
279+
owner: unknown
280+
storageClass: unknown
281+
initiated: unknown
282+
}[]
283+
prefixes: {
284+
prefix: string
285+
}[]
286+
isTruncated: boolean
287+
nextKeyMarker: string
288+
nextUploadIdMarker: string
289+
}
290+
291+
// parse XML response for listing in-progress multipart uploads
292+
export function parseListMultipart(xml: string): ListMultipartResult {
293+
const result: ListMultipartResult = {
294+
prefixes: [],
295+
uploads: [],
296+
isTruncated: false,
297+
nextKeyMarker: '',
298+
nextUploadIdMarker: '',
299+
}
300+
301+
let xmlobj = parseXml(xml)
302+
303+
if (!xmlobj.ListMultipartUploadsResult) {
304+
throw new errors.InvalidXMLError('Missing tag: "ListMultipartUploadsResult"')
305+
}
306+
xmlobj = xmlobj.ListMultipartUploadsResult
307+
if (xmlobj.IsTruncated) {
308+
result.isTruncated = xmlobj.IsTruncated
309+
}
310+
if (xmlobj.NextKeyMarker) {
311+
result.nextKeyMarker = xmlobj.NextKeyMarker
312+
}
313+
if (xmlobj.NextUploadIdMarker) {
314+
result.nextUploadIdMarker = xmlobj.nextUploadIdMarker || ''
315+
}
316+
317+
if (xmlobj.CommonPrefixes) {
318+
toArray(xmlobj.CommonPrefixes).forEach((prefix) => {
319+
// @ts-expect-error index check
320+
result.prefixes.push({ prefix: sanitizeObjectKey(toArray<string>(prefix.Prefix)[0]) })
321+
})
322+
}
323+
324+
if (xmlobj.Upload) {
325+
toArray(xmlobj.Upload).forEach((upload) => {
326+
const key = upload.Key
327+
const uploadId = upload.UploadId
328+
const initiator = { id: upload.Initiator.ID, displayName: upload.Initiator.DisplayName }
329+
const owner = { id: upload.Owner.ID, displayName: upload.Owner.DisplayName }
330+
const storageClass = upload.StorageClass
331+
const initiated = new Date(upload.Initiated)
332+
result.uploads.push({ key, uploadId, initiator, owner, storageClass, initiated })
333+
})
334+
}
335+
return result
336+
}
337+
266338
export function parseObjectLockConfig(xml: string): ObjectLockInfo {
267339
const xmlObj = parseXml(xml)
268340
let lockConfigResult = {} as ObjectLockInfo

src/minio.d.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -230,12 +230,6 @@ export class Client extends TypedClient {
230230

231231
listObjectsV2(bucketName: string, prefix?: string, recursive?: boolean, startAfter?: string): BucketStream<BucketItem>
232232

233-
listIncompleteUploads(
234-
bucketName: string,
235-
prefix?: string,
236-
recursive?: boolean,
237-
): BucketStream<IncompleteUploadedBucketItem>
238-
239233
getBucketVersioning(bucketName: string, callback: ResultCallback<VersioningConfig>): void
240234
getBucketVersioning(bucketName: string): Promise<VersioningConfig>
241235

0 commit comments

Comments
 (0)