Skip to content

Commit 8662246

Browse files
committed
perf: parallel range-GET S3 downloads for large objects
Replace the single GetObject stream in S3.Open with parallel range-GET requests for objects larger than 32 MiB. Workers download chunks concurrently via errgroup and reassemble them in order via io.Pipe. All chunk requests are pinned to the ETag from the initial stat to prevent corruption if the key is overwritten mid-read. An errgroup with derived context ensures all workers are cancelled promptly on any error or early consumer close.
1 parent 702337e commit 8662246

3 files changed

Lines changed: 148 additions & 5 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ require (
2020
go.opentelemetry.io/otel/sdk v1.41.0
2121
go.opentelemetry.io/otel/sdk/metric v1.41.0
2222
golang.org/x/mod v0.33.0
23+
golang.org/x/sync v0.20.0
2324
)
2425

2526
require (
@@ -76,7 +77,6 @@ require (
7677
go.yaml.in/yaml/v3 v3.0.4 // indirect
7778
golang.org/x/crypto v0.48.0 // indirect
7879
golang.org/x/net v0.51.0 // indirect
79-
golang.org/x/sync v0.20.0 // indirect
8080
golang.org/x/sys v0.41.0 // indirect
8181
golang.org/x/text v0.34.0 // indirect
8282
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect

internal/cache/s3.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -263,13 +263,13 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err
263263
}
264264
}
265265

266-
// Get object
267-
obj, err := s.client.GetObject(ctx, s.config.Bucket, objectName, minio.GetObjectOptions{})
266+
// Download object using parallel range-GET for large objects.
267+
reader, err := s.parallelGetReader(ctx, s.config.Bucket, objectName, objInfo.Size, objInfo.ETag)
268268
if err != nil {
269-
return nil, nil, errors.Errorf("failed to get object: %w", err)
269+
return nil, nil, err
270270
}
271271

272-
return &s3Reader{obj: obj}, headers, nil
272+
return reader, headers, nil
273273
}
274274

275275
// refreshExpiration updates the Expires-At metadata on an S3 object using

