Skip to content

Commit fa0e6b8

Browse files
committed
Add concurrent proccesing nested dependencies. Also go fmt
1 parent 8f13987 commit fa0e6b8

File tree

9 files changed

+133
-107
lines changed

9 files changed

+133
-107
lines changed

.claude/settings.local.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@
99
"Bash(grep:*)",
1010
"Bash(chmod:*)",
1111
"Bash(rm:*)",
12-
"Bash(SKIP_NETWORK_TESTS=\"\" go test -v ./pkg -run TestParallelGitHubArchiveDownloads -timeout 5m)"
12+
"Bash(SKIP_NETWORK_TESTS=\"\" go test -v ./pkg -run TestParallelGitHubArchiveDownloads -timeout 5m)",
13+
"Bash(JB_PARALLEL_DOWNLOADS=true make test-integration 2 >& 1)",
14+
"Bash(go fmt:*)",
15+
"Bash(go vet:*)",
16+
"Bash(go run:*)"
1317
],
1418
"deny": []
1519
}

pkg/git_install_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ func TestApplySparseCheckout(t *testing.T) {
363363
}
364364

365365
// Apply sparse checkout
366-
err = gitPkg.applySparseCheckout(ctx, tempDir, "main")
366+
_ = gitPkg.applySparseCheckout(ctx, tempDir, "main")
367367

368368
// This will fail because there's no actual checkout, but we can verify the config was set
369369
sparseCheckoutFile := filepath.Join(tempDir, ".git", "info", "sparse-checkout")

pkg/packages.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,15 @@ func linkLegacy(vendorDir string, locks *deps.Ordered) error {
175175
continue
176176
}
177177

178+
// ensure parent directory exists
179+
if err := os.MkdirAll(filepath.Dir(legacyName), os.ModePerm); err != nil {
180+
return err
181+
}
182+
178183
// create the symlink
179184
if err := os.Symlink(
180-
filepath.Join(pkgName),
181-
filepath.Join(legacyName),
185+
pkgName,
186+
legacyName,
182187
); err != nil {
183188
return err
184189
}

pkg/packages_parallel.go

