Skip to content

Commit 4e48555

Browse files
authored
refactor: Replace deprecated semaphore with golang.org/x/sync/semaphore (#33)
1 parent 76a33f1 commit 4e48555

File tree

2 files changed

+40
-25
lines changed

2 files changed

+40
-25
lines changed

core/goofys_common_test.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -129,13 +129,12 @@ func waitFor(t *C, addr string) (err error) {
129129

130130
func (t *GoofysTest) deleteBlobsParallelly(cloud StorageBackend, blobs []string) error {
131131
const concurrency = 10
132-
sem := make(semaphore, concurrency)
133-
sem.P(concurrency)
132+
sem := NewSemaphore(concurrency)
134133
var err error
135134
for _, blobOuter := range blobs {
136-
sem.V(1)
135+
sem.P(1) // Acquire slot
137136
go func(blob string) {
138-
defer sem.P(1)
137+
defer sem.V(1) // Release slot
139138
_, localerr := cloud.DeleteBlob(&DeleteBlobInput{blob})
140139
if localerr != nil && localerr != syscall.ENOENT {
141140
err = localerr
@@ -145,7 +144,9 @@ func (t *GoofysTest) deleteBlobsParallelly(cloud StorageBackend, blobs []string)
145144
break
146145
}
147146
}
148-
sem.V(concurrency)
147+
// Wait for all goroutines to complete
148+
sem.P(concurrency)
149+
sem.V(concurrency) // Release them back
149150
return err
150151
}
151152

@@ -373,12 +374,11 @@ func (s *GoofysTest) removeBlob(cloud StorageBackend, t *C, blobPath string) {
373374

374375
func (s *GoofysTest) setupBlobs(cloud StorageBackend, t *C, env map[string]*string) {
375376
const concurrency = 10
376-
throttler := make(semaphore, concurrency)
377-
throttler.P(concurrency)
377+
throttler := NewSemaphore(concurrency)
378378

379379
var globalErr atomic.Value
380380
for path, c := range env {
381-
throttler.V(1)
381+
throttler.P(1) // Acquire slot before spawning goroutine
382382
go func(path string, content *string) {
383383
dir := false
384384
if content == nil {
@@ -392,7 +392,7 @@ func (s *GoofysTest) setupBlobs(cloud StorageBackend, t *C, env map[string]*stri
392392
content = &path
393393
}
394394
}
395-
defer throttler.P(1)
395+
defer throttler.V(1) // Release slot when goroutine completes
396396
params := &PutBlobInput{
397397
Key: path,
398398
Body: bytes.NewReader([]byte(*content)),
@@ -410,19 +410,20 @@ func (s *GoofysTest) setupBlobs(cloud StorageBackend, t *C, env map[string]*stri
410410
t.Assert(err, IsNil)
411411
}(path, c)
412412
}
413-
throttler.V(concurrency)
414-
throttler = make(semaphore, concurrency)
413+
// Wait for all goroutines to complete by acquiring all slots
415414
throttler.P(concurrency)
415+
// Release them back
416+
throttler.V(concurrency)
416417
t.Assert(globalErr.Load(), IsNil)
417418

418419
// double check, except on AWS S3, because there we sometimes
419420
// hit 404 NoSuchBucket and there's no way to distinguish that
420421
// from 404 KeyNotFound
421422
if !hasEnv("AWS") {
422423
for path, c := range env {
423-
throttler.V(1)
424+
throttler.P(1) // Acquire slot
424425
go func(path string, content *string) {
425-
defer throttler.P(1)
426+
defer throttler.V(1) // Release slot
426427
params := &HeadBlobInput{Key: path}
427428
res, err := cloud.HeadBlob(params)
428429
if err != nil {
@@ -442,7 +443,9 @@ func (s *GoofysTest) setupBlobs(cloud StorageBackend, t *C, env map[string]*stri
442443
}
443444
}(path, c)
444445
}
445-
throttler.V(concurrency)
446+
// Wait for all goroutines to complete
447+
throttler.P(concurrency)
448+
throttler.V(concurrency) // Release them back
446449
t.Assert(globalErr.Load(), IsNil)
447450
}
448451
}

core/utils.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616
package core
1717

1818
import (
19+
"context"
1920
"fmt"
2021
"strings"
2122
"time"
2223
"unicode"
24+
25+
"golang.org/x/sync/semaphore"
2326
)
2427

2528
var TIME_MAX = time.Unix(1<<63-62135596801, 999999999)
@@ -170,20 +173,29 @@ func Dup(value []byte) []byte {
170173
return ret
171174
}
172175

173-
type empty struct{}
174-
175-
// TODO(dotslash/khc): Remove this semaphore in favor of
176-
// https://godoc.org/golang.org/x/sync/semaphore
177-
type semaphore chan empty
176+
// Semaphore is a counting semaphore implementation using golang.org/x/sync/semaphore.
177+
// It provides P (wait/acquire) and V (signal/release) operations.
178+
type Semaphore struct {
179+
sem *semaphore.Weighted
180+
}
178181

179-
func (sem semaphore) P(n int) {
180-
for i := 0; i < n; i++ {
181-
sem <- empty{}
182+
// NewSemaphore creates a new semaphore with the given initial count.
183+
// The count represents the number of resources available.
184+
func NewSemaphore(n int) *Semaphore {
185+
return &Semaphore{
186+
sem: semaphore.NewWeighted(int64(n)),
182187
}
183188
}
184189

185-
func (sem semaphore) V(n int) {
186-
for i := 0; i < n; i++ {
187-
<-sem
190+
// P (proberen/wait) acquires n resources from the semaphore, blocking until they are available.
191+
// It panics if the context is canceled.
192+
func (s *Semaphore) P(n int) {
193+
if err := s.sem.Acquire(context.Background(), int64(n)); err != nil {
194+
panic(err)
188195
}
189196
}
197+
198+
// V (verhogen/signal) releases n resources back to the semaphore.
199+
func (s *Semaphore) V(n int) {
200+
s.sem.Release(int64(n))
201+
}

0 commit comments

Comments
 (0)