Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 125 additions & 5 deletions api-get-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sync"

"github.com/minio/minio-go/v7/pkg/s3utils"
"golang.org/x/sync/errgroup"
)

// GetObject wrapper function that accepts a request context
Expand Down Expand Up @@ -81,6 +82,8 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
// Used to verify if etag of object has changed since last read.
var etag string

useParallel := opts.ParallelWorkers > 1

for req := range reqCh {
// If this is the first request we may not need to do a getObject request yet.
if req.isFirstReq {
Expand All @@ -96,12 +99,45 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
} else if req.Offset > 0 {
opts.SetRange(req.Offset, 0)
}
httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts)
if err != nil {
resCh <- getResponse{Error: err}
return

// When parallel is configured and this is a forward sequential
// read from the start, use parallel range-GET.
if useParallel && !req.isReadAt && req.Offset == 0 {
// Stat to learn size and ETag for parallel download.
delete(opts.headers, "Range")
objectInfo, err = c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts))
if err != nil {
resCh <- getResponse{Error: err}
return
}
etag = objectInfo.ETag

chunkSize := parallelGetChunkSize(opts)
if objectInfo.Size > chunkSize {
// Large object: start parallel download into a pipe.
pr, pw := io.Pipe()
go func() {
dlErr := c.parallelGet(gctx, bucketName, objectName, objectInfo.Size, etag, opts, pw)
pw.CloseWithError(dlErr)
}()
httpReader = pr
} else {
// Small object: single stream.
httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts)
if err != nil {
resCh <- getResponse{Error: err}
return
}
}
} else {
httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts)
if err != nil {
resCh <- getResponse{Error: err}
return
}
etag = objectInfo.ETag
}
etag = objectInfo.ETag

// Read at least firstReq.Buffer bytes, if not we have
// reached our EOF.
size, err := readFull(httpReader, req.Buffer)
Expand Down Expand Up @@ -704,3 +740,87 @@ func (c *Client) getObject(ctx context.Context, bucketName, objectName string, o
// do not close body here, caller will close
return resp.Body, objectStat, resp.Header, nil
}

func parallelGetChunkSize(opts GetObjectOptions) int64 {
if opts.ParallelChunkSize > 0 {
return opts.ParallelChunkSize
}
return minPartSize
}

// parallelGet downloads an S3 object in parallel chunks and writes them in
// order to w. Each worker downloads its chunk into memory so the TCP
// connection stays active at full speed. Peak memory: numWorkers × chunkSize.
// All chunk requests are pinned to the given ETag to ensure consistency.
func (c *Client) parallelGet(ctx context.Context, bucketName, objectName string, size int64, etag string, opts GetObjectOptions, w io.Writer) error {
chunkSize := parallelGetChunkSize(opts)
numChunks := int((size + chunkSize - 1) / chunkSize)
numWorkers := min(opts.ParallelWorkers, numChunks)

// One buffered channel per chunk preserves ordering.
results := make([]chan []byte, numChunks)
for i := range results {
results[i] = make(chan []byte, 1)
}

// Work queue of chunk indices.
work := make(chan int, numChunks)
for i := range numChunks {
work <- i
}
close(work)

eg, egCtx := errgroup.WithContext(ctx)

// Download workers.
for range numWorkers {
eg.Go(func() error {
for seq := range work {
if egCtx.Err() != nil {
return egCtx.Err()
}

start := int64(seq) * chunkSize
end := min(start+chunkSize-1, size-1)

getOpts := opts
if err := getOpts.SetRange(start, end); err != nil {
return fmt.Errorf("set range %d-%d: %w", start, end, err)
}
if err := getOpts.SetMatchETag(etag); err != nil {
return fmt.Errorf("set etag %s: %w", etag, err)
}

body, _, _, err := c.getObject(egCtx, bucketName, objectName, getOpts)
if err != nil {
return fmt.Errorf("get range %d-%d: %w", start, end, err)
}

data, readErr := io.ReadAll(body)
body.Close()
if readErr != nil {
return fmt.Errorf("read chunk %d: %w", seq, readErr)
}
results[seq] <- data
}
return nil
})
}

// Writer goroutine reassembles chunks in order.
eg.Go(func() error {
for _, ch := range results {
select {
case data := <-ch:
if _, err := w.Write(data); err != nil {
return fmt.Errorf("write chunk: %w", err)
}
case <-egCtx.Done():
return egCtx.Err()
}
}
return nil
})

return eg.Wait()
}
Loading