Skip to content
Draft
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 139 additions & 65 deletions api/persistence/v1/task_queues.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,11 @@ second per poller by one physical queue manager`,
The metric has 2 dimensions: namespace_id and plugin_name. Disabled by default as this is
an optional feature and also requires a metrics collection system that can handle higher cardinalities.`,
)
MatchingAutoEnable = NewTaskQueueBoolSetting(
"matching.autoEnable",
false,
`MatchingAutoEnable automatically enables fairness when a fairness key is seen`,
)

// keys for history

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ message TaskQueueTypeUserData {
DeploymentData deployment_data = 1;

temporal.api.taskqueue.v1.TaskQueueConfig config = 2;

enum FairnessState {
FAIRNESS_STATE_UNSPECIFIED = 0;
FAIRNESS_STATE_V1 = 1;
FAIRNESS_STATE_V2 = 2;
};
Comment on lines +138 to +142
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I think we usually put enums at the top-level. but I guess this is fine, we don't expect to use it anywhere else.

FairnessState fairness_state = 3;
}

// Container for all persistent user provided data for a task queue family.
Expand Down
7 changes: 7 additions & 0 deletions service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type (
MembershipUnloadDelay dynamicconfig.DurationPropertyFn
TaskQueueInfoByBuildIdTTL dynamicconfig.DurationPropertyFnWithTaskQueueFilter
PriorityLevels dynamicconfig.IntPropertyFnWithTaskQueueFilter
AutoEnableSub dynamicconfig.TypedSubscribableWithTaskQueueFilter[bool]

RateLimiterRefreshInterval time.Duration
FairnessKeyRateLimitCacheSize dynamicconfig.IntPropertyFnWithTaskQueueFilter
Expand Down Expand Up @@ -149,6 +150,8 @@ type (
EnableFairness bool
EnableFairnessSub func(func(bool)) (bool, func())
EnableMigration func() bool
AutoEnable bool
AutoEnableSub func(func(bool)) (bool, func())
GetTasksBatchSize func() int
GetTasksReloadAt func() int
UpdateAckInterval func() time.Duration
Expand Down Expand Up @@ -308,6 +311,7 @@ func NewConfig(
FairnessKeyRateLimitCacheSize: dynamicconfig.MatchingFairnessKeyRateLimitCacheSize.Get(dc),
MaxFairnessKeyWeightOverrides: dynamicconfig.MatchingMaxFairnessKeyWeightOverrides.Get(dc),
MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc),
AutoEnableSub: dynamicconfig.MatchingAutoEnable.Subscribe(dc),

AdminNamespaceToPartitionDispatchRate: dynamicconfig.AdminMatchingNamespaceToPartitionDispatchRate.Get(dc),
AdminNamespaceToPartitionRateSub: dynamicconfig.AdminMatchingNamespaceToPartitionDispatchRate.Subscribe(dc),
Expand Down Expand Up @@ -355,6 +359,9 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
EnableMigration: func() bool {
return config.EnableMigration(ns.String(), taskQueueName, taskType)
},
AutoEnableSub: func(cb func(bool)) (bool, func()) {
return config.AutoEnableSub(ns.String(), taskQueueName, taskType, cb)
},
GetTasksBatchSize: func() int {
return config.GetTasksBatchSize(ns.String(), taskQueueName, taskType)
},
Expand Down
2 changes: 1 addition & 1 deletion service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func (e *matchingEngineImpl) getTaskQueuePartitionManager(
tqConfig.loadCause = loadCause
logger, throttledLogger, metricsHandler := e.loggerAndMetricsForPartition(namespaceEntry, partition, tqConfig)
onFatalErr := func(cause unloadCause) { newPM.unloadFromEngine(cause) }
onUserDataChanged := func() { newPM.userDataChanged() }
onUserDataChanged := func(to *persistencespb.VersionedTaskQueueUserData) { newPM.userDataChanged(to) }
userDataManager := newUserDataManager(
e.taskManager,
e.matchingRawClient,
Expand Down
15 changes: 8 additions & 7 deletions service/matching/matching_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2119,8 +2119,7 @@ func (s *matchingEngineSuite) TestTaskQueueManagerGetTaskBatch() {
s.NoError(err)
}

tlMgr, ok := s.matchingEngine.partitions[dbq.Partition().Key()].(*taskQueuePartitionManagerImpl).defaultQueue.(*physicalTaskQueueManagerImpl)
s.True(ok, "taskQueueManger doesn't implement taskQueuePartitionManager interface")
tlMgr := s.getPhysicalTaskQueueManagerImpl(dbq)
s.EqualValues(taskCount, s.taskManager.getTaskCount(dbq))

// wait until all tasks are read by the task pump and enqueued into the in-memory buffer
Expand Down Expand Up @@ -2254,8 +2253,7 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() {
s.NoError(err)
}

tlMgr, ok := s.matchingEngine.partitions[dbq.Partition().Key()].(*taskQueuePartitionManagerImpl).defaultQueue.(*physicalTaskQueueManagerImpl)
s.True(ok, "failed to load task queue")
tlMgr := s.getPhysicalTaskQueueManagerImpl(dbq)
s.EqualValues(taskCount, s.taskManager.getTaskCount(dbq))
blm := tlMgr.backlogMgr.(*backlogManagerImpl)

Expand Down Expand Up @@ -2988,7 +2986,9 @@ func (s *matchingEngineSuite) TestUpdatePhysicalTaskQueueGauge_UnVersioned() {
// the size of the map to 1 and it's counter to 1.
s.PhysicalQueueMetricValidator(capture, 1, 1)

tlmImpl, ok := tqm.(*taskQueuePartitionManagerImpl).defaultQueue.(*physicalTaskQueueManagerImpl)
defaultQ, err := tqm.(*taskQueuePartitionManagerImpl).managerReady.Get(context.Background())
s.Require().NoError(err)
tlmImpl := defaultQ.(*physicalTaskQueueManagerImpl)
s.True(ok)

s.matchingEngine.updatePhysicalTaskQueueGauge(s.ns, prtn, tlmImpl.queue.version, 1)
Expand Down Expand Up @@ -3223,8 +3223,9 @@ func (s *matchingEngineSuite) pollWorkflowTasksConcurrently(

// getPhysicalTaskQueueManagerImpl extracts the physicalTaskQueueManagerImpl for the given PhysicalTaskQueueKey
func (s *matchingEngineSuite) getPhysicalTaskQueueManagerImpl(ptq *PhysicalTaskQueueKey) *physicalTaskQueueManagerImpl {
pgMgr, ok := s.matchingEngine.partitions[ptq.Partition().Key()].(*taskQueuePartitionManagerImpl).defaultQueue.(*physicalTaskQueueManagerImpl)
s.True(ok, "taskQueueManger doesn't implement taskQueuePartitionManager interface")
defaultQ, err := s.matchingEngine.partitions[ptq.Partition().Key()].(*taskQueuePartitionManagerImpl).managerReady.Get(context.Background())
s.Require().NoError(err)
pgMgr := defaultQ.(*physicalTaskQueueManagerImpl)
return pgMgr
}

Expand Down
15 changes: 13 additions & 2 deletions service/matching/physical_task_queue_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,16 @@ func (s *PhysicalTaskQueueManagerTestSuite) SetupTest() {
s.NoError(err)
engine.partitions[prtn.Key()] = prtnMgr

if s.fairness {
prtnMgr.config.NewMatcher = true
prtnMgr.config.EnableFairness = true
} else if s.newMatcher {
prtnMgr.config.NewMatcher = true
}
Comment on lines +94 to +99
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, the stuff on line 70-74 doesn't take care of this? if we do need this, maybe we don't need that?

reading this whole function again... it duplicates a lot of real code to set things up. I wish we could consolidate. but probably not now.


s.tqMgr, err = newPhysicalTaskQueueManager(prtnMgr, s.physicalTaskQueueKey)
s.NoError(err)
prtnMgr.defaultQueue = s.tqMgr
prtnMgr.managerReady.Set(s.tqMgr, nil)
}

/*
Expand Down Expand Up @@ -334,13 +341,17 @@ func (s *PhysicalTaskQueueManagerTestSuite) TestAddTaskStandby() {
s.tqMgr.namespaceRegistry = mockNamespaceCache

s.tqMgr.Start()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
err := s.tqMgr.WaitUntilInitialized(ctx)
s.Require().NoError(err)
defer s.tqMgr.Stop(unloadCauseShuttingDown)
cancel()

// stop taskWriter so that we can check if there's any call to it
// otherwise the task persist process is async and hard to test
s.tqMgr.tqCtxCancel()

err := s.tqMgr.SpoolTask(&persistencespb.TaskInfo{
err = s.tqMgr.SpoolTask(&persistencespb.TaskInfo{
CreateTime: timestamp.TimePtr(time.Now().UTC()),
})
s.Equal(errShutdown, err) // task writer was stopped above
Expand Down
Loading
Loading