Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions core/goofys_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,12 @@ func waitFor(t *C, addr string) (err error) {

func (t *GoofysTest) deleteBlobsParallelly(cloud StorageBackend, blobs []string) error {
const concurrency = 10
sem := make(semaphore, concurrency)
sem.P(concurrency)
sem := NewSemaphore(concurrency)
var err error
for _, blobOuter := range blobs {
sem.V(1)
sem.P(1) // Acquire slot
go func(blob string) {
defer sem.P(1)
defer sem.V(1) // Release slot
_, localerr := cloud.DeleteBlob(&DeleteBlobInput{blob})
if localerr != nil && localerr != syscall.ENOENT {
err = localerr
Expand All @@ -145,7 +144,9 @@ func (t *GoofysTest) deleteBlobsParallelly(cloud StorageBackend, blobs []string)
break
}
}
sem.V(concurrency)
// Wait for all goroutines to complete
sem.P(concurrency)
sem.V(concurrency) // Release them back
return err
}

Expand Down Expand Up @@ -373,12 +374,11 @@ func (s *GoofysTest) removeBlob(cloud StorageBackend, t *C, blobPath string) {

func (s *GoofysTest) setupBlobs(cloud StorageBackend, t *C, env map[string]*string) {
const concurrency = 10
throttler := make(semaphore, concurrency)
throttler.P(concurrency)
throttler := NewSemaphore(concurrency)

var globalErr atomic.Value
for path, c := range env {
throttler.V(1)
throttler.P(1) // Acquire slot before spawning goroutine
go func(path string, content *string) {
dir := false
if content == nil {
Expand All @@ -392,7 +392,7 @@ func (s *GoofysTest) setupBlobs(cloud StorageBackend, t *C, env map[string]*stri
content = &path
}
}
defer throttler.P(1)
defer throttler.V(1) // Release slot when goroutine completes
params := &PutBlobInput{
Key: path,
Body: bytes.NewReader([]byte(*content)),
Expand All @@ -410,19 +410,20 @@ func (s *GoofysTest) setupBlobs(cloud StorageBackend, t *C, env map[string]*stri
t.Assert(err, IsNil)
}(path, c)
}
throttler.V(concurrency)
throttler = make(semaphore, concurrency)
// Wait for all goroutines to complete by acquiring all slots
throttler.P(concurrency)
// Release them back
throttler.V(concurrency)
t.Assert(globalErr.Load(), IsNil)

// double check, except on AWS S3, because there we sometimes
// hit 404 NoSuchBucket and there's no way to distinguish that
// from 404 KeyNotFound
if !hasEnv("AWS") {
for path, c := range env {
throttler.V(1)
throttler.P(1) // Acquire slot
go func(path string, content *string) {
defer throttler.P(1)
defer throttler.V(1) // Release slot
params := &HeadBlobInput{Key: path}
res, err := cloud.HeadBlob(params)
if err != nil {
Expand All @@ -442,7 +443,9 @@ func (s *GoofysTest) setupBlobs(cloud StorageBackend, t *C, env map[string]*stri
}
}(path, c)
}
throttler.V(concurrency)
// Wait for all goroutines to complete
throttler.P(concurrency)
throttler.V(concurrency) // Release them back
t.Assert(globalErr.Load(), IsNil)
}
}
Expand Down
34 changes: 23 additions & 11 deletions core/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
package core

import (
"context"
"fmt"
"strings"
"time"
"unicode"

"golang.org/x/sync/semaphore"
)

var TIME_MAX = time.Unix(1<<63-62135596801, 999999999)
Expand Down Expand Up @@ -170,20 +173,29 @@ func Dup(value []byte) []byte {
return ret
}

type empty struct{}

// TODO(dotslash/khc): Remove this semaphore in favor of
// https://godoc.org/golang.org/x/sync/semaphore
type semaphore chan empty
// Semaphore is a counting semaphore implementation using golang.org/x/sync/semaphore.
// It provides P (wait/acquire) and V (signal/release) operations.
type Semaphore struct {
sem *semaphore.Weighted
}

func (sem semaphore) P(n int) {
for i := 0; i < n; i++ {
sem <- empty{}
// NewSemaphore creates a new semaphore with the given initial count.
// The count represents the number of resources available.
func NewSemaphore(n int) *Semaphore {
return &Semaphore{
sem: semaphore.NewWeighted(int64(n)),
}
}

func (sem semaphore) V(n int) {
for i := 0; i < n; i++ {
<-sem
// P (proberen/wait) acquires n resources from the semaphore, blocking until they are available.
// It panics if the context is canceled.
func (s *Semaphore) P(n int) {
if err := s.sem.Acquire(context.Background(), int64(n)); err != nil {
panic(err)
}
}

// V (verhogen/signal) releases n resources back to the semaphore.
func (s *Semaphore) V(n int) {
s.sem.Release(int64(n))
}