internal/cache/s3_parallel_get.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package cache
2+
3+
import (
4+
"context"
5+
"io"
6+
7+
"github.com/alecthomas/errors"
8+
"github.com/minio/minio-go/v7"
9+
"golang.org/x/sync/errgroup"
10+
)
11+
12+
const (
13+
// s3DownloadChunkSize is the size of each parallel range-GET request.
14+
// 32 MiB matches the gradle-cache-tool's benchmarked default.
15+
s3DownloadChunkSize = 32 << 20
16+
// s3DownloadWorkers is the number of concurrent range-GET requests.
17+
// 8 workers should be enough to saturate the host's network connection.
18+
s3DownloadWorkers = 8
19+
)
20+
21+
// parallelGetReader returns an io.ReadCloser that downloads the S3 object
22+
// using parallel range-GET requests and reassembles chunks in order.
23+
// For objects smaller than one chunk, it falls back to a single GetObject.
24+
// The etag pins all chunk requests to one object revision, preventing
25+
// corruption if the key is overwritten during a large read.
26+
func (s *S3) parallelGetReader(ctx context.Context, bucket, objectName string, size int64, etag string) (io.ReadCloser, error) {
27+
if size <= s3DownloadChunkSize {
28+
// Small object: single stream.
29+
obj, err := s.client.GetObject(ctx, bucket, objectName, minio.GetObjectOptions{})
30+
if err != nil {
31+
return nil, errors.Errorf("failed to get object: %w", err)
32+
}
33+
return &s3Reader{obj: obj}, nil
34+
}
35+
36+
// Large object: parallel range requests reassembled in order via io.Pipe.
37+
// Use a cancellable context so workers stop promptly if the consumer
38+
// disconnects or a write error occurs.
39+
dlCtx, cancel := context.WithCancel(ctx)
40+
pr, pw := io.Pipe()
41+
go func() {
42+
err := s.parallelGet(dlCtx, bucket, objectName, size, etag, pw)
43+
cancel()
44+
pw.CloseWithError(err)
45+
}()
46+
return &cancelReadCloser{ReadCloser: pr, cancel: cancel}, nil
47+
}
48+
49+
// cancelReadCloser wraps an io.ReadCloser and cancels a context on Close,
50+
// ensuring background goroutines are cleaned up when the consumer is done.
51+
type cancelReadCloser struct {
52+
io.ReadCloser
53+
cancel context.CancelFunc
54+
}
55+
56+
func (c *cancelReadCloser) Close() error {
57+
c.cancel()
58+
return errors.Wrap(c.ReadCloser.Close(), "close parallel get reader")
59+
}
60+
61+
// parallelGet downloads an S3 object in parallel chunks and writes them in
62+
// order to w. Each worker downloads its chunk into memory so the TCP
63+
// connection stays active at full speed. Peak memory: numWorkers × chunkSize.
64+
// All chunk requests are pinned to the given etag to ensure consistency.
65+
// An errgroup cancels all workers on the first error from any goroutine.
66+
func (s *S3) parallelGet(ctx context.Context, bucket, objectName string, size int64, etag string, w io.Writer) error {
67+
numChunks := int((size + s3DownloadChunkSize - 1) / s3DownloadChunkSize)
68+
numWorkers := min(s3DownloadWorkers, numChunks)
69+
70+
// One buffered channel per chunk so workers never block after sending.
71+
results := make([]chan []byte, numChunks)
72+
for i := range results {
73+
results[i] = make(chan []byte, 1)
74+
}
75+
76+
// Work queue of chunk indices.
77+
work := make(chan int, numChunks)
78+
for i := range numChunks {
79+
work <- i
80+
}
81+
close(work)
82+
83+
eg, egCtx := errgroup.WithContext(ctx)
84+
85+
// Download workers: fetch chunks concurrently and send data on success,
86+
// or return an error which cancels all other workers via egCtx.
87+
for range numWorkers {
88+
eg.Go(func() error {
89+
for seq := range work {
90+
if egCtx.Err() != nil {
91+
return egCtx.Err()
92+
}
93+
94+
start := int64(seq) * s3DownloadChunkSize
95+
end := min(start+s3DownloadChunkSize-1, size-1)
96+
97+
opts := minio.GetObjectOptions{}
98+
if err := opts.SetRange(start, end); err != nil {
99+
return errors.Errorf("set range %d-%d: %w", start, end, err)
100+
}
101+
// Pin to the object revision from the initial stat to prevent
102+
// reading a mix of old and new data if the key is overwritten.
103+
if err := opts.SetMatchETag(etag); err != nil {
104+
return errors.Errorf("set etag %s: %w", etag, err)
105+
}
106+
107+
obj, err := s.client.GetObject(egCtx, bucket, objectName, opts)
108+
if err != nil {
109+
return errors.Errorf("get range %d-%d: %w", start, end, err)
110+
}
111+
112+
// Drain the body immediately so the TCP connection stays at
113+
// full speed. All workers do this concurrently, saturating
114+
// the available S3 bandwidth.
115+
data, readErr := io.ReadAll(obj)
116+
obj.Close() //nolint:errcheck,gosec
117+
if readErr != nil {
118+
return errors.Wrap(readErr, "read chunk")
119+
}
120+
results[seq] <- data
121+
}
122+
return nil
123+
})
124+
}
125+
126+
// Write chunks in order. Runs in the errgroup so that a write error
127+
// cancels egCtx, which stops download workers promptly.
128+
eg.Go(func() error {
129+
for _, ch := range results {
130+
select {
131+
case data := <-ch:
132+
if _, err := w.Write(data); err != nil {
133+
return errors.Wrap(err, "write chunk")
134+
}
135+
case <-egCtx.Done():
136+
return egCtx.Err()
137+
}
138+
}
139+
return nil
140+
})
141+
142+
return errors.Wrap(eg.Wait(), "parallel get")
143+
}

0 commit comments

Comments
 (0)