6464 // TODO(stephanos): move cache out of partition manager
6565 cache cache.Cache // non-nil for root-partition
6666
67- fairnessState persistencespb.TaskQueueTypeUserData_FairnessState //Set once on initialization and read only after
67+ fairnessState persistencespb.TaskQueueTypeUserData_FairnessState // Set once on initialization and read only after
6868
6969 cancelNewMatcherSub func ()
7070 cancelFairnessSub func ()
@@ -129,7 +129,7 @@ func newTaskQueuePartitionManager(
129129 if err != nil {
130130 return nil , err
131131 }
132- //todo(moody): do we need to be more cautious loading this? Do we need to ensure that we have PerType()? Check for a nil return?
132+ // todo(moody): do we need to be more cautious loading this? Do we need to ensure that we have PerType()? Check for a nil return?
133133 data , _ , err := pm .getPerTypeUserData ()
134134 if err != nil {
135135 return nil , err
@@ -138,7 +138,7 @@ func newTaskQueuePartitionManager(
138138 tqConfig .AutoEnable , pm .cancelAutoEnableSub = tqConfig .AutoEnableSub (unload )
139139 pm .fairnessState = data .GetFairnessState ()
140140 switch {
141- case tqConfig .AutoEnable == false || pm .fairnessState == persistencespb .TaskQueueTypeUserData_FAIRNESS_STATE_UNSPECIFIED :
141+ case ! tqConfig .AutoEnable || pm .fairnessState == persistencespb .TaskQueueTypeUserData_FAIRNESS_STATE_UNSPECIFIED :
142142 var fairness bool
143143 fairness , pm .cancelFairnessSub = tqConfig .EnableFairnessSub (unload )
144144 // Fairness is disabled for sticky queues for now so that we can still use TTLs.
@@ -158,6 +158,8 @@ func newTaskQueuePartitionManager(
158158 } else {
159159 tqConfig .EnableFairness = true
160160 }
161+ default :
162+ return nil , serviceerror .NewInternal ("Unknown FairnessState in UserData" )
161163 }
162164
163165 defaultQ , err := newPhysicalTaskQueueManager (pm , UnversionedQueueKey (partition ))
@@ -228,17 +230,20 @@ func (pm *taskQueuePartitionManagerImpl) AddTask(
228230 directive := params .taskInfo .GetVersionDirective ()
229231
230232 if pm .Partition ().IsRoot () && pm .config .AutoEnable && pm .fairnessState == persistencespb .TaskQueueTypeUserData_FAIRNESS_STATE_UNSPECIFIED {
231- //TODO(moody): unsure about this check, what is the correct way to check to see if PriorityKey is set?
232- //TODO(moody): This was originally discussed to be a separate API, but is just exposing this through the generic UpdateUserData sufficient?
233+ // TODO(moody): unsure about this check, what is the correct way to check to see if PriorityKey is set?
234+ // TODO(moody): This was originally discussed to be a separate API, but is just exposing this through the generic UpdateUserData sufficient?
233235 // what is the pro/cons of adding the new API and perhaps invoking that instead? We're going to unload either way...
234236 if params .taskInfo .Priority != nil && (params .taskInfo .Priority .FairnessKey != "" || params .taskInfo .Priority .PriorityKey != int32 (pm .config .DefaultPriorityKey )) {
235237 updateFn := func (old * persistencespb.TaskQueueUserData ) (* persistencespb.TaskQueueUserData , bool , error ) {
236- new := common .CloneProto (old )
237- perType := new .GetPerType ()[int32 (pm .Partition ().TaskType ())]
238+ data := common .CloneProto (old )
239+ perType := data .GetPerType ()[int32 (pm .Partition ().TaskType ())]
238240 perType .FairnessState = persistencespb .TaskQueueTypeUserData_FAIRNESS_STATE_V2
239- return new , true , nil
241+ return data , true , nil
242+ }
243+ _ , err := pm .userDataManager .UpdateUserData (ctx , UserDataUpdateOptions {Source : "Matching auto enable" }, updateFn )
244+ if err != nil {
245+ pm .logger .Error ("could not update userdata for autoenable: " + err .Error ())
240246 }
241- pm .userDataManager .UpdateUserData (ctx , UserDataUpdateOptions {Source : "Matching auto enable" }, updateFn )
242247 }
243248 }
244249 // spoolQueue will be nil iff task is forwarded.
@@ -1347,15 +1352,17 @@ func (pm *taskQueuePartitionManagerImpl) getPerTypeUserData() (*persistencespb.T
13471352 return perType , userDataChanged , nil
13481353}
13491354
1350- func (pm * taskQueuePartitionManagerImpl ) userDataChanged (old , new * persistencespb.VersionedTaskQueueUserData ) {
1351- taskType := int32 (pm .Partition ().TaskType ())
1352- //TODO(moody): this stinks, do we need this more verbose, is this interface bad?
1353- //TODO(moody): we get calls into this callback quite a bit with data being nil(sampled from unit tests), do we want to go through the full update
1355+ func (pm * taskQueuePartitionManagerImpl ) userDataChanged (from , to * persistencespb.VersionedTaskQueueUserData ) {
1356+ // TODO(moody): this stinks, do we need this more verbose, is this interface bad?
1357+ // TODO(moody): we get calls into this callback quite a bit with data being nil(sampled from unit tests), do we want to go through the full update
13541358 // even in those cases? Should we pass the inner data and avoid a callback when that is nil? I am not totally sure about the implcations....
1355- if old != nil && old .GetData () != nil && old .GetData ().GetPerType () != nil && new != nil && new .GetData () != nil && new .GetData ().GetPerType () != nil {
1356- if old .GetData ().GetPerType ()[taskType ].FairnessState != new .GetData ().GetPerType ()[taskType ].FairnessState {
1357- pm .unloadFromEngine (unloadCauseConfigChange )
1358- return
1359+ if from != nil && from .GetData () != nil && from .GetData ().GetPerType () != nil && to != nil && to .GetData () != nil && to .GetData ().GetPerType () != nil {
1360+ taskType := int32 (pm .Partition ().TaskType ())
1361+ if from .GetData ().GetPerType ()[taskType ] != nil && to .GetData ().GetPerType ()[taskType ] != nil {
1362+ if from .GetData ().GetPerType ()[taskType ].FairnessState != to .GetData ().GetPerType ()[taskType ].FairnessState {
1363+ pm .unloadFromEngine (unloadCauseConfigChange )
1364+ return
1365+ }
13591366 }
13601367 }
13611368 // Update rateLimits if any change is userData.
0 commit comments