Skip to content

Commit b918302

Browse files
committed
Fix concurrency issue when JB_PARALLEL_DOWNLOADS=true was set
1 parent 8fc6d55 commit b918302

File tree

9 files changed

+644
-88
lines changed

9 files changed

+644
-88
lines changed

.claude/settings.local.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
"Bash(make:*)",
66
"Bash(git add:*)",
77
"Bash(rg:*)",
8-
"Bash(go test:*)"
8+
"Bash(go test:*)",
9+
"Bash(grep:*)",
10+
"Bash(chmod:*)",
11+
"Bash(rm:*)",
12+
"Bash(SKIP_NETWORK_TESTS=\"\" go test -v ./pkg -run TestParallelGitHubArchiveDownloads -timeout 5m)"
913
],
1014
"deny": []
1115
}

pkg/cache/cache_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func TestGlobalRemoteCaches(t *testing.T) {
203203
}
204204
}
205205

206-
// The TestCheckRemoteCache test has been removed as the CheckRemoteCache function
206+
// The TestCheckRemoteCache test has been removed as the CheckRemoteCache function
207207
// was never used in the application and has been removed.
208208

209209
// TestDuplicateURLDeduplication tests that entries with duplicate URLs are deduplicated
@@ -306,19 +306,19 @@ func TestDuplicateURLDeduplication(t *testing.T) {
306306
if sameURLEntry == nil {
307307
t.Fatalf("Failed to find entry with URL %s", sameURL)
308308
}
309-
309+
310310
if sameURLEntry.Key != entry2.Key {
311311
t.Errorf("Expected key %s from second entry, got %s", entry2.Key, sameURLEntry.Key)
312312
}
313-
313+
314314
if sameURLEntry.Path != entry2.Path {
315315
t.Errorf("Expected path %s from second entry, got %s", entry2.Path, sameURLEntry.Path)
316316
}
317-
317+
318318
if sameURLEntry.Size != entry2.Size {
319319
t.Errorf("Expected size %d from second entry, got %d", entry2.Size, sameURLEntry.Size)
320320
}
321-
321+
322322
// Calculate expected total size: entry2 (replaced entry1) + entry3
323323
expectedTotalSize := entry2.Size + entry3.Size
324324
if index.TotalSize != expectedTotalSize {

pkg/git.go

Lines changed: 199 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"path/filepath"
3333
"regexp"
3434
"strings"
35+
"sync"
3536
"time"
3637

3738
"github.com/fatih/color"
@@ -68,6 +69,20 @@ func getCacheKeyForURL(archiveUrl string) string {
6869
return hex.EncodeToString(urlHash[:16])
6970
}
7071

72+
// getArchiveMutex returns a mutex for the given cache key, creating one if necessary
73+
func getArchiveMutex(cacheKey string) *sync.Mutex {
74+
archiveCacheMutexesLock.Lock()
75+
defer archiveCacheMutexesLock.Unlock()
76+
77+
if mutex, exists := archiveCacheMutexes[cacheKey]; exists {
78+
return mutex
79+
}
80+
81+
mutex := &sync.Mutex{}
82+
archiveCacheMutexes[cacheKey] = mutex
83+
return mutex
84+
}
85+
7186
type GitPackage struct {
7287
Source *deps.Git
7388
}
@@ -84,8 +99,81 @@ var (
8499
GlobalCacheEnabled = true
85100
// DefaultGlobalCacheDir is the default location for the global cache
86101
DefaultGlobalCacheDir = ""
102+
103+
// globalCacheMutex protects concurrent access to the global cache
104+
globalCacheMutex sync.Mutex
105+
// archiveCacheMutexes protects concurrent downloads of the same archive
106+
archiveCacheMutexes = make(map[string]*sync.Mutex)
107+
archiveCacheMutexesLock sync.Mutex
87108
)
88109

110+
// validateGzipFile checks if a file is a valid gzip file by reading through the entire archive
111+
func validateGzipFile(filepath string) error {
112+
file, err := os.Open(filepath)
113+
if err != nil {
114+
return fmt.Errorf("failed to open file: %w", err)
115+
}
116+
defer file.Close()
117+
118+
// Get file info for size
119+
info, err := file.Stat()
120+
if err != nil {
121+
return fmt.Errorf("failed to stat file: %w", err)
122+
}
123+
124+
// Try to read the gzip header
125+
gr, err := gzip.NewReader(file)
126+
if err != nil {
127+
return fmt.Errorf("invalid gzip header: %w", err)
128+
}
129+
defer gr.Close()
130+
131+
// Read through the entire tar archive to ensure it's not corrupted
132+
tr := tar.NewReader(gr)
133+
entriesProcessed := 0
134+
totalSize := int64(0)
135+
136+
for {
137+
header, err := tr.Next()
138+
if err == io.EOF {
139+
// Reached end of archive
140+
break
141+
}
142+
if err != nil {
143+
return fmt.Errorf("corrupted tar entry at position %d: %w", entriesProcessed, err)
144+
}
145+
146+
if header == nil {
147+
continue
148+
}
149+
150+
entriesProcessed++
151+
152+
// For regular files, read through the content to ensure it's not corrupted
153+
if header.Typeflag == tar.TypeReg {
154+
// Use io.CopyN to read the exact file size
155+
n, err := io.CopyN(io.Discard, tr, header.Size)
156+
if err != nil {
157+
return fmt.Errorf("corrupted file content for %s (read %d of %d bytes): %w",
158+
header.Name, n, header.Size, err)
159+
}
160+
totalSize += n
161+
}
162+
}
163+
164+
if entriesProcessed == 0 {
165+
return fmt.Errorf("tar archive is empty")
166+
}
167+
168+
// Archive is valid - we successfully read through all entries
169+
if !GitQuiet {
170+
color.Green("Archive validation passed (size: %d bytes, entries: %d, content size: %d bytes)",
171+
info.Size(), entriesProcessed, totalSize)
172+
}
173+
174+
return nil
175+
}
176+
89177
func downloadGitHubArchive(filepath string, urlStr string) error {
90178
// Check if this is an S3 URL
91179
if s3.IsS3URL(urlStr) {
@@ -151,6 +239,12 @@ func downloadGitHubArchive(filepath string, urlStr string) error {
151239
// Success - proceed with download
152240
defer resp.Body.Close()
153241

242+
// Get ETag for integrity verification if available
243+
etag := resp.Header.Get("ETag")
244+
if etag != "" && !GitQuiet {
245+
color.Cyan("Download ETag: %s", etag)
246+
}
247+
154248
// Create the file
155249
out, err := os.Create(filepath)
156250
if err != nil {
@@ -159,9 +253,13 @@ func downloadGitHubArchive(filepath string, urlStr string) error {
159253
}
160254
defer out.Close()
161255

162-
// Write the body to file
163-
_, err = io.Copy(out, resp.Body)
256+
// Write the body to file with a hash calculator
257+
hasher := sha256.New()
258+
writer := io.MultiWriter(out, hasher)
259+
written, err := io.Copy(writer, resp.Body)
164260
resp.Body.Close()
261+
out.Close() // Close the file explicitly before validation
262+
165263
if err != nil {
166264
os.Remove(filepath) // Clean up partial file
167265
lastErr = err
@@ -171,6 +269,37 @@ func downloadGitHubArchive(filepath string, urlStr string) error {
171269
continue
172270
}
173271

272+
// Calculate file hash
273+
fileHash := fmt.Sprintf("%x", hasher.Sum(nil))
274+
if !GitQuiet {
275+
color.Cyan("Downloaded file SHA256: %s", fileHash)
276+
}
277+
278+
// Verify file size if Content-Length was provided
279+
if contentLength := resp.ContentLength; contentLength > 0 && written != contentLength {
280+
os.Remove(filepath)
281+
lastErr = fmt.Errorf("incomplete download: expected %d bytes, got %d", contentLength, written)
282+
if !GitQuiet {
283+
color.Yellow("Download attempt %d/%d incomplete: %v", attempt, maxRetries, lastErr)
284+
}
285+
continue
286+
}
287+
288+
// Validate it's a valid gzip file by reading through the entire archive
289+
if err := validateGzipFile(filepath); err != nil {
290+
os.Remove(filepath)
291+
// Check if this is an EOF error which might indicate truncation
292+
if strings.Contains(err.Error(), "unexpected EOF") || strings.Contains(err.Error(), "EOF") {
293+
lastErr = fmt.Errorf("archive appears to be truncated or corrupted: %w", err)
294+
} else {
295+
lastErr = fmt.Errorf("invalid archive: %w", err)
296+
}
297+
if !GitQuiet {
298+
color.Yellow("Download attempt %d/%d produced invalid archive: %v", attempt, maxRetries, err)
299+
}
300+
continue
301+
}
302+
174303
// Success!
175304
return nil
176305
} else if resp.StatusCode >= 500 || resp.StatusCode == 429 {
@@ -239,7 +368,15 @@ func getGlobalCacheDir() (string, error) {
239368
// 2. Remote caches (if configured)
240369
// 3. Upstream source (if all else fails)
241370
func ensureArchiveCache(archiveFilepath, archiveUrl string) error {
242-
// Check if file already exists at the destination
371+
// Create a unique key based on the URL
372+
cacheKey := getCacheKeyForURL(archiveUrl)
373+
374+
// Get a mutex for this specific archive to prevent concurrent downloads
375+
archiveMutex := getArchiveMutex(cacheKey)
376+
archiveMutex.Lock()
377+
defer archiveMutex.Unlock()
378+
379+
// Check if file already exists at the destination (double-check after acquiring lock)
243380
if _, err := os.Stat(archiveFilepath); err == nil {
244381
if !GitQuiet {
245382
color.Green("FILE ALREADY EXISTS %s", archiveFilepath)
@@ -249,9 +386,6 @@ func ensureArchiveCache(archiveFilepath, archiveUrl string) error {
249386
return err
250387
}
251388

252-
// Create a unique key based on the URL
253-
cacheKey := getCacheKeyForURL(archiveUrl)
254-
255389
// Step 1: If global cache is enabled, check the global cache
256390
if GlobalCacheEnabled {
257391
globalCacheDir, err := getGlobalCacheDir()
@@ -549,6 +683,10 @@ DownloadToDestination:
549683

550684
// registerInGlobalCacheIndex adds an entry to the global cache index
551685
func registerInGlobalCacheIndex(filePath, url string) {
686+
// Lock for global cache index operations
687+
globalCacheMutex.Lock()
688+
defer globalCacheMutex.Unlock()
689+
552690
// Create a unique key from the URL
553691
cacheKey := getCacheKeyForURL(url)
554692

@@ -783,26 +921,44 @@ func registerInGlobalCacheIndex(filePath, url string) {
783921
func gzipUntar(dst string, r io.Reader, subDir string) error {
784922
gzr, err := gzip.NewReader(r)
785923
if err != nil {
786-
return err
924+
return fmt.Errorf("failed to create gzip reader: %w", err)
787925
}
788926
defer gzr.Close()
789927

790928
subDirWithoutSlash := strings.TrimPrefix(subDir, "/")
791929

792930
tr := tar.NewReader(gzr)
793931

932+
entriesProcessed := 0
933+
bytesProcessed := int64(0)
934+
var lastEntryName string
935+
794936
for {
795937
header, err := tr.Next()
796938
switch {
797939
case err == io.EOF:
940+
if entriesProcessed == 0 {
941+
return fmt.Errorf("tar archive appears to be empty")
942+
}
943+
if !GitQuiet {
944+
color.Green("Successfully extracted %d entries (content size: %d bytes)", entriesProcessed, bytesProcessed)
945+
}
798946
return nil
799947

800948
case err != nil:
801-
return err
949+
// Provide detailed error context
950+
errMsg := fmt.Sprintf("failed to read tar entry #%d", entriesProcessed+1)
951+
if lastEntryName != "" {
952+
errMsg += fmt.Sprintf(" (after '%s')", lastEntryName)
953+
}
954+
errMsg += fmt.Sprintf(", extracted %d bytes of content so far", bytesProcessed)
955+
return fmt.Errorf("%s: %w", errMsg, err)
802956

803957
case header == nil:
804958
continue
805959
}
960+
entriesProcessed++
961+
lastEntryName = header.Name
806962

807963
// strip the two first components of the path
808964
parts := strings.SplitAfterN(header.Name, "/", 2)
@@ -847,9 +1003,16 @@ func gzipUntar(dst string, r io.Reader, subDir string) error {
8471003
}
8481004
defer f.Close()
8491005

850-
// copy over contents
851-
if _, err := io.Copy(f, tr); err != nil {
852-
return err
1006+
// copy over contents and track bytes
1007+
written, err := io.Copy(f, tr)
1008+
if err != nil {
1009+
return fmt.Errorf("failed to extract %s: %w", header.Name, err)
1010+
}
1011+
bytesProcessed += written
1012+
1013+
// Verify we read the expected amount
1014+
if written != header.Size {
1015+
return fmt.Errorf("file %s: size mismatch (expected %d bytes, got %d)", header.Name, header.Size, written)
8531016
}
8541017
return nil
8551018
}()
@@ -928,8 +1091,10 @@ func (p *GitPackage) Install(ctx context.Context, name, dir, version string) (st
9281091
return commitSha, nil
9291092
}
9301093
// Fall back to git clone on error
931-
color.Yellow("archive install failed: %s", err)
932-
color.Yellow("falling back to git clone...")
1094+
if !GitQuiet {
1095+
color.Yellow("archive install failed: %v", err)
1096+
color.Yellow("falling back to git clone...")
1097+
}
9331098
}
9341099

9351100
// Try to use global cache or fall back to git clone
@@ -982,10 +1147,20 @@ func (p *GitPackage) resolveVersionToCommitSHA(ctx context.Context, version stri
9821147

9831148
// extractArchiveToDestination extracts a downloaded archive to the destination path
9841149
func (p *GitPackage) extractArchiveToDestination(archiveFilepath, destPath, commitSha string) (string, error) {
1150+
// Get file info for debugging before opening
1151+
info, err := os.Stat(archiveFilepath)
1152+
if err != nil {
1153+
return "", fmt.Errorf("failed to stat archive file before extraction: %w", err)
1154+
}
1155+
1156+
if !GitQuiet {
1157+
color.Cyan("Extracting archive: %s (size: %d bytes)", archiveFilepath, info.Size())
1158+
}
1159+
9851160
// Open the archive file
9861161
ar, err := os.Open(archiveFilepath)
9871162
if err != nil {
988-
return "", err
1163+
return "", fmt.Errorf("failed to open archive file: %w", err)
9891164
}
9901165
defer ar.Close()
9911166

@@ -997,7 +1172,16 @@ func (p *GitPackage) extractArchiveToDestination(archiveFilepath, destPath, comm
9971172
// Extract the sub-directory (if any) from the archive to the final destination
9981173
err = gzipUntar(destPath, ar, p.Source.Subdir)
9991174
if err != nil {
1000-
return "", err
1175+
// Provide more context about the error
1176+
if strings.Contains(err.Error(), "unexpected EOF") {
1177+
// Re-validate the archive to get more details
1178+
ar.Close()
1179+
if validateErr := validateGzipFile(archiveFilepath); validateErr != nil {
1180+
return "", fmt.Errorf("archive validation failed after extraction error - archive may be corrupted or truncated (size: %d bytes): %w", info.Size(), validateErr)
1181+
}
1182+
return "", fmt.Errorf("extraction failed with unexpected EOF - archive may be truncated (size: %d bytes): %w", info.Size(), err)
1183+
}
1184+
return "", fmt.Errorf("failed to extract archive %s (size: %d bytes): %w", archiveFilepath, info.Size(), err)
10011185
}
10021186

10031187
return commitSha, nil

0 commit comments

Comments
 (0)