
Commit be61432

add code for auto enabling priority and fairness
1 parent 5a3f773 commit be61432

File tree

10 files changed (+243, -85 lines)


api/persistence/v1/task_queues.pb.go

Lines changed: 139 additions & 65 deletions
Generated file; diff not rendered by default.

common/dynamicconfig/constants.go

Lines changed: 5 additions & 0 deletions
@@ -1382,6 +1382,11 @@ second per poller by one physical queue manager`,
 		The metric has 2 dimensions: namespace_id and plugin_name. Disabled by default as this is
 		an optional feature and also requires a metrics collection system that can handle higher cardinalities.`,
 	)
+	MatchingAutoEnable = NewTaskQueueBoolSetting(
+		"matching.autoEnable",
+		false,
+		`MatchingAutoEnable automatically enables fairness when a fairness key is seen`,
+	)
 
 	// keys for history
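The new setting is a task-queue-scoped bool, resolved per (namespace, task queue name, task type). As a rough standalone sketch of that resolution pattern (the types and lookup here are simplified stand-ins, not the server's real dynamicconfig implementation):

package main

import "fmt"

// taskQueueBoolSetting is an illustrative stand-in for the pattern behind
// MatchingAutoEnable: a key, a default, and per-queue overrides resolved
// at lookup time.
type taskQueueBoolSetting struct {
	key       string
	def       bool
	overrides map[string]bool // keyed by "namespace/taskQueue" for the demo
}

func (s taskQueueBoolSetting) get(ns, tq string) bool {
	if v, ok := s.overrides[ns+"/"+tq]; ok {
		return v
	}
	return s.def
}

func main() {
	autoEnable := taskQueueBoolSetting{
		key:       "matching.autoEnable",
		def:       false, // matches the default registered above
		overrides: map[string]bool{"my-ns/orders": true},
	}
	fmt.Println(autoEnable.get("my-ns", "orders"))   // true (override)
	fmt.Println(autoEnable.get("my-ns", "payments")) // false (default)
}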

proto/internal/temporal/server/api/persistence/v1/task_queues.proto

Lines changed: 7 additions & 0 deletions
@@ -134,6 +134,13 @@ message TaskQueueTypeUserData {
     DeploymentData deployment_data = 1;
 
     temporal.api.taskqueue.v1.TaskQueueConfig config = 2;
+
+    enum FairnessState {
+        FAIRNESS_STATE_UNSPECIFIED = 0;
+        FAIRNESS_STATE_V1 = 1;
+        FAIRNESS_STATE_V2 = 2;
+    };
+    FairnessState fairness_state = 3;
 }
 
 // Container for all persistent user provided data for a task queue family.
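Reading the three states together with the matcher logic later in this commit, the intended mapping appears to be: UNSPECIFIED defers to dynamic config, V1 pins the new matcher with fairness off, and V2 pins the new matcher with fairness on (except on sticky queues). A hypothetical helper capturing that mapping, assuming the persistencespb import shown in this diff (illustration only, not commit code):

// fairnessFlags summarizes the switch in newTaskQueuePartitionManager below.
func fairnessFlags(state persistencespb.TaskQueueTypeUserData_FairnessState, sticky bool) (newMatcher, fairness bool) {
	switch state {
	case persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V1:
		return true, false // new matcher, fairness explicitly off
	case persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V2:
		return true, !sticky // fairness on, except sticky queues keep TTL behavior
	default:
		return false, false // UNSPECIFIED: defer to dynamic config subscriptions
	}
}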

service/matching/config.go

Lines changed: 7 additions & 0 deletions
@@ -82,6 +82,7 @@ type (
     MembershipUnloadDelay     dynamicconfig.DurationPropertyFn
     TaskQueueInfoByBuildIdTTL dynamicconfig.DurationPropertyFnWithTaskQueueFilter
     PriorityLevels            dynamicconfig.IntPropertyFnWithTaskQueueFilter
+    AutoEnableSub             dynamicconfig.TypedSubscribableWithTaskQueueFilter[bool]
 
     RateLimiterRefreshInterval    time.Duration
     FairnessKeyRateLimitCacheSize dynamicconfig.IntPropertyFnWithTaskQueueFilter
@@ -149,6 +150,8 @@ type (
     EnableFairness    bool
     EnableFairnessSub func(func(bool)) (bool, func())
     EnableMigration   func() bool
+    AutoEnable        bool
+    AutoEnableSub     func(func(bool)) (bool, func())
     GetTasksBatchSize func() int
     GetTasksReloadAt  func() int
     UpdateAckInterval func() time.Duration
@@ -308,6 +311,7 @@ func NewConfig(
     FairnessKeyRateLimitCacheSize: dynamicconfig.MatchingFairnessKeyRateLimitCacheSize.Get(dc),
     MaxFairnessKeyWeightOverrides: dynamicconfig.MatchingMaxFairnessKeyWeightOverrides.Get(dc),
     MaxIDLengthLimit:              dynamicconfig.MaxIDLengthLimit.Get(dc),
+    AutoEnableSub:                 dynamicconfig.MatchingAutoEnable.Subscribe(dc),
 
     AdminNamespaceToPartitionDispatchRate: dynamicconfig.AdminMatchingNamespaceToPartitionDispatchRate.Get(dc),
     AdminNamespaceToPartitionRateSub:      dynamicconfig.AdminMatchingNamespaceToPartitionDispatchRate.Subscribe(dc),
@@ -355,6 +359,9 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
     EnableMigration: func() bool {
         return config.EnableMigration(ns.String(), taskQueueName, taskType)
     },
+    AutoEnableSub: func(cb func(bool)) (bool, func()) {
+        return config.AutoEnableSub(ns.String(), taskQueueName, taskType, cb)
+    },
     GetTasksBatchSize: func() int {
         return config.GetTasksBatchSize(ns.String(), taskQueueName, taskType)
     },
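The Subscribe accessors above appear to follow a (current value, cancel) contract: the caller gets the value at subscription time plus a teardown function, and the callback fires on later changes. A standalone sketch of that assumed contract (simplified types, not the real dynamicconfig package):

package main

import "fmt"

// subscribe mimics the assumed (value, cancel) contract of AutoEnableSub:
// return the current value and a teardown function; deliver later changes
// via onChange.
func subscribe(current bool, onChange func(bool)) (bool, func()) {
	// A real implementation would register onChange with the config poller
	// and cancel would deregister it; both are no-ops in this illustration.
	cancel := func() {}
	_ = onChange
	return current, cancel
}

func main() {
	val, cancel := subscribe(false, func(v bool) {
		fmt.Println("matching.autoEnable changed to", v)
	})
	defer cancel() // mirrors pm.cancelAutoEnableSub() in Stop()
	fmt.Println("initial value:", val) // false, the setting's default
}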

service/matching/matching_engine.go

Lines changed: 7 additions & 1 deletion
@@ -430,7 +430,11 @@ func (e *matchingEngineImpl) getTaskQueuePartitionManager(
     tqConfig.loadCause = loadCause
     logger, throttledLogger, metricsHandler := e.loggerAndMetricsForPartition(namespaceEntry, partition, tqConfig)
     onFatalErr := func(cause unloadCause) { newPM.unloadFromEngine(cause) }
-    onUserDataChanged := func() { newPM.userDataChanged() }
+    done := make(chan struct{})
+    onUserDataChanged := func(old, new *persistencespb.VersionedTaskQueueUserData) {
+        <-done
+        newPM.userDataChanged(old, new)
+    }
     userDataManager := newUserDataManager(
         e.taskManager,
         e.matchingRawClient,
@@ -442,6 +446,7 @@ func (e *matchingEngineImpl) getTaskQueuePartitionManager(
         e.namespaceRegistry,
     )
     newPM, err = newTaskQueuePartitionManager(
+        ctx,
         e,
         namespaceEntry,
         partition,
@@ -451,6 +456,7 @@ func (e *matchingEngineImpl) getTaskQueuePartitionManager(
         metricsHandler,
         userDataManager,
     )
+    close(done)
     if err != nil {
         return nil, false, err
    }
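The done channel guards a construction race: newTaskQueuePartitionManager now starts the user data manager internally, so the change callback can fire before newPM is assigned, and blocking on done until close(done) keeps the callback from touching a nil pointer. A standalone sketch of the pattern:

package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{})
	var target *string // stands in for newPM, assigned after construction

	go func() { // callback that may be scheduled mid-construction
		<-done // block until construction completes
		fmt.Println("callback sees:", *target)
	}()

	s := "partition manager ready" // construction finishes...
	target = &s
	close(done) // ...then release any pending callbacks

	time.Sleep(10 * time.Millisecond) // let the goroutine print
}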

service/matching/matching_engine_test.go

Lines changed: 3 additions & 1 deletion
@@ -267,8 +267,10 @@ func newMatchingEngine(
 func (s *matchingEngineSuite) newPartitionManager(prtn tqid.Partition, config *Config) taskQueuePartitionManager {
     tqConfig := newTaskQueueConfig(prtn.TaskQueue(), config, matchingTestNamespace)
     logger, _, metricsHandler := s.matchingEngine.loggerAndMetricsForPartition(s.ns, prtn, tqConfig)
-    pm, err := newTaskQueuePartitionManager(s.matchingEngine, s.ns, prtn, tqConfig, logger, logger, metricsHandler, &mockUserDataManager{})
+    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
+    pm, err := newTaskQueuePartitionManager(ctx, s.matchingEngine, s.ns, prtn, tqConfig, logger, logger, metricsHandler, &mockUserDataManager{})
     s.Require().NoError(err)
+    cancel()
     return pm
 }

service/matching/physical_task_queue_manager_test.go

Lines changed: 3 additions & 1 deletion
@@ -87,8 +87,10 @@ func (s *PhysicalTaskQueueManagerTestSuite) SetupTest() {
     onFatalErr := func(unloadCause) { s.T().Fatal("user data manager called onFatalErr") }
     udMgr := newUserDataManager(engine.taskManager, engine.matchingRawClient, onFatalErr, nil, prtn, tqConfig, engine.logger, engine.namespaceRegistry)
 
-    prtnMgr, err := newTaskQueuePartitionManager(engine, ns, prtn, tqConfig, engine.logger, nil, metrics.NoopMetricsHandler, udMgr)
+    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
+    prtnMgr, err := newTaskQueuePartitionManager(ctx, engine, ns, prtn, tqConfig, engine.logger, nil, metrics.NoopMetricsHandler, udMgr)
     s.NoError(err)
+    cancel()
     engine.partitions[prtn.Key()] = prtnMgr
 
     s.tqMgr, err = newPhysicalTaskQueueManager(prtnMgr, s.physicalTaskQueueKey)

service/matching/task_queue_partition_manager.go

Lines changed: 67 additions & 13 deletions
@@ -15,6 +15,7 @@ import (
     "go.temporal.io/server/api/matchingservice/v1"
     persistencespb "go.temporal.io/server/api/persistence/v1"
     taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
+    "go.temporal.io/server/common"
     "go.temporal.io/server/common/cache"
     "go.temporal.io/server/common/headers"
     "go.temporal.io/server/common/log"
@@ -63,8 +64,11 @@ type (
     // TODO(stephanos): move cache out of partition manager
     cache cache.Cache // non-nil for root-partition
 
+    fairnessState persistencespb.TaskQueueTypeUserData_FairnessState // set once on initialization and read only after
+
     cancelNewMatcherSub func()
     cancelFairnessSub   func()
+    cancelAutoEnableSub func()
 
     // rateLimitManager is used to manage the rate limit for task queues.
     rateLimitManager *rateLimitManager
@@ -82,6 +86,7 @@ func (pm *taskQueuePartitionManagerImpl) GetCache(key any) any {
 var _ taskQueuePartitionManager = (*taskQueuePartitionManagerImpl)(nil)
 
 func newTaskQueuePartitionManager(
+    ctx context.Context,
     e *matchingEngineImpl,
     ns *namespace.Namespace,
     partition tqid.Partition,
@@ -119,14 +124,40 @@ func newTaskQueuePartitionManager(
         pm.unloadFromEngine(unloadCauseConfigChange)
     }
 
-    var fairness bool
-    fairness, pm.cancelFairnessSub = tqConfig.EnableFairnessSub(unload)
-    // Fairness is disabled for sticky queues for now so that we can still use TTLs.
-    tqConfig.EnableFairness = fairness && partition.Kind() != enumspb.TASK_QUEUE_KIND_STICKY
-    if fairness {
+    pm.userDataManager.Start()
+    err := pm.userDataManager.WaitUntilInitialized(ctx)
+    if err != nil {
+        return nil, err
+    }
+    // TODO(moody): do we need to be more cautious loading this? Do we need to ensure that we have PerType()? Check for a nil return?
+    data, _, err := pm.getPerTypeUserData()
+    if err != nil {
+        return nil, err
+    }
+
+    tqConfig.AutoEnable, pm.cancelAutoEnableSub = tqConfig.AutoEnableSub(unload)
+    pm.fairnessState = data.GetFairnessState()
+    switch {
+    case !tqConfig.AutoEnable || pm.fairnessState == persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_UNSPECIFIED:
+        var fairness bool
+        fairness, pm.cancelFairnessSub = tqConfig.EnableFairnessSub(unload)
+        // Fairness is disabled for sticky queues for now so that we can still use TTLs.
+        tqConfig.EnableFairness = fairness && partition.Kind() != enumspb.TASK_QUEUE_KIND_STICKY
+        if fairness {
+            tqConfig.NewMatcher = true
+        } else {
+            tqConfig.NewMatcher, pm.cancelNewMatcherSub = tqConfig.NewMatcherSub(unload)
+        }
+    case pm.fairnessState == persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V1:
         tqConfig.NewMatcher = true
-    } else {
-        tqConfig.NewMatcher, pm.cancelNewMatcherSub = tqConfig.NewMatcherSub(unload)
+        tqConfig.EnableFairness = false
+    case pm.fairnessState == persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V2:
+        tqConfig.NewMatcher = true
+        if partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY {
+            tqConfig.EnableFairness = false
+        } else {
+            tqConfig.EnableFairness = true
+        }
     }
 
     defaultQ, err := newPhysicalTaskQueueManager(pm, UnversionedQueueKey(partition))
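Two things are worth noting about this hunk: the user data manager is now started and awaited inside the constructor (the Start and WaitUntilInitialized hunks below remove the old call sites), precisely so the persisted fairness state is available before the matcher type is chosen; and dynamic config is only consulted when auto-enable is off or nothing has been persisted yet. Spot-checking the mapping against the hypothetical fairnessFlags helper sketched after the proto change above:

fmt.Println(fairnessFlags(persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V1, false)) // true false
fmt.Println(fairnessFlags(persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V2, false)) // true true
fmt.Println(fairnessFlags(persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V2, true))  // true false (sticky keeps TTLs)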
@@ -139,7 +170,6 @@
 
 func (pm *taskQueuePartitionManagerImpl) Start() {
     pm.engine.updateTaskQueuePartitionGauge(pm.Namespace(), pm.partition, 1)
-    pm.userDataManager.Start()
     pm.defaultQueue.Start()
 }

@@ -159,6 +189,9 @@
     if pm.cancelNewMatcherSub != nil {
         pm.cancelNewMatcherSub()
     }
+    if pm.cancelAutoEnableSub != nil {
+        pm.cancelAutoEnableSub()
+    }
 
     // First, stop all queues to wrap up ongoing operations.
     for _, vq := range pm.versionedQueues {
@@ -184,10 +217,6 @@
 }
 
 func (pm *taskQueuePartitionManagerImpl) WaitUntilInitialized(ctx context.Context) error {
-    err := pm.userDataManager.WaitUntilInitialized(ctx)
-    if err != nil {
-        return err
-    }
     return pm.defaultQueue.WaitUntilInitialized(ctx)
 }

@@ -197,6 +226,21 @@
 ) (buildId string, syncMatched bool, err error) {
     var spoolQueue, syncMatchQueue physicalTaskQueueManager
     directive := params.taskInfo.GetVersionDirective()
+
+    if pm.Partition().IsRoot() && pm.config.AutoEnable && pm.fairnessState == persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_UNSPECIFIED {
+        // TODO(moody): unsure about this check; what is the correct way to tell whether PriorityKey is set?
+        // TODO(moody): this was originally discussed as a separate API. Is exposing it through the generic
+        // UpdateUserData sufficient? What are the pros/cons of adding the new API and invoking that instead?
+        // We're going to unload either way...
+        if params.taskInfo.Priority != nil && (params.taskInfo.Priority.FairnessKey != "" || params.taskInfo.Priority.PriorityKey != int32(pm.config.DefaultPriorityKey)) {
+            updateFn := func(old *persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, bool, error) {
+                new := common.CloneProto(old)
+                perType := new.GetPerType()[int32(pm.Partition().TaskType())]
+                perType.FairnessState = persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V2
+                return new, true, nil
+            }
+            pm.userDataManager.UpdateUserData(ctx, UserDataUpdateOptions{Source: "Matching auto enable"}, updateFn)
+        }
+    }
     // spoolQueue will be nil iff task is forwarded.
 reredirectTask:
     spoolQueue, syncMatchQueue, _, taskDispatchRevisionNumber, err := pm.getPhysicalQueuesForAdd(ctx, directive, params.forwardInfo, params.taskInfo.GetRunId(), params.taskInfo.GetWorkflowId(), false)
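Taken together with the reload path above, the auto-enable flow is: the first root-partition task that carries a fairness key or a non-default priority persists FAIRNESS_STATE_V2 through UpdateUserData; the resulting user data change unloads the partition (see userDataChanged below), and the next load picks fairness up from the persisted state. The trigger condition reads as this hypothetical predicate (illustration only; it assumes taskInfo.Priority is the common-proto Priority message, and the first TODO above notes the PriorityKey check is still unsettled):

// usesFairness restates the trigger condition above; not commit code.
func usesFairness(p *commonpb.Priority, defaultPriorityKey int32) bool {
	return p != nil && (p.FairnessKey != "" || p.PriorityKey != defaultPriorityKey)
}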
@@ -1303,7 +1347,17 @@
     return perType, userDataChanged, nil
 }
 
-func (pm *taskQueuePartitionManagerImpl) userDataChanged() {
+func (pm *taskQueuePartitionManagerImpl) userDataChanged(old, new *persistencespb.VersionedTaskQueueUserData) {
+    taskType := int32(pm.Partition().TaskType())
+    // TODO(moody): this stinks; does it need to be this verbose, or is the interface wrong?
+    // TODO(moody): this callback fires quite often with nil data (sampled from unit tests). Do we want to run
+    // the full update even in those cases? Should we pass the inner data and skip the callback when it is nil?
+    // I am not totally sure about the implications...
+    if old != nil && old.GetData() != nil && old.GetData().GetPerType() != nil && new != nil && new.GetData() != nil && new.GetData().GetPerType() != nil {
+        // Use the nil-safe getter: the per-type map may not contain this task type.
+        if old.GetData().GetPerType()[taskType].GetFairnessState() != new.GetData().GetPerType()[taskType].GetFairnessState() {
+            pm.unloadFromEngine(unloadCauseConfigChange)
+            return
+        }
+    }
     // Update rateLimits if any change is in userData.
     pm.rateLimitManager.UserDataChanged()
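One way to tame the verbosity the first TODO complains about: generated proto getters are nil-safe, so the whole comparison can collapse into a helper like the hypothetical one below. Note this treats missing data as FAIRNESS_STATE_UNSPECIFIED, which subtly differs from the current guard that skips the comparison entirely when either side is nil.

// fairnessStateOf is a hypothetical convenience, not commit code: every
// getter in the chain tolerates nil, so absent data yields UNSPECIFIED.
func fairnessStateOf(ud *persistencespb.VersionedTaskQueueUserData, taskType int32) persistencespb.TaskQueueTypeUserData_FairnessState {
	return ud.GetData().GetPerType()[taskType].GetFairnessState()
}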

service/matching/task_queue_partition_manager_test.go

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ func (s *PartitionManagerTestSuite) SetupTest() {
     tqConfig := newTaskQueueConfig(partition.TaskQueue(), engine.config, ns.Name())
     s.userDataMgr = &mockUserDataManager{}
 
-    pm, err := newTaskQueuePartitionManager(engine, ns, partition, tqConfig, logger, logger, metrics.NoopMetricsHandler, s.userDataMgr)
+    pm, err := newTaskQueuePartitionManager(context.Background(), engine, ns, partition, tqConfig, logger, logger, metrics.NoopMetricsHandler, s.userDataMgr)
     s.NoError(err)
     s.partitionMgr = pm
     engine.Start()

service/matching/user_data_manager.go

Lines changed: 4 additions & 3 deletions
@@ -75,7 +75,7 @@ type (
     userDataManagerImpl struct {
         lock       sync.Mutex
         onFatalErr func(unloadCause)
-        onUserDataChanged func() // if set, call this in new goroutine when user data changes
+        onUserDataChanged func(old, new *persistencespb.VersionedTaskQueueUserData) // if set, call this in new goroutine when user data changes
         partition  tqid.Partition
         userData   *persistencespb.VersionedTaskQueueUserData
         userDataChanged chan struct{}
@@ -110,7 +110,7 @@ func newUserDataManager(
     store persistence.TaskManager,
     matchingClient matchingservice.MatchingServiceClient,
     onFatalErr func(unloadCause),
-    onUserDataChanged func(),
+    onUserDataChanged func(old, new *persistencespb.VersionedTaskQueueUserData),
     partition tqid.Partition,
     config *taskQueueConfig,
     logger log.Logger,
@@ -176,11 +176,12 @@ func (m *userDataManagerImpl) getUserDataLocked() (*persistencespb.VersionedTask
 }
 
 func (m *userDataManagerImpl) setUserDataLocked(userData *persistencespb.VersionedTaskQueueUserData) {
+    old := m.userData
     m.userData = userData
     close(m.userDataChanged)
     m.userDataChanged = make(chan struct{})
     if m.onUserDataChanged != nil {
-        go m.onUserDataChanged()
+        go m.onUserDataChanged(old, userData)
     }
 }
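A standalone sketch of the pattern in setUserDataLocked: snapshot the old value under the lock, overwrite, then notify asynchronously with both values. Because delivery happens in a fresh goroutine, observers must tolerate out-of-order notifications under rapid successive updates.

package main

import (
	"fmt"
	"sync"
)

type store struct {
	mu        sync.Mutex
	data      string
	onChanged func(old, new string)
}

func (s *store) set(v string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := s.data // snapshot before overwriting
	s.data = v
	if s.onChanged != nil {
		go s.onChanged(old, v) // async notify with (old, new)
	}
}

func main() {
	done := make(chan struct{})
	s := &store{onChanged: func(old, new string) {
		fmt.Printf("changed %q -> %q\n", old, new)
		close(done)
	}}
	s.set("v1")
	<-done
}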
