diff --git a/api/persistence/v1/task_queues.pb.go b/api/persistence/v1/task_queues.pb.go
index 81d9db8a77c..ae769863a03 100644
--- a/api/persistence/v1/task_queues.pb.go
+++ b/api/persistence/v1/task_queues.pb.go
@@ -86,6 +86,65 @@ func (BuildId_State) EnumDescriptor() ([]byte, []int) {
 	return file_temporal_server_api_persistence_v1_task_queues_proto_rawDescGZIP(), []int{0, 0}
 }
 
+type TaskQueueTypeUserData_FairnessState int32
+
+const (
+	TaskQueueTypeUserData_FAIRNESS_STATE_UNSPECIFIED TaskQueueTypeUserData_FairnessState = 0
+	TaskQueueTypeUserData_FAIRNESS_STATE_V1 TaskQueueTypeUserData_FairnessState = 1
+	TaskQueueTypeUserData_FAIRNESS_STATE_V2 TaskQueueTypeUserData_FairnessState = 2
+)
+
+// Enum value maps for TaskQueueTypeUserData_FairnessState.
+var (
+	TaskQueueTypeUserData_FairnessState_name = map[int32]string{
+		0: "FAIRNESS_STATE_UNSPECIFIED",
+		1: "FAIRNESS_STATE_V1",
+		2: "FAIRNESS_STATE_V2",
+	}
+	TaskQueueTypeUserData_FairnessState_value = map[string]int32{
+		"FAIRNESS_STATE_UNSPECIFIED": 0,
+		"FAIRNESS_STATE_V1": 1,
+		"FAIRNESS_STATE_V2": 2,
+	}
+)
+
+func (x TaskQueueTypeUserData_FairnessState) Enum() *TaskQueueTypeUserData_FairnessState {
+	p := new(TaskQueueTypeUserData_FairnessState)
+	*p = x
+	return p
+}
+
+func (x TaskQueueTypeUserData_FairnessState) String() string {
+	switch x {
+	case TaskQueueTypeUserData_FAIRNESS_STATE_UNSPECIFIED:
+		return "TaskQueueTypeUserDataFairnessStateUnspecified"
+	case TaskQueueTypeUserData_FAIRNESS_STATE_V1:
+		return "TaskQueueTypeUserDataFairnessStateV1"
+	case TaskQueueTypeUserData_FAIRNESS_STATE_V2:
+		return "TaskQueueTypeUserDataFairnessStateV2"
+	default:
+		return strconv.Itoa(int(x))
+	}
+
+}
+
+func (TaskQueueTypeUserData_FairnessState) Descriptor() protoreflect.EnumDescriptor {
+	return file_temporal_server_api_persistence_v1_task_queues_proto_enumTypes[1].Descriptor()
+}
+
+func (TaskQueueTypeUserData_FairnessState) Type() protoreflect.EnumType {
+	return &file_temporal_server_api_persistence_v1_task_queues_proto_enumTypes[1]
+}
+
+func (x TaskQueueTypeUserData_FairnessState) Number() protoreflect.EnumNumber {
+	return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use TaskQueueTypeUserData_FairnessState.Descriptor instead.
+func (TaskQueueTypeUserData_FairnessState) EnumDescriptor() ([]byte, []int) {
+	return file_temporal_server_api_persistence_v1_task_queues_proto_rawDescGZIP(), []int{7, 0}
+}
+
 // BuildId is an identifier with a timestamped status used to identify workers for task queue versioning purposes.
 type BuildId struct {
 	state protoimpl.MessageState `protogen:"open.v1"`
@@ -597,9 +656,10 @@ func (x *WorkerDeploymentData) GetVersions() map[string]*v12.WorkerDeploymentVer
 
 // Container for all persistent user data that varies per task queue type within a family.
 type TaskQueueTypeUserData struct {
-	state protoimpl.MessageState `protogen:"open.v1"`
-	DeploymentData *DeploymentData `protobuf:"bytes,1,opt,name=deployment_data,json=deploymentData,proto3" json:"deployment_data,omitempty"`
-	Config *v11.TaskQueueConfig `protobuf:"bytes,2,opt,name=config,proto3" json:"config,omitempty"`
+	state protoimpl.MessageState `protogen:"open.v1"`
+	DeploymentData *DeploymentData `protobuf:"bytes,1,opt,name=deployment_data,json=deploymentData,proto3" json:"deployment_data,omitempty"`
+	Config *v11.TaskQueueConfig `protobuf:"bytes,2,opt,name=config,proto3" json:"config,omitempty"`
+	FairnessState TaskQueueTypeUserData_FairnessState `protobuf:"varint,3,opt,name=fairness_state,json=fairnessState,proto3,enum=temporal.server.api.persistence.v1.TaskQueueTypeUserData_FairnessState" json:"fairness_state,omitempty"`
 	unknownFields protoimpl.UnknownFields
 	sizeCache protoimpl.SizeCache
 }
@@ -648,6 +708,13 @@ func (x *TaskQueueTypeUserData) GetConfig() *v11.TaskQueueConfig {
 	return nil
 }
 
+func (x *TaskQueueTypeUserData) GetFairnessState() TaskQueueTypeUserData_FairnessState {
+	if x != nil {
+		return x.FairnessState
+	}
+	return TaskQueueTypeUserData_FAIRNESS_STATE_UNSPECIFIED
+}
+
 // Container for all persistent user provided data for a task queue family.
 // "Task queue" as a named concept here is a task queue family, i.e. the set of task queues
 // that share a name, at most one of each type (workflow, activity, etc.).
@@ -872,10 +939,15 @@ const file_temporal_server_api_persistence_v1_task_queues_proto_rawDesc = "" +
 	"\bversions\x18\x02 \x03(\v2F.temporal.server.api.persistence.v1.WorkerDeploymentData.VersionsEntryR\bversions\x1a{\n" +
 	"\rVersionsEntry\x12\x10\n" +
 	"\x03key\x18\x01 \x01(\tR\x03key\x12T\n" +
-	"\x05value\x18\x02 \x01(\v2>.temporal.server.api.deployment.v1.WorkerDeploymentVersionDataR\x05value:\x028\x01\"\xb8\x01\n" +
+	"\x05value\x18\x02 \x01(\v2>.temporal.server.api.deployment.v1.WorkerDeploymentVersionDataR\x05value:\x028\x01\"\x87\x03\n" +
 	"\x15TaskQueueTypeUserData\x12[\n" +
 	"\x0fdeployment_data\x18\x01 \x01(\v22.temporal.server.api.persistence.v1.DeploymentDataR\x0edeploymentData\x12B\n" +
-	"\x06config\x18\x02 \x01(\v2*.temporal.api.taskqueue.v1.TaskQueueConfigR\x06config\"\x8e\x03\n" +
+	"\x06config\x18\x02 \x01(\v2*.temporal.api.taskqueue.v1.TaskQueueConfigR\x06config\x12n\n" +
+	"\x0efairness_state\x18\x03 \x01(\x0e2G.temporal.server.api.persistence.v1.TaskQueueTypeUserData.FairnessStateR\rfairnessState\"]\n" +
+	"\rFairnessState\x12\x1e\n" +
+	"\x1aFAIRNESS_STATE_UNSPECIFIED\x10\x00\x12\x15\n" +
+	"\x11FAIRNESS_STATE_V1\x10\x01\x12\x15\n" +
+	"\x11FAIRNESS_STATE_V2\x10\x02\"\x8e\x03\n" +
 	"\x11TaskQueueUserData\x12F\n" +
 	"\x05clock\x18\x01 \x01(\v20.temporal.server.api.clock.v1.HybridLogicalClockR\x05clock\x12[\n" +
 	"\x0fversioning_data\x18\x02 \x01(\v22.temporal.server.api.persistence.v1.VersioningDataR\x0eversioningData\x12]\n" +
@@ -899,71 +971,73 @@ func file_temporal_server_api_persistence_v1_task_queues_proto_rawDescGZIP() []b
 	return file_temporal_server_api_persistence_v1_task_queues_proto_rawDescData
 }
 
-var file_temporal_server_api_persistence_v1_task_queues_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
+var file_temporal_server_api_persistence_v1_task_queues_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
 var file_temporal_server_api_persistence_v1_task_queues_proto_msgTypes = make([]protoimpl.MessageInfo, 14)
 var file_temporal_server_api_persistence_v1_task_queues_proto_goTypes = []any{
 	(BuildId_State)(0), // 0: temporal.server.api.persistence.v1.BuildId.State
-	(*BuildId)(nil), // 1: temporal.server.api.persistence.v1.BuildId
-	(*CompatibleVersionSet)(nil), // 2: temporal.server.api.persistence.v1.CompatibleVersionSet
-	(*AssignmentRule)(nil), // 3: temporal.server.api.persistence.v1.AssignmentRule
-	(*RedirectRule)(nil), // 4: temporal.server.api.persistence.v1.RedirectRule
-	(*VersioningData)(nil), // 5: temporal.server.api.persistence.v1.VersioningData
-	(*DeploymentData)(nil), // 6: temporal.server.api.persistence.v1.DeploymentData
-	(*WorkerDeploymentData)(nil), // 7: temporal.server.api.persistence.v1.WorkerDeploymentData
-	(*TaskQueueTypeUserData)(nil), // 8: temporal.server.api.persistence.v1.TaskQueueTypeUserData
-	(*TaskQueueUserData)(nil), // 9: temporal.server.api.persistence.v1.TaskQueueUserData
-	(*VersionedTaskQueueUserData)(nil), // 10: temporal.server.api.persistence.v1.VersionedTaskQueueUserData
-	nil, // 11: temporal.server.api.persistence.v1.DeploymentData.DeploymentsDataEntry
-	(*DeploymentData_DeploymentDataItem)(nil), // 12: temporal.server.api.persistence.v1.DeploymentData.DeploymentDataItem
-	nil, // 13: temporal.server.api.persistence.v1.WorkerDeploymentData.VersionsEntry
-	nil, // 14: temporal.server.api.persistence.v1.TaskQueueUserData.PerTypeEntry
-	(*v1.HybridLogicalClock)(nil), // 15: temporal.server.api.clock.v1.HybridLogicalClock
-	(*v11.BuildIdAssignmentRule)(nil), // 16: temporal.api.taskqueue.v1.BuildIdAssignmentRule
-	(*v11.CompatibleBuildIdRedirectRule)(nil), // 17: temporal.api.taskqueue.v1.CompatibleBuildIdRedirectRule
-	(*v12.DeploymentVersionData)(nil), // 18: temporal.server.api.deployment.v1.DeploymentVersionData
-	(*v13.RoutingConfig)(nil), // 19: temporal.api.deployment.v1.RoutingConfig
-	(*v11.TaskQueueConfig)(nil), // 20: temporal.api.taskqueue.v1.TaskQueueConfig
-	(*v13.Deployment)(nil), // 21: temporal.api.deployment.v1.Deployment
-	(*v12.TaskQueueData)(nil), // 22: temporal.server.api.deployment.v1.TaskQueueData
-	(*v12.WorkerDeploymentVersionData)(nil), // 23: temporal.server.api.deployment.v1.WorkerDeploymentVersionData
+	(TaskQueueTypeUserData_FairnessState)(0), // 1: temporal.server.api.persistence.v1.TaskQueueTypeUserData.FairnessState
+	(*BuildId)(nil), // 2: temporal.server.api.persistence.v1.BuildId
+	(*CompatibleVersionSet)(nil), // 3: temporal.server.api.persistence.v1.CompatibleVersionSet
+	(*AssignmentRule)(nil), // 4: temporal.server.api.persistence.v1.AssignmentRule
+	(*RedirectRule)(nil), // 5: temporal.server.api.persistence.v1.RedirectRule
+	(*VersioningData)(nil), // 6: temporal.server.api.persistence.v1.VersioningData
+	(*DeploymentData)(nil), // 7: temporal.server.api.persistence.v1.DeploymentData
+	(*WorkerDeploymentData)(nil), // 8: temporal.server.api.persistence.v1.WorkerDeploymentData
+	(*TaskQueueTypeUserData)(nil), // 9: temporal.server.api.persistence.v1.TaskQueueTypeUserData
+	(*TaskQueueUserData)(nil), // 10: temporal.server.api.persistence.v1.TaskQueueUserData
+	(*VersionedTaskQueueUserData)(nil), // 11: temporal.server.api.persistence.v1.VersionedTaskQueueUserData
+	nil, // 12: temporal.server.api.persistence.v1.DeploymentData.DeploymentsDataEntry
+	(*DeploymentData_DeploymentDataItem)(nil), // 13: temporal.server.api.persistence.v1.DeploymentData.DeploymentDataItem
+	nil, // 14: temporal.server.api.persistence.v1.WorkerDeploymentData.VersionsEntry
+	nil, // 15: temporal.server.api.persistence.v1.TaskQueueUserData.PerTypeEntry
+	(*v1.HybridLogicalClock)(nil), // 16: temporal.server.api.clock.v1.HybridLogicalClock
+	(*v11.BuildIdAssignmentRule)(nil), // 17: temporal.api.taskqueue.v1.BuildIdAssignmentRule
+	(*v11.CompatibleBuildIdRedirectRule)(nil), // 18: temporal.api.taskqueue.v1.CompatibleBuildIdRedirectRule
+	(*v12.DeploymentVersionData)(nil), // 19: temporal.server.api.deployment.v1.DeploymentVersionData
+	(*v13.RoutingConfig)(nil), // 20: temporal.api.deployment.v1.RoutingConfig
+	(*v11.TaskQueueConfig)(nil), // 21: temporal.api.taskqueue.v1.TaskQueueConfig
+	(*v13.Deployment)(nil), // 22: temporal.api.deployment.v1.Deployment
+	(*v12.TaskQueueData)(nil), // 23: temporal.server.api.deployment.v1.TaskQueueData
+	(*v12.WorkerDeploymentVersionData)(nil), // 24: temporal.server.api.deployment.v1.WorkerDeploymentVersionData
 }
 var file_temporal_server_api_persistence_v1_task_queues_proto_depIdxs = []int32{
 	0, // 0: temporal.server.api.persistence.v1.BuildId.state:type_name -> temporal.server.api.persistence.v1.BuildId.State
-	15, // 1: temporal.server.api.persistence.v1.BuildId.state_update_timestamp:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
-	15, // 2: temporal.server.api.persistence.v1.BuildId.became_default_timestamp:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
-	1, // 3: temporal.server.api.persistence.v1.CompatibleVersionSet.build_ids:type_name -> temporal.server.api.persistence.v1.BuildId
-	15, // 4: temporal.server.api.persistence.v1.CompatibleVersionSet.became_default_timestamp:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
-	16, // 5: temporal.server.api.persistence.v1.AssignmentRule.rule:type_name -> temporal.api.taskqueue.v1.BuildIdAssignmentRule
-	15, // 6: temporal.server.api.persistence.v1.AssignmentRule.create_timestamp:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
-	15, // 7: temporal.server.api.persistence.v1.AssignmentRule.delete_timestamp:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
-	17, // 8: temporal.server.api.persistence.v1.RedirectRule.rule:type_name -> temporal.api.taskqueue.v1.CompatibleBuildIdRedirectRule
-	15, // 9: temporal.server.api.persistence.v1.RedirectRule.create_timestamp:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
-	15, // 10: temporal.server.api.persistence.v1.RedirectRule.delete_timestamp:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
-	2, // 11: temporal.server.api.persistence.v1.VersioningData.version_sets:type_name -> temporal.server.api.persistence.v1.CompatibleVersionSet
-	3, // 12: temporal.server.api.persistence.v1.VersioningData.assignment_rules:type_name -> temporal.server.api.persistence.v1.AssignmentRule
-	4, // 13: temporal.server.api.persistence.v1.VersioningData.redirect_rules:type_name -> temporal.server.api.persistence.v1.RedirectRule
-	12, // 14: temporal.server.api.persistence.v1.DeploymentData.deployments:type_name -> temporal.server.api.persistence.v1.DeploymentData.DeploymentDataItem
-	18, // 15: temporal.server.api.persistence.v1.DeploymentData.versions:type_name -> temporal.server.api.deployment.v1.DeploymentVersionData
-	18, // 16: temporal.server.api.persistence.v1.DeploymentData.unversioned_ramp_data:type_name -> temporal.server.api.deployment.v1.DeploymentVersionData
-	11, // 17: temporal.server.api.persistence.v1.DeploymentData.deployments_data:type_name -> temporal.server.api.persistence.v1.DeploymentData.DeploymentsDataEntry
-	19, // 18: temporal.server.api.persistence.v1.WorkerDeploymentData.routing_config:type_name -> temporal.api.deployment.v1.RoutingConfig
-	13, // 19: temporal.server.api.persistence.v1.WorkerDeploymentData.versions:type_name -> temporal.server.api.persistence.v1.WorkerDeploymentData.VersionsEntry
-	6, // 20: temporal.server.api.persistence.v1.TaskQueueTypeUserData.deployment_data:type_name -> temporal.server.api.persistence.v1.DeploymentData
-	20, // 21: temporal.server.api.persistence.v1.TaskQueueTypeUserData.config:type_name -> temporal.api.taskqueue.v1.TaskQueueConfig
-	15, // 22: temporal.server.api.persistence.v1.TaskQueueUserData.clock:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
-	5, // 23: temporal.server.api.persistence.v1.TaskQueueUserData.versioning_data:type_name -> temporal.server.api.persistence.v1.VersioningData
-	14, // 24: temporal.server.api.persistence.v1.TaskQueueUserData.per_type:type_name -> temporal.server.api.persistence.v1.TaskQueueUserData.PerTypeEntry
-	9, // 25: temporal.server.api.persistence.v1.VersionedTaskQueueUserData.data:type_name -> temporal.server.api.persistence.v1.TaskQueueUserData
-	7, // 26: temporal.server.api.persistence.v1.DeploymentData.DeploymentsDataEntry.value:type_name -> temporal.server.api.persistence.v1.WorkerDeploymentData
-	21, // 27: temporal.server.api.persistence.v1.DeploymentData.DeploymentDataItem.deployment:type_name -> temporal.api.deployment.v1.Deployment
-	22, // 28: temporal.server.api.persistence.v1.DeploymentData.DeploymentDataItem.data:type_name -> temporal.server.api.deployment.v1.TaskQueueData
-	23, // 29: temporal.server.api.persistence.v1.WorkerDeploymentData.VersionsEntry.value:type_name -> temporal.server.api.deployment.v1.WorkerDeploymentVersionData
-	8, // 30: temporal.server.api.persistence.v1.TaskQueueUserData.PerTypeEntry.value:type_name -> temporal.server.api.persistence.v1.TaskQueueTypeUserData
-	31, // [31:31] is the sub-list for method output_type
-	31, // [31:31] is the sub-list for method input_type
-	31, // [31:31] is the sub-list for extension type_name
-	31, // [31:31] is the sub-list for extension extendee
-	0, // [0:31] is the sub-list for field type_name
+	16, // 1: temporal.server.api.persistence.v1.BuildId.state_update_timestamp:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
+	16, // 2: temporal.server.api.persistence.v1.BuildId.became_default_timestamp:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
+	2, // 3: temporal.server.api.persistence.v1.CompatibleVersionSet.build_ids:type_name -> temporal.server.api.persistence.v1.BuildId
+	16, // 4: temporal.server.api.persistence.v1.CompatibleVersionSet.became_default_timestamp:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
+	17, // 5: temporal.server.api.persistence.v1.AssignmentRule.rule:type_name -> temporal.api.taskqueue.v1.BuildIdAssignmentRule
+	16, // 6: temporal.server.api.persistence.v1.AssignmentRule.create_timestamp:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
+	16, // 7: temporal.server.api.persistence.v1.AssignmentRule.delete_timestamp:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
+	18, // 8: temporal.server.api.persistence.v1.RedirectRule.rule:type_name -> temporal.api.taskqueue.v1.CompatibleBuildIdRedirectRule
+	16, // 9: temporal.server.api.persistence.v1.RedirectRule.create_timestamp:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
+	16, // 10: temporal.server.api.persistence.v1.RedirectRule.delete_timestamp:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
+	3, // 11: temporal.server.api.persistence.v1.VersioningData.version_sets:type_name -> temporal.server.api.persistence.v1.CompatibleVersionSet
+	4, // 12: temporal.server.api.persistence.v1.VersioningData.assignment_rules:type_name -> temporal.server.api.persistence.v1.AssignmentRule
+	5, // 13: temporal.server.api.persistence.v1.VersioningData.redirect_rules:type_name -> temporal.server.api.persistence.v1.RedirectRule
+	13, // 14: temporal.server.api.persistence.v1.DeploymentData.deployments:type_name -> temporal.server.api.persistence.v1.DeploymentData.DeploymentDataItem
+	19, // 15: temporal.server.api.persistence.v1.DeploymentData.versions:type_name -> temporal.server.api.deployment.v1.DeploymentVersionData
+	19, // 16: temporal.server.api.persistence.v1.DeploymentData.unversioned_ramp_data:type_name -> temporal.server.api.deployment.v1.DeploymentVersionData
+	12, // 17: temporal.server.api.persistence.v1.DeploymentData.deployments_data:type_name -> temporal.server.api.persistence.v1.DeploymentData.DeploymentsDataEntry
+	20, // 18: temporal.server.api.persistence.v1.WorkerDeploymentData.routing_config:type_name -> temporal.api.deployment.v1.RoutingConfig
+	14, // 19: temporal.server.api.persistence.v1.WorkerDeploymentData.versions:type_name -> temporal.server.api.persistence.v1.WorkerDeploymentData.VersionsEntry
+	7, // 20: temporal.server.api.persistence.v1.TaskQueueTypeUserData.deployment_data:type_name -> temporal.server.api.persistence.v1.DeploymentData
+	21, // 21: temporal.server.api.persistence.v1.TaskQueueTypeUserData.config:type_name -> temporal.api.taskqueue.v1.TaskQueueConfig
+	1, // 22: temporal.server.api.persistence.v1.TaskQueueTypeUserData.fairness_state:type_name -> temporal.server.api.persistence.v1.TaskQueueTypeUserData.FairnessState
+	16, // 23: temporal.server.api.persistence.v1.TaskQueueUserData.clock:type_name -> temporal.server.api.clock.v1.HybridLogicalClock
+	6, // 24: temporal.server.api.persistence.v1.TaskQueueUserData.versioning_data:type_name -> temporal.server.api.persistence.v1.VersioningData
+	15, // 25: temporal.server.api.persistence.v1.TaskQueueUserData.per_type:type_name -> temporal.server.api.persistence.v1.TaskQueueUserData.PerTypeEntry
+	10, // 26: temporal.server.api.persistence.v1.VersionedTaskQueueUserData.data:type_name -> temporal.server.api.persistence.v1.TaskQueueUserData
+	8, // 27: temporal.server.api.persistence.v1.DeploymentData.DeploymentsDataEntry.value:type_name -> temporal.server.api.persistence.v1.WorkerDeploymentData
+	22, // 28: temporal.server.api.persistence.v1.DeploymentData.DeploymentDataItem.deployment:type_name -> temporal.api.deployment.v1.Deployment
+	23, // 29: temporal.server.api.persistence.v1.DeploymentData.DeploymentDataItem.data:type_name -> temporal.server.api.deployment.v1.TaskQueueData
+	24, // 30: temporal.server.api.persistence.v1.WorkerDeploymentData.VersionsEntry.value:type_name -> temporal.server.api.deployment.v1.WorkerDeploymentVersionData
+	9, // 31: temporal.server.api.persistence.v1.TaskQueueUserData.PerTypeEntry.value:type_name -> temporal.server.api.persistence.v1.TaskQueueTypeUserData
+	32, // [32:32] is the sub-list for method output_type
+	32, // [32:32] is the sub-list for method input_type
+	32, // [32:32] is the sub-list for extension type_name
+	32, // [32:32] is the sub-list for extension extendee
+	0, // [0:32] is the sub-list for field type_name
 }
 
 func init() { file_temporal_server_api_persistence_v1_task_queues_proto_init() }
@@ -976,7 +1050,7 @@ func file_temporal_server_api_persistence_v1_task_queues_proto_init() {
 		File: protoimpl.DescBuilder{
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
			RawDescriptor: unsafe.Slice(unsafe.StringData(file_temporal_server_api_persistence_v1_task_queues_proto_rawDesc), len(file_temporal_server_api_persistence_v1_task_queues_proto_rawDesc)),
-			NumEnums:      1,
+			NumEnums:      2,
 			NumMessages:   14,
 			NumExtensions: 0,
 			NumServices:   0,
diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go
index f71bcbbaf91..96e0e1aa303 100644
--- a/common/dynamicconfig/constants.go
+++ b/common/dynamicconfig/constants.go
@@ -1382,6 +1382,11 @@ second per poller by one physical queue manager`,
 The metric has 2 dimensions: namespace_id and plugin_name. Disabled by default as this is an optional feature
 and also requires a metrics collection system that can handle higher cardinalities.`,
 	)
+	MatchingAutoEnableV2 = NewTaskQueueBoolSetting(
+		"matching.autoEnableV2",
+		false,
+		`MatchingAutoEnableV2 automatically enables the V2 fairness matcher for a task queue when a task carrying a fairness key or priority key is first seen`,
+	)
 
 	// keys for history
 
diff --git a/proto/internal/temporal/server/api/persistence/v1/task_queues.proto b/proto/internal/temporal/server/api/persistence/v1/task_queues.proto
index 10d9ed021f6..ce05c16975c 100644
--- a/proto/internal/temporal/server/api/persistence/v1/task_queues.proto
+++ b/proto/internal/temporal/server/api/persistence/v1/task_queues.proto
@@ -134,6 +134,13 @@ message TaskQueueTypeUserData {
     DeploymentData deployment_data = 1;
 
     temporal.api.taskqueue.v1.TaskQueueConfig config = 2;
+
+    enum FairnessState {
+        FAIRNESS_STATE_UNSPECIFIED = 0;
+        FAIRNESS_STATE_V1 = 1;
+        FAIRNESS_STATE_V2 = 2;
+    };
+    FairnessState fairness_state = 3;
 }
 
 // Container for all persistent user provided data for a task queue family.
diff --git a/service/matching/config.go b/service/matching/config.go
index 425b2422a71..2dba9624500 100644
--- a/service/matching/config.go
+++ b/service/matching/config.go
@@ -82,6 +82,7 @@ type (
 		MembershipUnloadDelay dynamicconfig.DurationPropertyFn
 		TaskQueueInfoByBuildIdTTL dynamicconfig.DurationPropertyFnWithTaskQueueFilter
 		PriorityLevels dynamicconfig.IntPropertyFnWithTaskQueueFilter
+		AutoEnable dynamicconfig.BoolPropertyFnWithTaskQueueFilter
 
 		RateLimiterRefreshInterval time.Duration
 		FairnessKeyRateLimitCacheSize dynamicconfig.IntPropertyFnWithTaskQueueFilter
@@ -149,6 +150,7 @@ type (
 		EnableFairness bool
 		EnableFairnessSub func(func(bool)) (bool, func())
 		EnableMigration func() bool
+		AutoEnable func() bool
 		GetTasksBatchSize func() int
 		GetTasksReloadAt func() int
 		UpdateAckInterval func() time.Duration
@@ -308,6 +310,7 @@ func NewConfig(
 		FairnessKeyRateLimitCacheSize: dynamicconfig.MatchingFairnessKeyRateLimitCacheSize.Get(dc),
 		MaxFairnessKeyWeightOverrides: dynamicconfig.MatchingMaxFairnessKeyWeightOverrides.Get(dc),
 		MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc),
+		AutoEnable: dynamicconfig.MatchingAutoEnableV2.Get(dc),
 
 		AdminNamespaceToPartitionDispatchRate: dynamicconfig.AdminMatchingNamespaceToPartitionDispatchRate.Get(dc),
 		AdminNamespaceToPartitionRateSub: dynamicconfig.AdminMatchingNamespaceToPartitionDispatchRate.Subscribe(dc),
@@ -355,6 +358,9 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
 		EnableMigration: func() bool {
 			return config.EnableMigration(ns.String(), taskQueueName, taskType)
 		},
+		AutoEnable: func() bool {
+			return config.AutoEnable(ns.String(), taskQueueName, taskType)
+		},
 		GetTasksBatchSize: func() int {
 			return config.GetTasksBatchSize(ns.String(), taskQueueName, taskType)
 		},
diff --git a/service/matching/matching_engine.go b/service/matching/matching_engine.go
index c85dabf4eca..aaf1ca1e048 100644
--- a/service/matching/matching_engine.go
+++ b/service/matching/matching_engine.go
@@ -430,7 +430,7 @@ func (e *matchingEngineImpl) getTaskQueuePartitionManager(
 	tqConfig.loadCause = loadCause
 	logger, throttledLogger, metricsHandler := e.loggerAndMetricsForPartition(namespaceEntry, partition, tqConfig)
 	onFatalErr := func(cause unloadCause) { newPM.unloadFromEngine(cause) }
-	onUserDataChanged := func() { newPM.userDataChanged() }
+	onUserDataChanged := func(to *persistencespb.VersionedTaskQueueUserData) { newPM.userDataChanged(to) }
 	userDataManager := newUserDataManager(
 		e.taskManager,
 		e.matchingRawClient,
diff --git a/service/matching/matching_engine_test.go b/service/matching/matching_engine_test.go
index d7880624475..720738f7dbb 100644
--- a/service/matching/matching_engine_test.go
+++ b/service/matching/matching_engine_test.go
@@ -2119,8 +2119,7 @@ func (s *matchingEngineSuite) TestTaskQueueManagerGetTaskBatch() {
 		s.NoError(err)
 	}
 
-	tlMgr, ok := s.matchingEngine.partitions[dbq.Partition().Key()].(*taskQueuePartitionManagerImpl).defaultQueue.(*physicalTaskQueueManagerImpl)
-	s.True(ok, "taskQueueManger doesn't implement taskQueuePartitionManager interface")
+	tlMgr := s.getPhysicalTaskQueueManagerImpl(dbq)
 	s.EqualValues(taskCount, s.taskManager.getTaskCount(dbq))
 
 	// wait until all tasks are read by the task pump and enqueued into the in-memory buffer
@@ -2254,8 +2253,7 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() {
 		s.NoError(err)
 	}
 
-	tlMgr, ok := s.matchingEngine.partitions[dbq.Partition().Key()].(*taskQueuePartitionManagerImpl).defaultQueue.(*physicalTaskQueueManagerImpl)
-	s.True(ok, "failed to load task queue")
+	tlMgr := s.getPhysicalTaskQueueManagerImpl(dbq)
 	s.EqualValues(taskCount, s.taskManager.getTaskCount(dbq))
 
 	blm := tlMgr.backlogMgr.(*backlogManagerImpl)
@@ -2988,7 +2986,9 @@ func (s *matchingEngineSuite) TestUpdatePhysicalTaskQueueGauge_UnVersioned() {
 	// the size of the map to 1 and it's counter to 1.
 	s.PhysicalQueueMetricValidator(capture, 1, 1)
 
-	tlmImpl, ok := tqm.(*taskQueuePartitionManagerImpl).defaultQueue.(*physicalTaskQueueManagerImpl)
+	defaultQ, err := tqm.(*taskQueuePartitionManagerImpl).defaultQueueFuture.Get(context.Background())
+	s.Require().NoError(err)
+	tlmImpl, ok := defaultQ.(*physicalTaskQueueManagerImpl)
 	s.True(ok)
 
 	s.matchingEngine.updatePhysicalTaskQueueGauge(s.ns, prtn, tlmImpl.queue.version, 1)
@@ -3223,8 +3223,9 @@ func (s *matchingEngineSuite) pollWorkflowTasksConcurrently(
 
 // getPhysicalTaskQueueManagerImpl extracts the physicalTaskQueueManagerImpl for the given PhysicalTaskQueueKey
 func (s *matchingEngineSuite) getPhysicalTaskQueueManagerImpl(ptq *PhysicalTaskQueueKey) *physicalTaskQueueManagerImpl {
-	pgMgr, ok := s.matchingEngine.partitions[ptq.Partition().Key()].(*taskQueuePartitionManagerImpl).defaultQueue.(*physicalTaskQueueManagerImpl)
-	s.True(ok, "taskQueueManger doesn't implement taskQueuePartitionManager interface")
+	defaultQ, err := s.matchingEngine.partitions[ptq.Partition().Key()].(*taskQueuePartitionManagerImpl).defaultQueueFuture.Get(context.Background())
+	s.Require().NoError(err)
+	pgMgr := defaultQ.(*physicalTaskQueueManagerImpl)
 	return pgMgr
 }
 
diff --git a/service/matching/physical_task_queue_manager_test.go b/service/matching/physical_task_queue_manager_test.go
index c45284e5af9..a26f2674a2f 100644
--- a/service/matching/physical_task_queue_manager_test.go
+++ b/service/matching/physical_task_queue_manager_test.go
@@ -91,9 +91,16 @@ func (s *PhysicalTaskQueueManagerTestSuite) SetupTest() {
 	s.NoError(err)
 	engine.partitions[prtn.Key()] = prtnMgr
 
+	if s.fairness {
+		prtnMgr.config.NewMatcher = true
+		prtnMgr.config.EnableFairness = true
+	} else if s.newMatcher {
+		prtnMgr.config.NewMatcher = true
+	}
+
 	s.tqMgr, err = newPhysicalTaskQueueManager(prtnMgr, s.physicalTaskQueueKey)
 	s.NoError(err)
-	prtnMgr.defaultQueue = s.tqMgr
+	prtnMgr.defaultQueueFuture.Set(s.tqMgr, nil)
 }
 
 /*
@@ -334,13 +341,17 @@ func (s *PhysicalTaskQueueManagerTestSuite) TestAddTaskStandby() {
 	s.tqMgr.namespaceRegistry = mockNamespaceCache
 	s.tqMgr.Start()
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
+	err := s.tqMgr.WaitUntilInitialized(ctx)
+	s.Require().NoError(err)
 	defer s.tqMgr.Stop(unloadCauseShuttingDown)
+	cancel()
 
 	// stop taskWriter so that we can check if there's any call to it
 	// otherwise the task persist process is async and hard to test
 	s.tqMgr.tqCtxCancel()
 
-	err := s.tqMgr.SpoolTask(&persistencespb.TaskInfo{
+	err = s.tqMgr.SpoolTask(&persistencespb.TaskInfo{
 		CreateTime: timestamp.TimePtr(time.Now().UTC()),
 	})
 	s.Equal(errShutdown, err) // task writer was stopped above
diff --git a/service/matching/task_queue_partition_manager.go b/service/matching/task_queue_partition_manager.go
index 2f5e56fc587..14dc2619cda 100644
--- a/service/matching/task_queue_partition_manager.go
+++ b/service/matching/task_queue_partition_manager.go
@@ -15,13 +15,16 @@ import (
 	"go.temporal.io/server/api/matchingservice/v1"
 	persistencespb "go.temporal.io/server/api/persistence/v1"
 	taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
+	"go.temporal.io/server/common"
 	"go.temporal.io/server/common/cache"
+	"go.temporal.io/server/common/future"
 	"go.temporal.io/server/common/headers"
 	"go.temporal.io/server/common/log"
 	"go.temporal.io/server/common/log/tag"
 	"go.temporal.io/server/common/metrics"
 	"go.temporal.io/server/common/namespace"
 	serviceerrors "go.temporal.io/server/common/serviceerror"
+	"go.temporal.io/server/common/softassert"
"go.temporal.io/server/common/tqid" "go.temporal.io/server/common/worker_versioning" "google.golang.org/protobuf/types/known/timestamppb" @@ -49,9 +52,6 @@ type ( partition tqid.Partition ns *namespace.Namespace config *taskQueueConfig - // this is the default (unversioned) DB queue. As of now, some of the matters related to the whole TQ partition - // is delegated to the defaultQueue. - defaultQueue physicalTaskQueueManager // used for non-sticky versioned queues (one for each version) versionedQueues map[PhysicalTaskQueueVersion]physicalTaskQueueManager versionedQueuesLock sync.RWMutex // locks mutation of versionedQueues @@ -61,7 +61,13 @@ type ( matchingClient matchingservice.MatchingServiceClient metricsHandler metrics.Handler // namespace/taskqueue tagged metric scope // TODO(stephanos): move cache out of partition manager - cache cache.Cache // non-nil for root-partition + cache cache.Cache // non-nil for root-partition + autoEnable bool + + fairnessState persistencespb.TaskQueueTypeUserData_FairnessState // Set once on initialization and read only after + defaultQueueFuture *future.FutureImpl[physicalTaskQueueManager] + initCtx context.Context + initCancel func() cancelNewMatcherSub func() cancelFairnessSub func() @@ -96,18 +102,20 @@ func newTaskQueuePartitionManager( tqConfig, partition.TaskQueue().TaskType()) pm := &taskQueuePartitionManagerImpl{ - engine: e, - partition: partition, - ns: ns, - config: tqConfig, - logger: logger, - throttledLogger: throttledLogger, - matchingClient: e.matchingRawClient, - metricsHandler: metricsHandler, - versionedQueues: make(map[PhysicalTaskQueueVersion]physicalTaskQueueManager), - userDataManager: userDataManager, - rateLimitManager: rateLimitManager, - } + engine: e, + partition: partition, + ns: ns, + config: tqConfig, + logger: logger, + throttledLogger: throttledLogger, + matchingClient: e.matchingRawClient, + metricsHandler: metricsHandler, + versionedQueues: make(map[PhysicalTaskQueueVersion]physicalTaskQueueManager), + userDataManager: userDataManager, + rateLimitManager: rateLimitManager, + defaultQueueFuture: future.NewFuture[physicalTaskQueueManager](), + } + pm.initCtx, pm.initCancel = context.WithCancel(context.Background()) if pm.partition.IsRoot() { pm.cache = cache.New(10000, &cache.Options{ @@ -115,32 +123,78 @@ func newTaskQueuePartitionManager( ) } + return pm, nil +} + +var errDefaultQueueNotInit = serviceerror.NewInternal("defaultQueue is not initializaed") + +func (pm *taskQueuePartitionManagerImpl) initialize() { unload := func(bool) { pm.unloadFromEngine(unloadCauseConfigChange) } - var fairness bool - fairness, pm.cancelFairnessSub = tqConfig.EnableFairnessSub(unload) - // Fairness is disabled for sticky queues for now so that we can still use TTLs. 
-	tqConfig.EnableFairness = fairness && partition.Kind() != enumspb.TASK_QUEUE_KIND_STICKY
-	if fairness {
-		tqConfig.NewMatcher = true
-	} else {
-		tqConfig.NewMatcher, pm.cancelNewMatcherSub = tqConfig.NewMatcherSub(unload)
+	err := pm.userDataManager.WaitUntilInitialized(pm.initCtx)
+	if err != nil {
+		pm.defaultQueueFuture.Set(nil, err)
+		return
+	}
+	data, _, err := pm.getPerTypeUserData()
+	if err != nil {
+		pm.defaultQueueFuture.Set(nil, err)
+		return
 	}
 
-	defaultQ, err := newPhysicalTaskQueueManager(pm, UnversionedQueueKey(partition))
+	pm.autoEnable = pm.config.AutoEnable()
+	pm.fairnessState = data.GetFairnessState()
+	switch {
+	case !pm.autoEnable || pm.fairnessState == persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_UNSPECIFIED:
+		var fairness bool
+		fairness, pm.cancelFairnessSub = pm.config.EnableFairnessSub(unload)
+		// Fairness is disabled for sticky queues for now so that we can still use TTLs.
+		pm.config.EnableFairness = fairness && pm.partition.Kind() != enumspb.TASK_QUEUE_KIND_STICKY
+		if fairness {
+			pm.config.NewMatcher = true
+		} else {
+			pm.config.NewMatcher, pm.cancelNewMatcherSub = pm.config.NewMatcherSub(unload)
+		}
+	case pm.fairnessState == persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V1:
+		pm.config.NewMatcher = true
+		pm.config.EnableFairness = false
+	case pm.fairnessState == persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V2:
+		pm.config.NewMatcher = true
+		if pm.partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY {
+			pm.config.EnableFairness = false
+		} else {
+			pm.config.EnableFairness = true
+		}
+	default:
+		pm.defaultQueueFuture.Set(nil, serviceerror.NewInternal("Unknown FairnessState in UserData"))
+		return
+	}
+
+	defaultQ, err := newPhysicalTaskQueueManager(pm, UnversionedQueueKey(pm.partition))
 	if err != nil {
-		return nil, err
+		pm.defaultQueueFuture.Set(nil, err)
+		return
 	}
-	pm.defaultQueue = defaultQ
 
-	return pm, nil
+	defaultQ.Start()
+	pm.defaultQueueFuture.Set(defaultQ, nil)
+}
+
+func (pm *taskQueuePartitionManagerImpl) defaultQueue() physicalTaskQueueManager {
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+	queue, err := pm.defaultQueueFuture.Get(ctx)
+	if err != nil {
+		softassert.Fail(pm.logger, "defaultQueue used but not initialized or initialization failed", tag.Error(err))
+	}
+	return queue
 }
 
 func (pm *taskQueuePartitionManagerImpl) Start() {
 	pm.engine.updateTaskQueuePartitionGauge(pm.Namespace(), pm.partition, 1)
 	pm.userDataManager.Start()
-	pm.defaultQueue.Start()
+	go pm.initialize()
 }
 
 func (pm *taskQueuePartitionManagerImpl) GetRateLimitManager() *rateLimitManager {
@@ -150,8 +204,11 @@ func (pm *taskQueuePartitionManagerImpl) GetRateLimitManager() *rateLimitManager
 
 // Stop does not unload the partition from matching engine. It is intended to be called by matching engine when
 // unloading the partition. For stopping and unloading a partition call unloadFromEngine instead.
 func (pm *taskQueuePartitionManagerImpl) Stop(unloadCause unloadCause) {
-	pm.versionedQueuesLock.Lock()
-	defer pm.versionedQueuesLock.Unlock()
+	pm.initCancel()
+	queue, err := pm.defaultQueueFuture.Get(context.Background())
+	if err == nil {
+		queue.Stop(unloadCause)
+	}
 
 	if pm.cancelFairnessSub != nil {
 		pm.cancelFairnessSub()
@@ -160,11 +217,12 @@ func (pm *taskQueuePartitionManagerImpl) Stop(unloadCause unloadCause) {
 		pm.cancelNewMatcherSub()
 	}
 
+	pm.versionedQueuesLock.Lock()
 	// First, stop all queues to wrap up ongoing operations.
 	for _, vq := range pm.versionedQueues {
 		vq.Stop(unloadCause)
 	}
-	pm.defaultQueue.Stop(unloadCause)
+	pm.versionedQueuesLock.Unlock()
 
 	// Then, stop user data manager to wrap up any reads/writes.
 	pm.userDataManager.Stop()
@@ -180,15 +238,18 @@ func (pm *taskQueuePartitionManagerImpl) Namespace() *namespace.Namespace {
 }
 
 func (pm *taskQueuePartitionManagerImpl) MarkAlive() {
-	pm.defaultQueue.MarkAlive()
+	dbq := pm.defaultQueue()
+	if dbq != nil {
+		dbq.MarkAlive()
+	}
 }
 
 func (pm *taskQueuePartitionManagerImpl) WaitUntilInitialized(ctx context.Context) error {
-	err := pm.userDataManager.WaitUntilInitialized(ctx)
+	queue, err := pm.defaultQueueFuture.Get(ctx)
 	if err != nil {
 		return err
 	}
-	return pm.defaultQueue.WaitUntilInitialized(ctx)
+	return queue.WaitUntilInitialized(ctx)
 }
 
 func (pm *taskQueuePartitionManagerImpl) AddTask(
@@ -197,6 +258,21 @@ func (pm *taskQueuePartitionManagerImpl) AddTask(
 ) (buildId string, syncMatched bool, err error) {
 	var spoolQueue, syncMatchQueue physicalTaskQueueManager
 	directive := params.taskInfo.GetVersionDirective()
+
+	if pm.Partition().IsRoot() && pm.autoEnable && pm.fairnessState == persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_UNSPECIFIED {
+		if params.taskInfo.Priority.GetFairnessKey() != "" || params.taskInfo.Priority.GetPriorityKey() != int32(0) {
+			updateFn := func(old *persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, bool, error) {
+				data := common.CloneProto(old)
+				perType := data.GetPerType()[int32(pm.Partition().TaskType())]
+				perType.FairnessState = persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V2
+				return data, true, nil
+			}
+			_, err := pm.userDataManager.UpdateUserData(ctx, UserDataUpdateOptions{Source: "Matching auto enable"}, updateFn)
+			if err != nil {
+				pm.logger.Error("could not update userdata for autoenable", tag.Error(err))
+			}
+		}
+	}
 
 	// spoolQueue will be nil iff task is forwarded.
reredirectTask:
 	spoolQueue, syncMatchQueue, _, taskDispatchRevisionNumber, err := pm.getPhysicalQueuesForAdd(ctx, directive, params.forwardInfo, params.taskInfo.GetRunId(), params.taskInfo.GetWorkflowId(), false)
@@ -214,9 +290,13 @@ reredirectTask:
 		}
 	}
 
-	if pm.defaultQueue != syncMatchQueue {
+	dbq := pm.defaultQueue()
+	if dbq == nil {
+		return "", false, errDefaultQueueNotInit
+	}
+	if dbq != syncMatchQueue {
 		// default queue should stay alive even if requests go to other queues
-		pm.defaultQueue.MarkAlive()
+		dbq.MarkAlive()
 	}
 
 	if pm.partition.IsRoot() && !pm.HasAnyPollerAfter(time.Now().Add(-noPollerThreshold)) {
@@ -284,7 +364,10 @@ func (pm *taskQueuePartitionManagerImpl) PollTask(
 	pollMetadata *pollMetadata,
 ) (*internalTask, bool, error) {
 	var err error
-	dbq := pm.defaultQueue
+	dbq := pm.defaultQueue()
+	if dbq == nil {
+		return nil, false, errDefaultQueueNotInit
+	}
 	versionSetUsed := false
 	deployment, err := worker_versioning.DeploymentFromCapabilities(pollMetadata.workerVersionCapabilities, pollMetadata.deploymentOptions)
 	if err != nil {
@@ -296,7 +379,7 @@ func (pm *taskQueuePartitionManagerImpl) PollTask(
 		// TODO: reject poller of old sticky queue if newer version exist
 	} else {
 		// default queue should stay alive even if requests go to other queues
-		pm.defaultQueue.MarkAlive()
+		dbq.MarkAlive()
 
 		dbq, err = pm.getVersionedQueue(ctx, "", "", deployment, true)
 		if err != nil {
@@ -338,7 +421,7 @@ func (pm *taskQueuePartitionManagerImpl) PollTask(
 		versionSetUsed = true
 	} else {
 		// default queue should stay alive even if requests go to other queues
-		pm.defaultQueue.MarkAlive()
+		dbq.MarkAlive()
 
 		var versionSet string
 		if versioningData.GetVersionSets() != nil {
@@ -536,9 +619,13 @@ reredirectTask:
 		return nil, err
 	}
 
-	if pm.defaultQueue != syncMatchQueue {
+	dbq := pm.defaultQueue()
+	if dbq == nil {
+		return nil, errDefaultQueueNotInit
+	}
+	if dbq != syncMatchQueue {
 		// default queue should stay alive even if requests go to other queues
-		pm.defaultQueue.MarkAlive()
+		dbq.MarkAlive()
 	}
 
 	res, err := syncMatchQueue.DispatchQueryTask(ctx, taskID, request)
@@ -570,9 +657,13 @@ reredirectTask:
 		return nil, err
 	}
 
-	if pm.defaultQueue != syncMatchQueue {
+	dbq := pm.defaultQueue()
+	if dbq == nil {
+		return nil, errDefaultQueueNotInit
+	}
+	if dbq != syncMatchQueue {
 		// default queue should stay alive even if requests go to other queues
-		pm.defaultQueue.MarkAlive()
+		dbq.MarkAlive()
 	}
 
 	res, err := syncMatchQueue.DispatchNexusTask(ctx, taskId, request)
@@ -593,7 +684,11 @@ func (pm *taskQueuePartitionManagerImpl) GetConfig() *taskQueueConfig {
 
 // GetAllPollerInfo returns all pollers that polled from this taskqueue in last few minutes
 func (pm *taskQueuePartitionManagerImpl) GetAllPollerInfo() []*taskqueuepb.PollerInfo {
-	ret := pm.defaultQueue.GetAllPollerInfo()
+	dbq := pm.defaultQueue()
+	var ret []*taskqueuepb.PollerInfo
+	if dbq != nil {
+		ret = dbq.GetAllPollerInfo()
+	}
 	pm.versionedQueuesLock.RLock()
 	defer pm.versionedQueuesLock.RUnlock()
 	for _, vq := range pm.versionedQueues {
@@ -610,7 +705,8 @@ func (pm *taskQueuePartitionManagerImpl) GetAllPollerInfo() []*taskqueuepb.Polle
 }
 
 func (pm *taskQueuePartitionManagerImpl) HasAnyPollerAfter(accessTime time.Time) bool {
-	if pm.defaultQueue.HasPollerAfter(accessTime) {
+	dbq := pm.defaultQueue()
+	if dbq != nil && dbq.HasPollerAfter(accessTime) {
 		return true
 	}
 	pm.versionedQueuesLock.RLock()
@@ -625,7 +721,11 @@ func (pm *taskQueuePartitionManagerImpl) HasAnyPollerAfter(accessTime time.Time)
 
 func (pm *taskQueuePartitionManagerImpl) HasPollerAfter(buildId string, accessTime time.Time) bool {
 	if buildId == "" {
-		return pm.defaultQueue.HasPollerAfter(accessTime)
+		dbq := pm.defaultQueue()
+		if dbq != nil && dbq.HasPollerAfter(accessTime) {
+			return true
+		}
+		return false
 	}
 	pm.versionedQueuesLock.RLock()
 	// TODO: support v3 versioning
@@ -643,8 +743,13 @@ func (pm *taskQueuePartitionManagerImpl) LegacyDescribeTaskQueue(includeTaskQueu
 			Pollers: pm.GetAllPollerInfo(),
 		},
 	}
+	dbq := pm.defaultQueue()
+	if dbq == nil {
+		return nil, errDefaultQueueNotInit
+	}
 	if includeTaskQueueStatus {
-		resp.DescResponse.TaskQueueStatus = pm.defaultQueue.LegacyDescribeTaskQueue(true).DescResponse.TaskQueueStatus
+		//nolint:staticcheck // SA1019: [cleanup-wv-3.1]
+		resp.DescResponse.TaskQueueStatus = dbq.LegacyDescribeTaskQueue(true).DescResponse.TaskQueueStatus
 	}
 	if pm.partition.Kind() != enumspb.TASK_QUEUE_KIND_STICKY {
 		perTypeUserData, _, err := pm.getPerTypeUserData()
@@ -698,7 +803,11 @@ func (pm *taskQueuePartitionManagerImpl) Describe(
 
 	for b := range buildIds {
 		if b == "" {
-			versions[pm.defaultQueue.QueueKey().Version()] = true
+			dbq := pm.defaultQueue()
+			if dbq == nil {
+				return nil, errDefaultQueueNotInit
+			}
+			versions[dbq.QueueKey().Version()] = true
 		} else {
 			found := false
 			for k := range pm.versionedQueues {
@@ -840,8 +949,9 @@ func (pm *taskQueuePartitionManagerImpl) unloadPhysicalQueue(unloadedDbq physica
 	version := unloadedDbq.QueueKey().Version()
 
 	if !version.IsVersioned() {
+		dbq := pm.defaultQueue()
 		// this is the default queue, unload the whole partition if it is not healthy
-		if pm.defaultQueue == unloadedDbq {
+		if dbq != nil && dbq == unloadedDbq {
 			pm.unloadFromEngine(unloadCause)
 		}
 		return
@@ -865,7 +975,11 @@ func (pm *taskQueuePartitionManagerImpl) unloadFromEngine(unloadCause unloadCaus
 
 func (pm *taskQueuePartitionManagerImpl) getPhysicalQueue(ctx context.Context, buildId string, deployment *deploymentpb.Deployment) (physicalTaskQueueManager, error) {
 	if buildId == "" {
-		return pm.defaultQueue, nil
+		dbq := pm.defaultQueue()
+		if dbq == nil {
+			return nil, errDefaultQueueNotInit
+		}
+		return dbq, nil
 	}
 
 	return pm.getVersionedQueue(ctx, "", buildId, deployment, true)
@@ -1023,10 +1137,15 @@ func (pm *taskQueuePartitionManagerImpl) getPhysicalQueuesForAdd(
 	deploymentData := perTypeUserData.GetDeploymentData()
 	taskDirectiveRevisionNumber := directive.GetRevisionNumber()
 
+	dbq := pm.defaultQueue()
+	if dbq == nil {
+		return nil, nil, nil, 0, errDefaultQueueNotInit
+	}
+
 	if wfBehavior == enumspb.VERSIONING_BEHAVIOR_PINNED {
 		if pm.partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY {
 			// TODO (shahab): we can verify the passed deployment matches the last poller's deployment
-			return pm.defaultQueue, pm.defaultQueue, userDataChanged, 0, nil
+			return dbq, dbq, userDataChanged, 0, nil
 		}
 
 		err = worker_versioning.ValidateDeployment(deployment)
@@ -1083,7 +1202,7 @@ func (pm *taskQueuePartitionManagerImpl) getPhysicalQueuesForAdd(
 		}
 
 		// TODO (shahab): we can verify the passed deployment matches the last poller's deployment
-		return pm.defaultQueue, pm.defaultQueue, userDataChanged, 0, nil
+		return dbq, dbq, userDataChanged, 0, nil
 	}
 
 	var err error
@@ -1094,7 +1213,7 @@ func (pm *taskQueuePartitionManagerImpl) getPhysicalQueuesForAdd(
 		if forwardInfo == nil {
 			// Task is not forwarded, so it can be spooled if sync match fails.
 			// Unpinned tasks are spooled in default queue
-			return pm.defaultQueue, targetDeploymentQueue, userDataChanged, taskDispatchRevisionNumber, err
+			return dbq, targetDeploymentQueue, userDataChanged, taskDispatchRevisionNumber, err
 		} else {
 			// Forwarded from child partition - only do sync match.
 			return nil, targetDeploymentQueue, userDataChanged, taskDispatchRevisionNumber, err
@@ -1105,7 +1224,7 @@ func (pm *taskQueuePartitionManagerImpl) getPhysicalQueuesForAdd(
 		// Forwarded from child partition - only do sync match.
 		// No need to calculate build ID, just dispatch based on source partition's instructions.
 		if forwardInfo.DispatchVersionSet == "" && forwardInfo.DispatchBuildId == "" {
-			syncMatchQueue = pm.defaultQueue
+			syncMatchQueue = dbq
 		} else {
 			syncMatchQueue, err = pm.getVersionedQueue(
 				ctx,
@@ -1121,7 +1240,7 @@ func (pm *taskQueuePartitionManagerImpl) getPhysicalQueuesForAdd(
 	if directive.GetBuildId() == nil {
 		// The task belongs to an unversioned execution. Keep using unversioned. But also return
 		// userDataChanged so if current deployment is set, the task redirects to that deployment.
-		return pm.defaultQueue, pm.defaultQueue, userDataChanged, taskDispatchRevisionNumber, nil
+		return dbq, dbq, userDataChanged, taskDispatchRevisionNumber, nil
 	}
 
 	userData, userDataChanged, err := pm.userDataManager.GetUserData()
@@ -1180,11 +1299,14 @@ func (pm *taskQueuePartitionManagerImpl) getPhysicalQueuesForAdd(
 			return nil, nil, nil, 0, serviceerrors.NewStickyWorkerUnavailable()
 		}
 		// sticky queues only use default queue
-		return pm.defaultQueue, pm.defaultQueue, userDataChanged, 0, nil
+		return dbq, dbq, userDataChanged, 0, nil
 	}
 
 	if versionSet != "" {
-		spoolQueue = pm.defaultQueue
+		spoolQueue = pm.defaultQueue()
+		if spoolQueue == nil {
+			return nil, nil, nil, 0, errDefaultQueueNotInit
+		}
 		syncMatchQueue, err = pm.getVersionedQueue(ctx, versionSet, "", nil, true)
 		if err != nil {
 			return nil, nil, nil, 0, err
@@ -1303,10 +1425,24 @@ func (pm *taskQueuePartitionManagerImpl) getPerTypeUserData() (*persistencespb.T
 	return perType, userDataChanged, nil
 }
 
-func (pm *taskQueuePartitionManagerImpl) userDataChanged() {
+func (pm *taskQueuePartitionManagerImpl) userDataChanged(to *persistencespb.VersionedTaskQueueUserData) {
 	// Update rateLimits if any change is userData.
 	pm.rateLimitManager.UserDataChanged()
 
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+	defaultQ, err := pm.defaultQueueFuture.Get(ctx)
+	// Initialization error or not ready yet
+	if err != nil {
+		return
+	}
+
+	taskType := int32(pm.Partition().TaskType())
+	if to.GetData().GetPerType()[taskType].GetFairnessState() != pm.fairnessState {
+		pm.unloadFromEngine(unloadCauseConfigChange)
+		return
+	}
+
 	// Notify all queues so they can re-evaluate their backlog.
 	pm.versionedQueuesLock.RLock()
 	for _, vq := range pm.versionedQueues {
@@ -1315,5 +1451,5 @@ func (pm *taskQueuePartitionManagerImpl) userDataChanged() {
 	pm.versionedQueuesLock.RUnlock()
 
 	// Do this one in this goroutine.
-	pm.defaultQueue.UserDataChanged()
+	defaultQ.UserDataChanged()
 }
diff --git a/service/matching/user_data_manager.go b/service/matching/user_data_manager.go
index baf833c0e2b..e032a0d5897 100644
--- a/service/matching/user_data_manager.go
+++ b/service/matching/user_data_manager.go
@@ -63,7 +63,8 @@ type (
 	// UserDataUpdateFunc accepts the current user data for a task queue and returns the updated user data, a boolean
 	// indicating whether this data should be replicated, and an error.
 	// Extra care should be taken to avoid mutating the current user data to avoid keeping uncommitted data in memory.
-	UserDataUpdateFunc func(*persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, bool, error)
+	UserDataUpdateFunc   func(*persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, bool, error)
+	UserDataOnChangeFunc func(to *persistencespb.VersionedTaskQueueUserData)
 
 	// userDataManager is responsible for fetching and keeping user data up-to-date in-memory
 	// for a given TQ partition.
@@ -75,7 +76,7 @@ type (
 	userDataManagerImpl struct {
 		lock sync.Mutex
 		onFatalErr func(unloadCause)
-		onUserDataChanged func() // if set, call this in new goroutine when user data changes
+		onUserDataChanged UserDataOnChangeFunc // if set, call this in new goroutine when user data changes
 		partition tqid.Partition
 		userData *persistencespb.VersionedTaskQueueUserData
 		userDataChanged chan struct{}
@@ -110,7 +111,7 @@ func newUserDataManager(
 	store persistence.TaskManager,
 	matchingClient matchingservice.MatchingServiceClient,
 	onFatalErr func(unloadCause),
-	onUserDataChanged func(),
+	onUserDataChanged UserDataOnChangeFunc,
 	partition tqid.Partition,
 	config *taskQueueConfig,
 	logger log.Logger,
@@ -180,7 +181,7 @@ func (m *userDataManagerImpl) setUserDataLocked(userData *persistencespb.Version
 	close(m.userDataChanged)
 	m.userDataChanged = make(chan struct{})
 	if m.onUserDataChanged != nil {
-		go m.onUserDataChanged()
+		go m.onUserDataChanged(m.userData)
 	}
 }
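
Note on the pre-cancelled-context reads introduced above (defaultQueue() and userDataChanged()): calling defaultQueueFuture.Get with a context that is cancelled before the call turns the blocking Get into an immediate poll. It returns the default queue once initialize() has resolved the future, and an error otherwise, which callers translate into errDefaultQueueNotInit or an early return. Below is a minimal, self-contained sketch of the pattern; toyFuture is a hypothetical stand-in for the server's common/future.FutureImpl, whose select ordering may differ.

package main

import (
	"context"
	"errors"
	"fmt"
)

// toyFuture is a hypothetical, simplified future: Set resolves it once,
// Get blocks until it is resolved or the context is done.
type toyFuture[T any] struct {
	done  chan struct{}
	value T
	err   error
}

func newToyFuture[T any]() *toyFuture[T] {
	return &toyFuture[T]{done: make(chan struct{})}
}

// Set resolves the future exactly once.
func (f *toyFuture[T]) Set(v T, err error) {
	f.value, f.err = v, err
	close(f.done)
}

// Get checks readiness before consulting the context, so a resolved future
// always wins over an already-cancelled context and the poll is deterministic.
func (f *toyFuture[T]) Get(ctx context.Context) (T, error) {
	select {
	case <-f.done:
		return f.value, f.err
	default:
	}
	select {
	case <-f.done:
		return f.value, f.err
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

func main() {
	f := newToyFuture[string]()

	// Cancel the context before calling Get: Get becomes a non-blocking poll.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	if _, err := f.Get(ctx); errors.Is(err, context.Canceled) {
		fmt.Println("not initialized yet") // caller falls back, cf. errDefaultQueueNotInit
	}

	f.Set("defaultQueue", nil)
	v, err := f.Get(ctx) // resolved: returns the value despite the cancelled ctx
	fmt.Println(v, err)
}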