Skip to content

Commit 0ae59f6

Browse files
authored
Adds automatic population of unset retrypolicy field when starting Workflows (#654)
Change summary: Adds a separate dynamic configuration to dictate "default" retry policy field values for Workflows vs. Activities. Adds logic to auto-populate unset retry policy fields when starting a workflow, performing a signal-with-start of a workflow, and starting a child workflow. If the specified retry policy is nil, no auto-population happens; if it is non-nil, unset fields are assigned defaults. Fixes a bug where WorkflowExpirationTime was always set to 0 for Child Workflows, which meant that Child Workflows would assume that max attempts = 0 implies no retry policy is set. Addresses flakiness in the matching test (#243).
1 parent d6b4e74 commit 0ae59f6

22 files changed

+265
-141
lines changed

common/defaultActivityRetrySettings.go

Lines changed: 0 additions & 10 deletions
This file was deleted.

common/defaultRetrySettings.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package common
2+
3+
import "time"
4+
5+
// DefaultRetrySettings holds the fallback retry values. They are used when an
// Activity does not specify a retry policy, and to fill in any fields left
// unset on a retry policy that was explicitly provided for a Workflow.
type DefaultRetrySettings struct {
	// InitialInterval is applied when a policy's initial interval is zero.
	InitialInterval time.Duration

	// MaximumIntervalCoefficient is used to derive the maximum interval.
	// NOTE(review): presumably a multiplier of the initial interval - confirm
	// against EnsureRetryPolicyDefaults.
	MaximumIntervalCoefficient float64

	// BackoffCoefficient is the fallback backoff coefficient.
	// NOTE(review): presumably applied when the policy leaves it unset - confirm.
	BackoffCoefficient float64

	// MaximumAttempts is applied when a policy's maximum attempts is zero.
	MaximumAttempts int32
}

common/service/dynamicconfig/config.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ type MapPropertyFn func(opts ...FilterOption) map[string]interface{}
128128
// StringPropertyFnWithNamespaceFilter is a wrapper to get string property from dynamic config
129129
type StringPropertyFnWithNamespaceFilter func(namespace string) string
130130

131+
// MapPropertyFnWithNamespaceFilter is a wrapper to get map property from dynamic config
132+
type MapPropertyFnWithNamespaceFilter func(namespace string) map[string]interface{}
133+
131134
// BoolPropertyFnWithNamespaceFilter is a wrapper to get bool property from dynamic config
132135
type BoolPropertyFnWithNamespaceFilter func(namespace string) bool
133136

@@ -358,6 +361,18 @@ func (c *Collection) GetStringPropertyFnWithNamespaceFilter(key Key, defaultValu
358361
}
359362
}
360363

364+
// GetMapPropertyFnWithNamespaceFilter gets property and asserts that it's a map
365+
func (c *Collection) GetMapPropertyFnWithNamespaceFilter(key Key, defaultValue map[string]interface{}) MapPropertyFnWithNamespaceFilter {
366+
return func(namespace string) map[string]interface{} {
367+
val, err := c.client.GetMapValue(key, getFilterMap(NamespaceFilter(namespace)), defaultValue)
368+
if err != nil {
369+
c.logError(key, err)
370+
}
371+
c.logValue(key, val, defaultValue, reflect.DeepEqual)
372+
return val
373+
}
374+
}
375+
361376
// GetBoolPropertyFnWithNamespaceFilter gets property with namespace filter and asserts that it's a bool
362377
func (c *Collection) GetBoolPropertyFnWithNamespaceFilter(key Key, defaultValue bool) BoolPropertyFnWithNamespaceFilter {
363378
return func(namespace string) bool {

common/service/dynamicconfig/config_mock.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,8 @@ func GetStringPropertyFn(value string) func(opts ...FilterOption) string {
8686
func GetMapPropertyFn(value map[string]interface{}) func(opts ...FilterOption) map[string]interface{} {
8787
return func(...FilterOption) map[string]interface{} { return value }
8888
}
89+
90+
// GetMapPropertyFnWithNamespaceFilter returns a namespace-filtered map property
// function that ignores the namespace and always yields the given value.
// The map is returned as-is (not copied).
func GetMapPropertyFnWithNamespaceFilter(value map[string]interface{}) func(namespace string) map[string]interface{} {
	return func(string) map[string]interface{} {
		return value
	}
}

common/service/dynamicconfig/constants.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ var keys = map[Key]string{
258258
EnableDropStuckTaskByNamespaceID: "history.DropStuckTaskByNamespace",
259259
SkipReapplicationByNamespaceId: "history.SkipReapplicationByNamespaceId",
260260
DefaultActivityRetryPolicy: "history.defaultActivityRetryPolicy",
261+
DefaultWorkflowRetryPolicy: "history.defaultWorkflowRetryPolicy",
261262

262263
WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
263264
WorkerPersistenceGlobalMaxQPS: "worker.persistenceGlobalMaxQPS",
@@ -635,6 +636,9 @@ const (
635636
// DefaultActivityRetryPolicy represents the out-of-box retry policy for activities where
636637
// the user has not specified an explicit RetryPolicy
637638
DefaultActivityRetryPolicy
639+
// DefaultWorkflowRetryPolicy represents the out-of-box retry policy for unset fields
640+
// where the user has set an explicit RetryPolicy, but not specified all the fields
641+
DefaultWorkflowRetryPolicy
638642

639643
// EnableAdminProtection is whether to enable admin checking
640644
EnableAdminProtection

common/util.go

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
historypb "go.temporal.io/api/history/v1"
4141
"go.temporal.io/api/serviceerror"
4242
"go.temporal.io/api/workflowservice/v1"
43+
workflowspb "go.temporal.io/server/api/workflow/v1"
4344

4445
"go.temporal.io/server/api/historyservice/v1"
4546
"go.temporal.io/server/api/matchingservice/v1"
@@ -79,6 +80,16 @@ const (
7980
retryKafkaOperationMaxInterval = 10 * time.Second
8081
retryKafkaOperationExpirationInterval = 30 * time.Second
8182

83+
defaultInitialInterval = time.Second
84+
defaultMaximumIntervalCoefficient = 100.0
85+
defaultBackoffCoefficient = 2.0
86+
defaultMaximumAttempts = 0
87+
88+
initialIntervalInSecondsConfigKey = "InitialIntervalInSeconds"
89+
maximumIntervalCoefficientConfigKey = "MaximumIntervalCoefficient"
90+
backoffCoefficientConfigKey = "BackoffCoefficient"
91+
maximumAttemptsConfigKey = "MaximumAttempts"
92+
8293
contextExpireThreshold = 10 * time.Millisecond
8394

8495
// FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
@@ -370,13 +381,13 @@ func SortInt64Slice(slice []int64) {
370381
}
371382

372383
// EnsureRetryPolicyDefaults ensures the policy subfields, if not explicitly set, are set to the specified defaults
373-
func EnsureRetryPolicyDefaults(originalPolicy *commonpb.RetryPolicy, defaultSettings DefaultActivityRetrySettings) {
384+
func EnsureRetryPolicyDefaults(originalPolicy *commonpb.RetryPolicy, defaultSettings DefaultRetrySettings) {
374385
if originalPolicy.GetMaximumAttempts() == 0 {
375386
originalPolicy.MaximumAttempts = defaultSettings.MaximumAttempts
376387
}
377388

378389
if timestamp.DurationValue(originalPolicy.GetInitialInterval()) == 0 {
379-
originalPolicy.InitialInterval = timestamp.DurationPtr(time.Duration(defaultSettings.InitialIntervalInSeconds) * time.Second)
390+
originalPolicy.InitialInterval = timestamp.DurationPtr(defaultSettings.InitialInterval)
380391
}
381392

382393
if timestamp.DurationValue(originalPolicy.GetMaximumInterval()) == 0 {
@@ -394,6 +405,7 @@ func ValidateRetryPolicy(policy *commonpb.RetryPolicy) error {
394405
// nil policy is valid which means no retry
395406
return nil
396407
}
408+
397409
if policy.GetMaximumAttempts() == 1 {
398410
// One maximum attempt effectively disable retries. Validating the
399411
// rest of the arguments is pointless
@@ -417,18 +429,61 @@ func ValidateRetryPolicy(policy *commonpb.RetryPolicy) error {
417429
return nil
418430
}
419431

432+
func GetDefaultRetryPolicyConfigOptions() map[string]interface{} {
433+
return map[string]interface{}{
434+
initialIntervalInSecondsConfigKey: int(defaultInitialInterval.Seconds()),
435+
maximumIntervalCoefficientConfigKey: defaultMaximumIntervalCoefficient,
436+
backoffCoefficientConfigKey: defaultBackoffCoefficient,
437+
maximumAttemptsConfigKey: defaultMaximumAttempts,
438+
}
439+
}
440+
441+
func FromConfigToDefaultRetrySettings(options map[string]interface{}) DefaultRetrySettings {
442+
defaultSettings := DefaultRetrySettings{
443+
InitialInterval: defaultInitialInterval,
444+
MaximumIntervalCoefficient: defaultMaximumIntervalCoefficient,
445+
BackoffCoefficient: defaultBackoffCoefficient,
446+
MaximumAttempts: defaultMaximumAttempts,
447+
}
448+
449+
initialIntervalInSeconds, ok := options[initialIntervalInSecondsConfigKey]
450+
if ok {
451+
defaultSettings.InitialInterval = time.Duration(initialIntervalInSeconds.(int)) * time.Second
452+
}
453+
454+
maximumIntervalCoefficient, ok := options[maximumIntervalCoefficientConfigKey]
455+
if ok {
456+
defaultSettings.MaximumIntervalCoefficient = maximumIntervalCoefficient.(float64)
457+
}
458+
459+
backoffCoefficient, ok := options[backoffCoefficientConfigKey]
460+
if ok {
461+
defaultSettings.BackoffCoefficient = backoffCoefficient.(float64)
462+
}
463+
464+
maximumAttempts, ok := options[maximumAttemptsConfigKey]
465+
if ok {
466+
defaultSettings.MaximumAttempts = int32(maximumAttempts.(int))
467+
}
468+
469+
return defaultSettings
470+
}
471+
420472
// CreateHistoryStartWorkflowRequest create a start workflow request for history
421473
func CreateHistoryStartWorkflowRequest(
422474
namespaceID string,
423475
startRequest *workflowservice.StartWorkflowExecutionRequest,
476+
parentExecutionInfo *workflowspb.ParentExecutionInfo,
477+
now time.Time,
424478
) *historyservice.StartWorkflowExecutionRequest {
425-
now := time.Now()
426479
histRequest := &historyservice.StartWorkflowExecutionRequest{
427480
NamespaceId: namespaceID,
428481
StartRequest: startRequest,
429482
ContinueAsNewInitiator: enumspb.CONTINUE_AS_NEW_INITIATOR_WORKFLOW,
430483
Attempt: 1,
484+
ParentExecutionInfo: parentExecutionInfo,
431485
}
486+
432487
if timestamp.DurationValue(startRequest.GetWorkflowExecutionTimeout()) > 0 {
433488
deadline := now.Add(timestamp.DurationValue(startRequest.GetWorkflowExecutionTimeout()))
434489
histRequest.WorkflowExecutionExpirationTime = timestamp.TimePtr(deadline.Round(time.Millisecond))

common/util_test.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ func TestValidateRetryPolicy(t *testing.T) {
9292
}
9393

9494
func TestEnsureRetryPolicyDefaults(t *testing.T) {
95-
defaultActivityRetrySettings := DefaultActivityRetrySettings{
96-
InitialIntervalInSeconds: 1,
95+
defaultRetrySettings := DefaultRetrySettings{
96+
InitialInterval: time.Second,
9797
MaximumIntervalCoefficient: 100,
9898
BackoffCoefficient: 2.0,
9999
MaximumAttempts: 120,
@@ -181,8 +181,23 @@ func TestEnsureRetryPolicyDefaults(t *testing.T) {
181181

182182
for _, tt := range testCases {
183183
t.Run(tt.name, func(t *testing.T) {
184-
EnsureRetryPolicyDefaults(tt.input, defaultActivityRetrySettings)
184+
EnsureRetryPolicyDefaults(tt.input, defaultRetrySettings)
185185
assert.Equal(t, tt.want, tt.input)
186186
})
187187
}
188188
}
189+
190+
func Test_FromConfigToRetryPolicy(t *testing.T) {
191+
options := map[string]interface{}{
192+
initialIntervalInSecondsConfigKey: 2,
193+
maximumIntervalCoefficientConfigKey: 100.0,
194+
backoffCoefficientConfigKey: 4.0,
195+
maximumAttemptsConfigKey: 5,
196+
}
197+
198+
defaultSettings := FromConfigToDefaultRetrySettings(options)
199+
assert.Equal(t, 2*time.Second, defaultSettings.InitialInterval)
200+
assert.Equal(t, 100.0, defaultSettings.MaximumIntervalCoefficient)
201+
assert.Equal(t, 4.0, defaultSettings.BackoffCoefficient)
202+
assert.Equal(t, int32(5), defaultSettings.MaximumAttempts)
203+
}

config/dynamicconfig/development.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,9 @@ history.defaultActivityRetryPolicy:
2828
MaximumIntervalCoefficient: 100.0
2929
BackoffCoefficient: 2.0
3030
MaximumAttempts: 0
31+
history.defaultWorkflowRetryPolicy:
32+
- value:
33+
InitialIntervalInSeconds: 1
34+
MaximumIntervalCoefficient: 100.0
35+
BackoffCoefficient: 2.0
36+
MaximumAttempts: 0

host/integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,7 @@ func (s *integrationSuite) TestWorkflowRetry() {
615615
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),
616616
Identity: identity,
617617
RetryPolicy: &commonpb.RetryPolicy{
618-
InitialInterval: &initialInterval,
618+
// Intentionally test server-initialization of Initial Interval value (which should be 1 second)
619619
MaximumAttempts: int32(maximumAttempts),
620620
MaximumInterval: timestamp.DurationPtr(1 * time.Second),
621621
NonRetryableErrorTypes: []string{"bad-bug"},

service/frontend/service.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ type Config struct {
9797
SearchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
9898
SearchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
9999

100+
// DefaultWorkflowRetryPolicy represents default values for unset fields on a Workflow's
101+
// specified RetryPolicy
102+
DefaultWorkflowRetryPolicy dynamicconfig.MapPropertyFnWithNamespaceFilter
103+
100104
// VisibilityArchival system protection
101105
VisibilityArchivalQueryMaxPageSize dynamicconfig.IntPropertyFn
102106

@@ -144,6 +148,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro
144148
SendRawWorkflowHistory: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.SendRawWorkflowHistory, false),
145149
EnableRPCReplication: dc.GetBoolProperty(dynamicconfig.FrontendEnableRPCReplication, false),
146150
EnableCleanupReplicationTask: dc.GetBoolProperty(dynamicconfig.FrontendEnableCleanupReplicationTask, true),
151+
DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()),
147152
}
148153
}
149154

0 commit comments

Comments
 (0)