Lines changed: 115 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -30,149 +30,166 @@ import (
3030
// parallelEnsure is a parallel version of ensure that downloads multiple packages concurrently
3131
func parallelEnsure(direct *deps.Ordered, vendorDir, pathToParentModule string, locks *deps.Ordered) (*deps.Ordered, error) {
3232
resultDeps := deps.NewOrdered()
33+
depsMutex := &sync.Mutex{}
34+
locksMutex := &sync.Mutex{}
3335

34-
// First, identify packages that need to be downloaded
35-
type downloadTask struct {
36-
key string
37-
dep deps.Dependency
38-
locked deps.Dependency
39-
hasLock bool
40-
needsDownload bool
41-
}
36+
// Configure concurrency limits
37+
maxConcurrentDownloads := 10 // Increased from 5 for better parallelism
38+
maxConcurrentNested := 5 // Process nested dependencies concurrently
39+
40+
// Create error group for better error handling
41+
var firstErr error
42+
var errOnce sync.Once
4243

43-
var tasks []downloadTask
44-
var mu sync.Mutex
44+
// Phase 1: Download all direct dependencies in parallel
45+
downloadSem := make(chan struct{}, maxConcurrentDownloads)
46+
var downloadWg sync.WaitGroup
4547

46-
// Prepare download tasks
4748
for _, k := range direct.Keys() {
4849
d, _ := direct.Get(k)
4950
l, present := locks.Get(d.Name())
5051

51-
task := downloadTask{
52-
key: k,
53-
dep: d,
54-
locked: l,
55-
hasLock: present,
56-
needsDownload: true,
57-
}
58-
59-
// already locked and the integrity is intact
52+
// Check if already locked and intact
6053
if present {
6154
d.Version = l.Version
62-
6355
if check(l, vendorDir) {
64-
task.needsDownload = false
56+
depsMutex.Lock()
6557
resultDeps.Set(d.Name(), l)
58+
depsMutex.Unlock()
59+
continue
6660
}
6761
}
6862

69-
tasks = append(tasks, task)
70-
}
71-
72-
// Download packages in parallel
73-
type downloadResult struct {
74-
key string
75-
dependency *deps.Dependency
76-
err error
77-
}
78-
79-
resultChan := make(chan downloadResult, len(tasks))
80-
var wg sync.WaitGroup
81-
82-
// Limit concurrent downloads to avoid overwhelming the system
83-
semaphore := make(chan struct{}, 5) // Max 5 concurrent downloads
84-
85-
for _, task := range tasks {
86-
if !task.needsDownload {
87-
continue
88-
}
89-
90-
wg.Add(1)
91-
go func(t downloadTask) {
92-
defer wg.Done()
63+
downloadWg.Add(1)
64+
go func(dep deps.Dependency, locked deps.Dependency, hasLock bool) {
65+
defer downloadWg.Done()
9366

94-
semaphore <- struct{}{} // Acquire
95-
defer func() { <-semaphore }() // Release
67+
downloadSem <- struct{}{} // Acquire
68+
defer func() { <-downloadSem }() // Release
9669

9770
// Remove existing directory
98-
dir := filepath.Join(vendorDir, t.dep.Name())
71+
dir := filepath.Join(vendorDir, dep.Name())
9972
os.RemoveAll(dir)
10073

10174
// Download the package
102-
locked, err := download(t.dep, vendorDir, pathToParentModule)
75+
downloaded, err := download(dep, vendorDir, pathToParentModule)
10376
if err != nil {
104-
resultChan <- downloadResult{key: t.key, dependency: nil, err: errors.Wrap(err, "downloading")}
77+
errOnce.Do(func() {
78+
firstErr = errors.Wrap(err, "downloading "+dep.Name())
79+
})
10580
return
10681
}
10782

10883
// Check sum if expected
109-
if t.hasLock && t.locked.Sum != "" && locked.Sum != t.locked.Sum {
110-
resultChan <- downloadResult{
111-
key: t.key,
112-
dependency: nil,
113-
err: fmt.Errorf("checksum mismatch for %s. Expected %s but got %s", t.dep.Name(), t.locked.Sum, locked.Sum),
114-
}
84+
if hasLock && locked.Sum != "" && downloaded.Sum != locked.Sum {
85+
errOnce.Do(func() {
86+
firstErr = fmt.Errorf("checksum mismatch for %s. Expected %s but got %s", dep.Name(), locked.Sum, downloaded.Sum)
87+
})
11588
return
11689
}
11790

118-
resultChan <- downloadResult{key: t.key, dependency: locked, err: nil}
119-
}(task)
91+
// Update results
92+
depsMutex.Lock()
93+
resultDeps.Set(downloaded.Name(), *downloaded)
94+
depsMutex.Unlock()
95+
96+
locksMutex.Lock()
97+
locks.Set(downloaded.Name(), *downloaded)
98+
locksMutex.Unlock()
99+
}(d, l, present)
120100
}
121101

122-
// Close result channel when all downloads are done
123-
go func() {
124-
wg.Wait()
125-
close(resultChan)
126-
}()
102+
downloadWg.Wait()
103+
if firstErr != nil {
104+
return nil, firstErr
105+
}
127106

128-
// Collect results
129-
for result := range resultChan {
130-
if result.err != nil {
131-
return nil, result.err
132-
}
107+
// Phase 2: Process nested dependencies concurrently
108+
type nestedTask struct {
109+
dep deps.Dependency
110+
vendorPath string
111+
absolutePath string
112+
}
133113

134-
if result.dependency != nil {
135-
mu.Lock()
136-
resultDeps.Set(result.dependency.Name(), *result.dependency)
137-
// we settled on a new version, add it to the locks for recursion
138-
locks.Set(result.dependency.Name(), *result.dependency)
139-
mu.Unlock()
140-
}
114+
nestedChan := make(chan nestedTask, 100)
115+
nestedSem := make(chan struct{}, maxConcurrentNested)
116+
var nestedWg sync.WaitGroup
117+
118+
// Worker pool for processing nested dependencies
119+
for i := 0; i < maxConcurrentNested; i++ {
120+
nestedWg.Add(1)
121+
go func() {
122+
defer nestedWg.Done()
123+
for task := range nestedChan {
124+
func() {
125+
nestedSem <- struct{}{} // Acquire
126+
defer func() { <-nestedSem }() // Release
127+
128+
f, err := jsonnetfile.Load(filepath.Join(task.vendorPath, jsonnetfile.File))
129+
if err != nil {
130+
if !os.IsNotExist(err) {
131+
errOnce.Do(func() {
132+
firstErr = err
133+
})
134+
}
135+
return
136+
}
137+
138+
// Recursively process nested dependencies
139+
nested, err := parallelEnsure(f.Dependencies, vendorDir, task.absolutePath, locks)
140+
if err != nil {
141+
errOnce.Do(func() {
142+
firstErr = err
143+
})
144+
return
145+
}
146+
147+
// Merge nested dependencies
148+
depsMutex.Lock()
149+
for _, k := range nested.Keys() {
150+
d, _ := nested.Get(k)
151+
if _, exists := resultDeps.Get(d.Name()); !exists {
152+
resultDeps.Set(d.Name(), d)
153+
}
154+
}
155+
depsMutex.Unlock()
156+
}()
157+
}
158+
}()
141159
}
142160

143-
// Process nested dependencies (still sequential for now, could be parallelized too)
161+
// Queue nested dependency tasks
144162
for _, k := range resultDeps.Keys() {
145163
d, _ := resultDeps.Get(k)
146164
if d.Single {
147-
// skip dependencies that explicitly don't want nested ones installed
148-
continue
165+
continue // Skip dependencies that don't want nested ones
149166
}
150167

151-
f, err := jsonnetfile.Load(filepath.Join(vendorDir, d.Name(), jsonnetfile.File))
152-
if err != nil {
153-
if os.IsNotExist(err) {
154-
continue
168+
vendorPath := filepath.Join(vendorDir, d.Name())
169+
170+
// Check if the path exists before evaluating symlinks
171+
absolutePath := vendorPath
172+
if _, err := os.Stat(vendorPath); err == nil {
173+
// Path exists, try to resolve symlinks
174+
resolvedPath, err := filepath.EvalSymlinks(vendorPath)
175+
if err == nil {
176+
absolutePath = resolvedPath
155177
}
156-
return nil, err
178+
// If EvalSymlinks fails, just use the original path
157179
}
158180

159-
absolutePath, err := filepath.EvalSymlinks(filepath.Join(vendorDir, d.Name()))
160-
if err != nil {
161-
return nil, err
181+
nestedChan <- nestedTask{
182+
dep: d,
183+
vendorPath: vendorPath,
184+
absolutePath: absolutePath,
162185
}
186+
}
163187

164-
// Recursively process nested dependencies (could use parallelEnsure here too)
165-
nested, err := parallelEnsure(f.Dependencies, vendorDir, absolutePath, locks)
166-
if err != nil {
167-
return nil, err
168-
}
188+
close(nestedChan)
189+
nestedWg.Wait()
169190

170-
for _, k := range nested.Keys() {
171-
d, _ := nested.Get(k)
172-
if _, ok := resultDeps.Get(d.Name()); !ok {
173-
resultDeps.Set(d.Name(), d)
174-
}
175-
}
191+
if firstErr != nil {
192+
return nil, firstErr
176193
}
177194

178195
return resultDeps, nil

spec/v1/deps/dependencies.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
// you may not use this file except in compliance with the License.
55
// You may obtain a copy of the License at
66
//
7-
// http://www.apache.org/licenses/LICENSE-2.0
7+
// http://www.apache.org/licenses/LICENSE-2.0
88
//
99
// Unless required by applicable law or agreed to in writing, software
1010
// distributed under the License is distributed on an "AS IS" BASIS,

spec/v1/deps/dependencies_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
// you may not use this file except in compliance with the License.
55
// You may obtain a copy of the License at
66
//
7-
// http://www.apache.org/licenses/LICENSE-2.0
7+
// http://www.apache.org/licenses/LICENSE-2.0
88
//
99
// Unless required by applicable law or agreed to in writing, software
1010
// distributed under the License is distributed on an "AS IS" BASIS,

spec/v1/deps/git.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
// you may not use this file except in compliance with the License.
55
// You may obtain a copy of the License at
66
//
7-
// http://www.apache.org/licenses/LICENSE-2.0
7+
// http://www.apache.org/licenses/LICENSE-2.0
88
//
99
// Unless required by applicable law or agreed to in writing, software
1010
// distributed under the License is distributed on an "AS IS" BASIS,

spec/v1/deps/git_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
// you may not use this file except in compliance with the License.
55
// You may obtain a copy of the License at
66
//
7-
// http://www.apache.org/licenses/LICENSE-2.0
7+
// http://www.apache.org/licenses/LICENSE-2.0
88
//
99
// Unless required by applicable law or agreed to in writing, software
1010
// distributed under the License is distributed on an "AS IS" BASIS,

spec/v1/v0.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
// you may not use this file except in compliance with the License.
55
// You may obtain a copy of the License at
66
//
7-
// http://www.apache.org/licenses/LICENSE-2.0
7+
// http://www.apache.org/licenses/LICENSE-2.0
88
//
99
// Unless required by applicable law or agreed to in writing, software
1010
// distributed under the License is distributed on an "AS IS" BASIS,

0 commit comments

Comments
 (0)