Skip to content
156 changes: 152 additions & 4 deletions src/internal/client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import * as http from 'node:http'
import * as https from 'node:https'
import type * as stream from 'node:stream'
import * as stream from 'node:stream'

import * as async from 'async'
import { isBrowser } from 'browser-or-node'
import _ from 'lodash'
import * as qs from 'query-string'
Expand All @@ -27,6 +28,7 @@ import {
isValidEndpoint,
isValidObjectName,
isValidPort,
isValidPrefix,
isVirtualHostStyle,
makeDateLong,
sanitizeETag,
Expand All @@ -43,7 +45,9 @@ import type {
Binary,
BucketItemFromList,
BucketItemStat,
BucketStream,
GetObjectLegalHoldOptions,
IncompleteUploadedBucketItem,
IRequest,
ObjectLockConfigParam,
ObjectLockInfo,
Expand All @@ -59,7 +63,7 @@ import type {
Transport,
VersionIdentificator,
} from './type.ts'
import type { UploadedPart } from './xml-parser.ts'
import type { ListMultipartResult, UploadedPart } from './xml-parser.ts'
import * as xmlParsers from './xml-parser.ts'
import { parseInitiateMultipart, parseObjectLegalHoldConfig } from './xml-parser.ts'

Expand Down Expand Up @@ -125,6 +129,11 @@ export interface RemoveOptions {
forceDelete?: boolean
}

type Part = {
part: number
etag: string
}

export class TypedClient {
protected transport: Transport
protected host: string
Expand Down Expand Up @@ -329,8 +338,13 @@ export class TypedClient {
* Takes care of constructing virtual-host-style or path-style hostname
*/
protected getRequestOptions(
opts: RequestOption & { region: string },
): IRequest & { host: string; headers: Record<string, string> } {
opts: RequestOption & {
region: string
},
): IRequest & {
host: string
headers: Record<string, string>
} {
const method = opts.method
const region = opts.region
const bucketName = opts.bucketName
Expand Down Expand Up @@ -955,6 +969,140 @@ export class TypedClient {

// Calls implemented below are related to multipart.

listIncompleteUploads(
bucket: string,
prefix: string,
recursive: boolean,
): BucketStream<IncompleteUploadedBucketItem> {
if (prefix === undefined) {
prefix = ''
}
if (recursive === undefined) {
recursive = false
}
if (!isValidBucketName(bucket)) {
throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucket)
}
if (!isValidPrefix(prefix)) {
throw new errors.InvalidPrefixError(`Invalid prefix : ${prefix}`)
}
if (!isBoolean(recursive)) {
throw new TypeError('recursive should be of type "boolean"')
}
const delimiter = recursive ? '' : '/'
let keyMarker = ''
let uploadIdMarker = ''
const uploads: unknown[] = []
let ended = false

// TODO: refactor this with async/await and `stream.Readable.from`
const readStream = new stream.Readable({ objectMode: true })
readStream._read = () => {
// push one upload info per _read()
if (uploads.length) {
return readStream.push(uploads.shift())
}
if (ended) {
return readStream.push(null)
}
this.listIncompleteUploadsQuery(bucket, prefix, keyMarker, uploadIdMarker, delimiter).then(
(result) => {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
result.prefixes.forEach((prefix) => uploads.push(prefix))
async.eachSeries(
result.uploads,
(upload, cb) => {
// for each incomplete upload add the sizes of its uploaded parts
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
this.listParts(bucket, upload.key, upload.uploadId).then(
(parts: Part[]) => {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
upload.size = parts.reduce((acc, item) => acc + item.size, 0)
uploads.push(upload)
cb()
},
(err: Error) => cb(err),
)
},
(err) => {
if (err) {
readStream.emit('error', err)
return
}
if (result.isTruncated) {
keyMarker = result.nextKeyMarker
uploadIdMarker = result.nextUploadIdMarker
} else {
ended = true
}

// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
readStream._read()
},
)
},
(e) => {
readStream.emit('error', e)
},
)
}
return readStream
}

/**
* Called by listIncompleteUploads to fetch a batch of incomplete uploads.
*/
async listIncompleteUploadsQuery(
bucketName: string,
prefix: string,
keyMarker: string,
uploadIdMarker: string,
delimiter: string,
): Promise<ListMultipartResult> {
if (!isValidBucketName(bucketName)) {
throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName)
}
if (!isString(prefix)) {
throw new TypeError('prefix should be of type "string"')
}
if (!isString(keyMarker)) {
throw new TypeError('keyMarker should be of type "string"')
}
if (!isString(uploadIdMarker)) {
throw new TypeError('uploadIdMarker should be of type "string"')
}
if (!isString(delimiter)) {
throw new TypeError('delimiter should be of type "string"')
}
const queries = []
queries.push(`prefix=${uriEscape(prefix)}`)
queries.push(`delimiter=${uriEscape(delimiter)}`)

if (keyMarker) {
queries.push(`key-marker=${uriEscape(keyMarker)}`)
}
if (uploadIdMarker) {
queries.push(`upload-id-marker=${uploadIdMarker}`)
}

const maxUploads = 1000
queries.push(`max-uploads=${maxUploads}`)
queries.sort()
queries.unshift('uploads')
let query = ''
if (queries.length > 0) {
query = `${queries.join('&')}`
}
const method = 'GET'
const res = await this.makeRequestAsync({ method, bucketName, query })
const body = await readAsString(res)
return xmlParsers.parseListMultipart(body)
}

/**
* Initiate a new multipart upload.
* @internal
Expand Down
76 changes: 74 additions & 2 deletions src/internal/xml-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ export type Multipart = {
storageClass: unknown
initiated: unknown
}>
prefixes: { prefix: string }[]
prefixes: {
prefix: string
}[]
isTruncated: boolean
nextKeyMarker: undefined
nextUploadIdMarker: undefined
Expand All @@ -167,7 +169,11 @@ export function parseListParts(xml: string): {
parts: UploadedPart[]
} {
let xmlobj = parseXml(xml)
const result: { isTruncated: boolean; marker: number; parts: UploadedPart[] } = {
const result: {
isTruncated: boolean
marker: number
parts: UploadedPart[]
} = {
isTruncated: false,
parts: [],
marker: 0,
Expand Down Expand Up @@ -263,6 +269,72 @@ export function parseTagging(xml: string) {
return result
}

type UploadID = unknown

export type ListMultipartResult = {
uploads: {
key: string
uploadId: UploadID
initiator: unknown
owner: unknown
storageClass: unknown
initiated: unknown
}[]
prefixes: {
prefix: string
}[]
isTruncated: boolean
nextKeyMarker: string
nextUploadIdMarker: string
}

// parse XML response for listing in-progress multipart uploads
export function parseListMultipart(xml: string): ListMultipartResult {
const result: ListMultipartResult = {
prefixes: [],
uploads: [],
isTruncated: false,
nextKeyMarker: '',
nextUploadIdMarker: '',
}

let xmlobj = parseXml(xml)

if (!xmlobj.ListMultipartUploadsResult) {
throw new errors.InvalidXMLError('Missing tag: "ListMultipartUploadsResult"')
}
xmlobj = xmlobj.ListMultipartUploadsResult
if (xmlobj.IsTruncated) {
result.isTruncated = xmlobj.IsTruncated
}
if (xmlobj.NextKeyMarker) {
result.nextKeyMarker = xmlobj.NextKeyMarker
}
if (xmlobj.NextUploadIdMarker) {
result.nextUploadIdMarker = xmlobj.nextUploadIdMarker || ''
}

if (xmlobj.CommonPrefixes) {
toArray(xmlobj.CommonPrefixes).forEach((prefix) => {
// @ts-expect-error index check
result.prefixes.push({ prefix: sanitizeObjectKey(toArray<string>(prefix.Prefix)[0]) })
})
}

if (xmlobj.Upload) {
toArray(xmlobj.Upload).forEach((upload) => {
const key = upload.Key
const uploadId = upload.UploadId
const initiator = { id: upload.Initiator.ID, displayName: upload.Initiator.DisplayName }
const owner = { id: upload.Owner.ID, displayName: upload.Owner.DisplayName }
const storageClass = upload.StorageClass
const initiated = new Date(upload.Initiated)
result.uploads.push({ key, uploadId, initiator, owner, storageClass, initiated })
})
}
return result
}

export function parseObjectLockConfig(xml: string): ObjectLockInfo {
const xmlObj = parseXml(xml)
let lockConfigResult = {} as ObjectLockInfo
Expand Down
6 changes: 0 additions & 6 deletions src/minio.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,6 @@ export class Client extends TypedClient {

listObjectsV2(bucketName: string, prefix?: string, recursive?: boolean, startAfter?: string): BucketStream<BucketItem>

listIncompleteUploads(
bucketName: string,
prefix?: string,
recursive?: boolean,
): BucketStream<IncompleteUploadedBucketItem>

getBucketVersioning(bucketName: string, callback: ResultCallback<VersioningConfig>): void
getBucketVersioning(bucketName: string): Promise<VersioningConfig>

Expand Down
Loading