Skip to content

Commit b96901b

Browse files
committed
Refactor OnChange to be set after initialization through a separate
method. Even with the channel providing guardrails on initialization, the previous attempt would still cause spurious panics with newPM being nil by the time we attempt to call the method on it. I didn't take a look at the generated assembly, but I assume that even with the channel block there was still a race as to when the address of newPM was copied from the outer scope's stack into the closure's stack. This could likely be avoided by taking the address of the pointer and passing that instead, but that started to feel too clever to me.
1 parent 36864fa commit b96901b

File tree

5 files changed

+17
-12
lines changed

5 files changed

+17
-12
lines changed

service/matching/matching_engine.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -430,16 +430,10 @@ func (e *matchingEngineImpl) getTaskQueuePartitionManager(
430430
tqConfig.loadCause = loadCause
431431
logger, throttledLogger, metricsHandler := e.loggerAndMetricsForPartition(namespaceEntry, partition, tqConfig)
432432
onFatalErr := func(cause unloadCause) { newPM.unloadFromEngine(cause) }
433-
done := make(chan struct{})
434-
onUserDataChanged := func(from, to *persistencespb.VersionedTaskQueueUserData) {
435-
<-done
436-
newPM.userDataChanged(from, to)
437-
}
438433
userDataManager := newUserDataManager(
439434
e.taskManager,
440435
e.matchingRawClient,
441436
onFatalErr,
442-
onUserDataChanged,
443437
partition,
444438
tqConfig,
445439
logger,
@@ -456,10 +450,10 @@ func (e *matchingEngineImpl) getTaskQueuePartitionManager(
456450
metricsHandler,
457451
userDataManager,
458452
)
459-
close(done)
460453
if err != nil {
461454
return nil, false, err
462455
}
456+
userDataManager.SetOnChange(newPM.userDataChanged)
463457

464458
// If it gets here, write lock and check again in case a task queue is created between the two locks
465459
e.partitionsLock.Lock()

service/matching/physical_task_queue_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (s *PhysicalTaskQueueManagerTestSuite) SetupTest() {
8585
prtn := s.physicalTaskQueueKey.Partition()
8686
tqConfig := newTaskQueueConfig(prtn.TaskQueue(), engine.config, nsName)
8787
onFatalErr := func(unloadCause) { s.T().Fatal("user data manager called onFatalErr") }
88-
udMgr := newUserDataManager(engine.taskManager, engine.matchingRawClient, onFatalErr, nil, prtn, tqConfig, engine.logger, engine.namespaceRegistry)
88+
udMgr := newUserDataManager(engine.taskManager, engine.matchingRawClient, onFatalErr, prtn, tqConfig, engine.logger, engine.namespaceRegistry)
8989

9090
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
9191
prtnMgr, err := newTaskQueuePartitionManager(ctx, engine, ns, prtn, tqConfig, engine.logger, nil, metrics.NoopMetricsHandler, udMgr)

service/matching/task_queue_partition_manager_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,10 @@ type mockUserDataManager struct {
607607
data *persistencespb.VersionedTaskQueueUserData
608608
}
609609

610+
func (m *mockUserDataManager) SetOnChange(fn UserDataChangedFunc) {
611+
// noop
612+
}
613+
610614
func (m *mockUserDataManager) Start() {
611615
// noop
612616
}

service/matching/user_data_manager.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type (
5050
// Handles the maybe-long-poll GetUserData RPC.
5151
HandleGetUserDataRequest(ctx context.Context, req *matchingservice.GetTaskQueueUserDataRequest) (*matchingservice.GetTaskQueueUserDataResponse, error)
5252
CheckTaskQueueUserDataPropagation(context.Context, int64, int, int) error
53+
SetOnChange(fn UserDataChangedFunc)
5354
}
5455

5556
UserDataUpdateOptions struct {
@@ -65,6 +66,8 @@ type (
6566
// Extra care should be taken to avoid mutating the current user data to avoid keeping uncommitted data in memory.
6667
UserDataUpdateFunc func(*persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, bool, error)
6768

69+
UserDataChangedFunc func(from, to *persistencespb.VersionedTaskQueueUserData)
70+
6871
// userDataManager is responsible for fetching and keeping user data up-to-date in-memory
6972
// for a given TQ partition.
7073
//
@@ -75,7 +78,7 @@ type (
7578
userDataManagerImpl struct {
7679
lock sync.Mutex
7780
onFatalErr func(unloadCause)
78-
onUserDataChanged func(from, to *persistencespb.VersionedTaskQueueUserData) // if set, call this in new goroutine when user data changes
81+
onUserDataChanged UserDataChangedFunc // if set, call this in new goroutine when user data changes
7982
partition tqid.Partition
8083
userData *persistencespb.VersionedTaskQueueUserData
8184
userDataChanged chan struct{}
@@ -110,15 +113,13 @@ func newUserDataManager(
110113
store persistence.TaskManager,
111114
matchingClient matchingservice.MatchingServiceClient,
112115
onFatalErr func(unloadCause),
113-
onUserDataChanged func(from, to *persistencespb.VersionedTaskQueueUserData),
114116
partition tqid.Partition,
115117
config *taskQueueConfig,
116118
logger log.Logger,
117119
registry namespace.Registry,
118120
) *userDataManagerImpl {
119121
m := &userDataManagerImpl{
120122
onFatalErr: onFatalErr,
121-
onUserDataChanged: onUserDataChanged,
122123
partition: partition,
123124
userDataChanged: make(chan struct{}),
124125
config: config,
@@ -135,6 +136,12 @@ func newUserDataManager(
135136
return m
136137
}
137138

139+
func (m *userDataManagerImpl) SetOnChange(fn UserDataChangedFunc) {
140+
m.lock.Lock()
141+
m.onUserDataChanged = fn
142+
m.lock.Unlock()
143+
}
144+
138145
func (m *userDataManagerImpl) Start() {
139146
if m.store != nil {
140147
m.goroGroup.Go(m.loadUserData)

service/matching/user_data_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func createUserDataManager(
6666
onFatalErr = func(unloadCause) { t.Fatal("user data manager called onFatalErr") }
6767
}
6868

69-
return newUserDataManager(tm, testOpts.matchingClientMock, onFatalErr, nil, testOpts.dbq.Partition(), newTaskQueueConfig(testOpts.dbq.Partition().TaskQueue(), testOpts.config, ns), logger, mockNamespaceCache)
69+
return newUserDataManager(tm, testOpts.matchingClientMock, onFatalErr, testOpts.dbq.Partition(), newTaskQueueConfig(testOpts.dbq.Partition().TaskQueue(), testOpts.config, ns), logger, mockNamespaceCache)
7070
}
7171

7272
func TestUserData_LoadOnInit(t *testing.T) {

0 commit comments

Comments
 (0)