Skip to content

Commit c0fb121

Browse files
authored
Consider non default sets reachable by new workflows for a while after they stop being queue default (#4545)
1 parent 87cc6f4 commit c0fb121

File tree

10 files changed

+163
-90
lines changed

10 files changed

+163
-90
lines changed

common/clock/hybrid_logical_clock/hybrid_logical_clock.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,14 @@ func Max(a Clock, b Clock) Clock {
9797
return a
9898
}
9999

100+
// Min returns the minimum of two clocks
101+
func Min(a Clock, b Clock) Clock {
102+
if Compare(a, b) > 0 {
103+
return a
104+
}
105+
return b
106+
}
107+
100108
// Equal returns whether two clocks are equal
101109
func Equal(a Clock, b Clock) bool {
102110
return Compare(a, b) == 0

common/clock/hybrid_logical_clock/hybrid_logical_clock_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,18 @@ func Test_Max_ReturnsMaximum(t *testing.T) {
8787
assert.Equal(t, max, t1)
8888
}
8989

90+
func Test_Min_ReturnsMinimum(t *testing.T) {
91+
t.Parallel()
92+
t0 := Zero(1)
93+
t1 := Zero(2)
94+
95+
min := Min(t0, t1)
96+
assert.Equal(t, min, t0)
97+
// Just in case it doesn't work in reverse order...
98+
min = Min(t1, t0)
99+
assert.Equal(t, min, t0)
100+
}
101+
90102
func Test_UTC_ReturnsTimeInUTC(t *testing.T) {
91103
t.Parallel()
92104
assert.Equal(t, time.Unix(0, 0).UTC(), UTC(Zero(0)))

common/dynamicconfig/constants.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,10 +198,24 @@ const (
198198
// ReachabilityQueryBuildIdLimit limits the number of build ids that can be requested in a single call to the
199199
// GetWorkerTaskReachability API.
200200
ReachabilityQueryBuildIdLimit = "limit.reachabilityQueryBuildIds"
201+
// ReachabilityQuerySetDurationSinceDefault is the minimum period since a version set was demoted from being the
202+
// queue default before it is considered unreachable by new workflows.
203+
// This setting allows some propagation delay of versioning data for the reachability queries, which may happen for
204+
// the following reasons:
205+
// 1. There are no workflows currently marked as open in the visibility store but a worker for the demoted version
206+
// is currently processing a task.
207+
// 2. There are delays in the visibility task processor (which is asynchronous).
208+
// 3. There's propagation delay of the versioning data between matching nodes.
209+
ReachabilityQuerySetDurationSinceDefault = "frontend.reachabilityQuerySetDurationSinceDefault"
201210
// TaskQueuesPerBuildIdLimit limits the number of task queue names that can be mapped to a single build id.
202211
TaskQueuesPerBuildIdLimit = "limit.taskQueuesPerBuildId"
203212
// RemovableBuildIdDurationSinceDefault is the minimum duration since a build id was last default in its containing
204213
// set for it to be considered for removal, used by the build id scavenger.
214+
// This setting allows some propagation delay of versioning data, which may happen for the following reasons:
215+
// 1. There are no workflows currently marked as open in the visibility store but a worker for the demoted version
216+
// is currently processing a task.
217+
// 2. There are delays in the visibility task processor (which is asynchronous).
218+
// 3. There's propagation delay of the versioning data between matching nodes.
205219
RemovableBuildIdDurationSinceDefault = "worker.removableBuildIdDurationSinceDefault"
206220

207221
// keys for frontend

common/util/util.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,12 @@ func FilterSlice[T any](in []T, predicate func(T) bool) []T {
143143
}
144144
return out
145145
}
146+
147+
// ReduceSlice folds the elements of in, front to back, into a single value.
// The accumulator starts as initializer and is replaced by the result of
// reducer(accumulator, element) for each element in turn. When in is empty
// the initializer is returned unchanged.
func ReduceSlice[T any, A any](in []T, initializer A, reducer func(A, T) A) A {
	result := initializer
	for i := range in {
		result = reducer(result, in[i])
	}
	return result
}

common/worker_versioning/worker_versioning.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ import (
3030

3131
"github.com/xwb1989/sqlparser"
3232
commonpb "go.temporal.io/api/common/v1"
33-
taskqueuepb "go.temporal.io/api/taskqueue/v1"
3433

34+
persistencespb "go.temporal.io/server/api/persistence/v1"
3535
"go.temporal.io/server/common/namespace"
3636
"go.temporal.io/server/common/persistence/visibility/manager"
3737
"go.temporal.io/server/common/searchattribute"
@@ -66,17 +66,18 @@ func VersionStampToBuildIdSearchAttribute(stamp *commonpb.WorkerVersionStamp) st
6666
return UnversionedBuildIdSearchAttribute(stamp.BuildId)
6767
}
6868

69-
func FindBuildId(versionSets []*taskqueuepb.CompatibleVersionSet, buildId string) (setIndex, indexInSet int) {
69+
// FindBuildId finds a build id in the version data's sets, returning (set index, index within that set).
70+
// Returns -1, -1 if not found.
71+
func FindBuildId(versioningData *persistencespb.VersioningData, buildId string) (setIndex, indexInSet int) {
72+
versionSets := versioningData.GetVersionSets()
7073
setIndex = -1
7174
indexInSet = -1
72-
if len(versionSets) > 0 {
73-
for sidx, set := range versionSets {
74-
for bidx, id := range set.BuildIds {
75-
if buildId == id {
76-
setIndex = sidx
77-
indexInSet = bidx
78-
break
79-
}
75+
for sidx, set := range versionSets {
76+
for bidx, id := range set.GetBuildIds() {
77+
if buildId == id.Id {
78+
setIndex = sidx
79+
indexInSet = bidx
80+
break
8081
}
8182
}
8283
}

service/frontend/service.go

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ type Config struct {
8888
WorkerBuildIdSizeLimit dynamicconfig.IntPropertyFn
8989
ReachabilityTaskQueueScanLimit dynamicconfig.IntPropertyFn
9090
ReachabilityQueryBuildIdLimit dynamicconfig.IntPropertyFn
91+
ReachabilityQuerySetDurationSinceDefault dynamicconfig.DurationPropertyFn
9192
DisallowQuery dynamicconfig.BoolPropertyFnWithNamespaceFilter
9293
ShutdownDrainDuration dynamicconfig.DurationPropertyFn
9394
ShutdownFailHealthCheckDuration dynamicconfig.DurationPropertyFn
@@ -220,35 +221,36 @@ func NewConfig(
220221
InternalFEGlobalNamespaceVisibilityRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.InternalFrontendGlobalNamespaceVisibilityRPS, 0),
221222
// Overshoot since these low rate limits don't work well in an uncoordinated global limiter.
222223
GlobalNamespaceNamespaceReplicationInducingAPIsRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS, 10),
223-
MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000),
224-
WorkerBuildIdSizeLimit: dc.GetIntProperty(dynamicconfig.WorkerBuildIdSizeLimit, 255),
225-
ReachabilityTaskQueueScanLimit: dc.GetIntProperty(dynamicconfig.ReachabilityTaskQueueScanLimit, 20),
226-
ReachabilityQueryBuildIdLimit: dc.GetIntProperty(dynamicconfig.ReachabilityQueryBuildIdLimit, 5),
227-
MaxBadBinaries: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxBadBinaries, namespace.MaxBadBinaries),
228-
DisableListVisibilityByFilter: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.DisableListVisibilityByFilter, false),
229-
BlobSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitError, 2*1024*1024),
230-
BlobSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitWarn, 256*1024),
231-
ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.FrontendThrottledLogRPS, 20),
232-
ShutdownDrainDuration: dc.GetDurationProperty(dynamicconfig.FrontendShutdownDrainDuration, 0*time.Second),
233-
ShutdownFailHealthCheckDuration: dc.GetDurationProperty(dynamicconfig.FrontendShutdownFailHealthCheckDuration, 0*time.Second),
234-
EnableNamespaceNotActiveAutoForwarding: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableNamespaceNotActiveAutoForwarding, true),
235-
SearchAttributesNumberOfKeysLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesNumberOfKeysLimit, 100),
236-
SearchAttributesSizeOfValueLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesSizeOfValueLimit, 2*1024),
237-
SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024),
238-
VisibilityArchivalQueryMaxPageSize: dc.GetIntProperty(dynamicconfig.VisibilityArchivalQueryMaxPageSize, 10000),
239-
DisallowQuery: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.DisallowQuery, false),
240-
SendRawWorkflowHistory: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.SendRawWorkflowHistory, false),
241-
DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()),
242-
DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout),
243-
EnableServerVersionCheck: dc.GetBoolProperty(dynamicconfig.EnableServerVersionCheck, os.Getenv("TEMPORAL_VERSION_CHECK_DISABLED") == ""),
244-
EnableTokenNamespaceEnforcement: dc.GetBoolProperty(dynamicconfig.EnableTokenNamespaceEnforcement, true),
245-
KeepAliveMinTime: dc.GetDurationProperty(dynamicconfig.KeepAliveMinTime, 10*time.Second),
246-
KeepAlivePermitWithoutStream: dc.GetBoolProperty(dynamicconfig.KeepAlivePermitWithoutStream, true),
247-
KeepAliveMaxConnectionIdle: dc.GetDurationProperty(dynamicconfig.KeepAliveMaxConnectionIdle, 2*time.Minute),
248-
KeepAliveMaxConnectionAge: dc.GetDurationProperty(dynamicconfig.KeepAliveMaxConnectionAge, 5*time.Minute),
249-
KeepAliveMaxConnectionAgeGrace: dc.GetDurationProperty(dynamicconfig.KeepAliveMaxConnectionAgeGrace, 70*time.Second),
250-
KeepAliveTime: dc.GetDurationProperty(dynamicconfig.KeepAliveTime, 1*time.Minute),
251-
KeepAliveTimeout: dc.GetDurationProperty(dynamicconfig.KeepAliveTimeout, 10*time.Second),
224+
MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000),
225+
WorkerBuildIdSizeLimit: dc.GetIntProperty(dynamicconfig.WorkerBuildIdSizeLimit, 255),
226+
ReachabilityTaskQueueScanLimit: dc.GetIntProperty(dynamicconfig.ReachabilityTaskQueueScanLimit, 20),
227+
ReachabilityQueryBuildIdLimit: dc.GetIntProperty(dynamicconfig.ReachabilityQueryBuildIdLimit, 5),
228+
ReachabilityQuerySetDurationSinceDefault: dc.GetDurationProperty(dynamicconfig.ReachabilityQuerySetDurationSinceDefault, 5*time.Minute),
229+
MaxBadBinaries: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxBadBinaries, namespace.MaxBadBinaries),
230+
DisableListVisibilityByFilter: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.DisableListVisibilityByFilter, false),
231+
BlobSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitError, 2*1024*1024),
232+
BlobSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitWarn, 256*1024),
233+
ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.FrontendThrottledLogRPS, 20),
234+
ShutdownDrainDuration: dc.GetDurationProperty(dynamicconfig.FrontendShutdownDrainDuration, 0*time.Second),
235+
ShutdownFailHealthCheckDuration: dc.GetDurationProperty(dynamicconfig.FrontendShutdownFailHealthCheckDuration, 0*time.Second),
236+
EnableNamespaceNotActiveAutoForwarding: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableNamespaceNotActiveAutoForwarding, true),
237+
SearchAttributesNumberOfKeysLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesNumberOfKeysLimit, 100),
238+
SearchAttributesSizeOfValueLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesSizeOfValueLimit, 2*1024),
239+
SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024),
240+
VisibilityArchivalQueryMaxPageSize: dc.GetIntProperty(dynamicconfig.VisibilityArchivalQueryMaxPageSize, 10000),
241+
DisallowQuery: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.DisallowQuery, false),
242+
SendRawWorkflowHistory: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.SendRawWorkflowHistory, false),
243+
DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()),
244+
DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout),
245+
EnableServerVersionCheck: dc.GetBoolProperty(dynamicconfig.EnableServerVersionCheck, os.Getenv("TEMPORAL_VERSION_CHECK_DISABLED") == ""),
246+
EnableTokenNamespaceEnforcement: dc.GetBoolProperty(dynamicconfig.EnableTokenNamespaceEnforcement, true),
247+
KeepAliveMinTime: dc.GetDurationProperty(dynamicconfig.KeepAliveMinTime, 10*time.Second),
248+
KeepAlivePermitWithoutStream: dc.GetBoolProperty(dynamicconfig.KeepAlivePermitWithoutStream, true),
249+
KeepAliveMaxConnectionIdle: dc.GetDurationProperty(dynamicconfig.KeepAliveMaxConnectionIdle, 2*time.Minute),
250+
KeepAliveMaxConnectionAge: dc.GetDurationProperty(dynamicconfig.KeepAliveMaxConnectionAge, 5*time.Minute),
251+
KeepAliveMaxConnectionAgeGrace: dc.GetDurationProperty(dynamicconfig.KeepAliveMaxConnectionAgeGrace, 70*time.Second),
252+
KeepAliveTime: dc.GetDurationProperty(dynamicconfig.KeepAliveTime, 1*time.Minute),
253+
KeepAliveTimeout: dc.GetDurationProperty(dynamicconfig.KeepAliveTimeout, 10*time.Second),
252254

253255
DeleteNamespaceDeleteActivityRPS: dc.GetIntProperty(dynamicconfig.DeleteNamespaceDeleteActivityRPS, 100),
254256
DeleteNamespacePageSize: dc.GetIntProperty(dynamicconfig.DeleteNamespacePageSize, 1000),

0 commit comments

Comments
 (0)