Skip to content

Commit 9193a90

Browse files
authored
Disable user data when fetch fails (#4604)
1 parent 2fd6123 commit 9193a90

File tree

7 files changed

+516
-93
lines changed

7 files changed

+516
-93
lines changed

service/matching/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package matching
2727
import (
2828
"time"
2929

30+
"go.temporal.io/server/common/backoff"
3031
"go.temporal.io/server/common/dynamicconfig"
3132
"go.temporal.io/server/common/namespace"
3233
"go.temporal.io/server/common/persistence/visibility"
@@ -126,6 +127,9 @@ type (
126127
// root. When disabled, features that rely on user data (e.g. worker versioning) will essentially be disabled.
127128
// See the documentation for constants.MatchingLoadUserData for the implications on versioning.
128129
LoadUserData func() bool
130+
131+
// Retry policy for fetching user data from root partition. Should retry forever.
132+
GetUserDataRetryPolicy backoff.RetryPolicy
129133
}
130134
)
131135

@@ -257,5 +261,6 @@ func newTaskQueueConfig(id *taskQueueID, config *Config, namespace namespace.Nam
257261
return util.Max(1, config.ForwarderMaxChildrenPerNode(namespace.String(), taskQueueName, taskType))
258262
},
259263
},
264+
GetUserDataRetryPolicy: backoff.NewExponentialRetryPolicy(1 * time.Second).WithMaximumInterval(5 * time.Minute),
260265
}
261266
}

service/matching/db.go

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,17 @@ import (
4646
const (
4747
initialRangeID = 1 // Id of the first range of a new task queue
4848
stickyTaskQueueTTL = 24 * time.Hour
49+
50+
// userDataEnabled is the default state: user data is enabled.
51+
userDataEnabled userDataState = iota
52+
// userDataDisabled means user data is disabled due to the LoadUserData dynamic config
53+
// being turned off on this node or the parent node. This should cause GetUserData to
54+
// return a FailedPrecondition error.
55+
userDataDisabled
56+
// userDataSpecificVersion means this tqm/db is for a specific version set, which doesn't
57+
// have its own user data and it should not be used. This should cause GetUserData to
58+
// return an Internal error (access would indicate a bug).
59+
userDataSpecificVersion
4960
)
5061

5162
type (
@@ -58,6 +69,7 @@ type (
5869
ackLevel int64
5970
userData *persistencespb.VersionedTaskQueueUserData
6071
userDataChanged chan struct{}
72+
userDataState userDataState
6173
store persistence.TaskManager
6274
logger log.Logger
6375
matchingClient matchingservice.MatchingServiceClient
@@ -66,6 +78,8 @@ type (
6678
rangeID int64
6779
ackLevel int64
6880
}
81+
82+
userDataState int
6983
)
7084

7185
var (
@@ -309,7 +323,21 @@ func (db *taskQueueDB) GetUserData(
309323
) (*persistencespb.VersionedTaskQueueUserData, chan struct{}, error) {
310324
db.Lock()
311325
defer db.Unlock()
312-
return db.userData, db.userDataChanged, nil
326+
return db.getUserDataLocked()
327+
}
328+
329+
func (db *taskQueueDB) getUserDataLocked() (*persistencespb.VersionedTaskQueueUserData, chan struct{}, error) {
330+
switch db.userDataState {
331+
case userDataEnabled:
332+
return db.userData, db.userDataChanged, nil
333+
case userDataDisabled:
334+
return nil, nil, errUserDataDisabled
335+
case userDataSpecificVersion:
336+
return nil, nil, errNoUserDataOnVersionedTQM
337+
default:
338+
// shouldn't happen
339+
return nil, nil, serviceerror.NewInternal("unexpected user data enabled state")
340+
}
313341
}
314342

315343
func (db *taskQueueDB) setUserDataLocked(userData *persistencespb.VersionedTaskQueueUserData) {
@@ -343,6 +371,12 @@ func (db *taskQueueDB) loadUserData(ctx context.Context) error {
343371
return nil
344372
}
345373

374+
func (db *taskQueueDB) setUserDataState(setUserDataState userDataState) {
375+
db.Lock()
376+
defer db.Unlock()
377+
db.userDataState = setUserDataState
378+
}
379+
346380
// UpdateUserData allows callers to update user data (such as worker build IDs) for this task queue. The pointer passed
347381
// to the update function is guaranteed to be non-nil.
348382
// Note that the user data's clock may be nil and should be initialized externally where there's access to the cluster
@@ -363,11 +397,17 @@ func (db *taskQueueDB) UpdateUserData(
363397
if !db.DbStoresUserData() {
364398
return nil, false, errUserDataNoMutateNonRoot
365399
}
400+
366401
db.Lock()
367402
defer db.Unlock()
368403

369-
preUpdateData := db.userData.GetData()
370-
preUpdateVersion := db.userData.GetVersion()
404+
userData, _, err := db.getUserDataLocked()
405+
if err != nil {
406+
return nil, false, err
407+
}
408+
409+
preUpdateData := userData.GetData()
410+
preUpdateVersion := userData.GetVersion()
371411
if preUpdateData == nil {
372412
preUpdateData = &persistencespb.TaskQueueUserData{}
373413
}

service/matching/matching_engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1014,7 +1014,7 @@ func (e *matchingEngineImpl) GetTaskQueueUserData(
10141014
} else if userData.Version < version {
10151015
// This is highly unlikely but may happen due to an edge case during ownership transfer.
10161016
// We rely on client retries in this case to let the system eventually self-heal.
1017-
return nil, serviceerror.NewFailedPrecondition(
1017+
return nil, serviceerror.NewInvalidArgument(
10181018
"requested task queue user data for version greater than known version")
10191019
}
10201020
}

0 commit comments

Comments
 (0)