Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pkg/cmab/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"sync"

"github.com/google/uuid"
"github.com/optimizely/go-sdk/v2/pkg/cache"
Expand All @@ -31,11 +32,26 @@ import (
"github.com/twmb/murmur3"
)

const (
// NumLockStripes defines the number of mutexes for lock striping to reduce contention
NumLockStripes = 1000
)

// DefaultCmabService implements the CmabService interface
type DefaultCmabService struct {
cmabCache cache.CacheWithRemove
cmabClient Client
logger logging.OptimizelyLogProducer
// Lock striping to prevent race conditions in concurrent CMAB requests
locks [NumLockStripes]sync.Mutex
}

// getLockIndex calculates the lock index for a given user and rule combination
func (s *DefaultCmabService) getLockIndex(userID, ruleID string) int {
// Create a hash of userID + ruleID for consistent lock selection
hasher := murmur3.New32()
_, _ = hasher.Write([]byte(userID + ruleID)) // murmur3 Write never returns an error
return int(hasher.Sum32() % NumLockStripes)
}

// ServiceOptions defines options for creating a CMAB service
Expand Down Expand Up @@ -66,6 +82,11 @@ func (s *DefaultCmabService) GetDecision(
ruleID string,
options *decide.Options,
) (Decision, error) {
// Use lock striping to prevent race conditions in concurrent requests
lockIndex := s.getLockIndex(userContext.ID, ruleID)
s.locks[lockIndex].Lock()
defer s.locks[lockIndex].Unlock()

// Initialize reasons slice for decision
reasons := []string{}

Expand Down
52 changes: 52 additions & 0 deletions pkg/cmab/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/optimizely/go-sdk/v2/pkg/decide"
"github.com/optimizely/go-sdk/v2/pkg/entities"
"github.com/optimizely/go-sdk/v2/pkg/logging"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/twmb/murmur3"
Expand Down Expand Up @@ -843,3 +844,54 @@ func (s *CmabServiceTestSuite) TestGetDecisionApiError() {
s.mockCache.AssertExpectations(s.T())
s.mockClient.AssertExpectations(s.T())
}


// TestLockStripingDistribution verifies that different user/rule combinations
// use different locks to allow for better concurrency
func TestLockStripingDistribution(t *testing.T) {
service := &DefaultCmabService{}

// Test different combinations to ensure they get different lock indices
testCases := []struct {
userID string
ruleID string
}{
{"user1", "rule1"},
{"user2", "rule1"},
{"user1", "rule2"},
{"user3", "rule3"},
{"user4", "rule4"},
}

lockIndices := make(map[int]bool)
for _, tc := range testCases {
index := service.getLockIndex(tc.userID, tc.ruleID)

// Verify index is within expected range
assert.GreaterOrEqual(t, index, 0, "Lock index should be non-negative")
assert.Less(t, index, NumLockStripes, "Lock index should be less than NumLockStripes")

lockIndices[index] = true
}

// We should have multiple different lock indices (though not necessarily all unique due to hash collisions)
assert.Greater(t, len(lockIndices), 1, "Different user/rule combinations should generally use different locks")
}

// TestSameUserRuleCombinationUsesConsistentLock verifies that the same user/rule combination
// always uses the same lock index
func TestSameUserRuleCombinationUsesConsistentLock(t *testing.T) {
service := &DefaultCmabService{}

userID := "test_user"
ruleID := "test_rule"

// Get lock index multiple times
index1 := service.getLockIndex(userID, ruleID)
index2 := service.getLockIndex(userID, ruleID)
index3 := service.getLockIndex(userID, ruleID)

// All should be the same
assert.Equal(t, index1, index2, "Same user/rule should always use same lock")
assert.Equal(t, index2, index3, "Same user/rule should always use same lock")
}
5 changes: 5 additions & 0 deletions pkg/decision/experiment_bucketer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func (s ExperimentBucketerService) GetDecision(decisionContext ExperimentDecisio
experiment := decisionContext.Experiment
reasons := decide.NewDecisionReasons(options)

// Skip CMAB experiments - they should be handled by CMAB service only
if experiment.Cmab != nil {
return experimentDecision, reasons, nil
}

// Audience evaluation using common function
inAudience, audienceReasons := evaluator.CheckIfUserInAudience(experiment, userContext, decisionContext.ProjectConfig, s.audienceTreeEvaluator, options, s.logger)
reasons.Append(audienceReasons)
Expand Down
2 changes: 1 addition & 1 deletion pkg/decision/reasons/reason.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,5 @@ const (
// OverrideVariationAssignmentFound - A valid override variation was found for the given user and experiment
OverrideVariationAssignmentFound Reason = "Override variation assignment found"
// CmabVariationAssigned is the reason when a variation is assigned by the CMAB service
CmabVariationAssigned Reason = "cmab_variation_assigned"
CmabVariationAssigned Reason = "cmab variation assigned"
)