Skip to content

Commit 4149888

Browse files
authored
Dynamic config to enable generating request id reference links (temporalio#7712)
## What changed? <!-- Describe what has changed in this PR --> Dynamic config to enable generating request id reference links ## Why? <!-- Tell your future self why have you made these changes --> ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) -->
1 parent faebf1d commit 4149888

File tree

6 files changed

+33
-12
lines changed

6 files changed

+33
-12
lines changed

common/dynamicconfig/constants.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2118,7 +2118,7 @@ the user has not specified an explicit RetryPolicy`,
21182118
`DefaultWorkflowRetryPolicy represents the out-of-box retry policy for unset fields
21192119
where the user has set an explicit RetryPolicy, but not specified all the fields`,
21202120
)
2121-
FollowReusePolicyAfterConflictPolicyTerminate = NewNamespaceTypedSetting(
2121+
FollowReusePolicyAfterConflictPolicyTerminate = NewNamespaceBoolSetting(
21222122
"history.followReusePolicyAfterConflictPolicyTerminate",
21232123
true,
21242124
`Follows WorkflowIdReusePolicy RejectDuplicate and AllowDuplicateFailedOnly after WorkflowIdReusePolicy TerminateExisting was applied.
@@ -2429,6 +2429,13 @@ that task will be sent to DLQ.`,
24292429
`SendRawHistoryBetweenInternalServices is whether to send raw history events between internal temporal services`,
24302430
)
24312431

2432+
// TODO(rodrigozhou): This is temporary dynamic config to be removed before the next release.
2433+
EnableRequestIdRefLinks = NewGlobalBoolSetting(
2434+
"history.enableRequestIdRefLinks",
2435+
false,
2436+
"Enable generating request ID reference links",
2437+
)
2438+
24322439
// keys for worker
24332440

24342441
WorkerPersistenceMaxQPS = NewGlobalIntSetting(

service/history/api/startworkflow/api.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ type Starter struct {
5858
request *historyservice.StartWorkflowExecutionRequest
5959
namespace *namespace.Namespace
6060
createOrUpdateLeaseFn api.CreateOrUpdateLeaseFunc
61-
followReusePolicyAfterConflictPolicyTerminate dynamicconfig.TypedPropertyFnWithNamespaceFilter[bool]
61+
followReusePolicyAfterConflictPolicyTerminate dynamicconfig.BoolPropertyFnWithNamespaceFilter
62+
enableRequestIdRefLinks dynamicconfig.BoolPropertyFn
6263
}
6364

6465
// creationParams is a container for all information obtained from creating the uncommitted execution.
@@ -105,6 +106,7 @@ func NewStarter(
105106
namespace: namespaceEntry,
106107
createOrUpdateLeaseFn: createLeaseFn,
107108
followReusePolicyAfterConflictPolicyTerminate: shardContext.GetConfig().FollowReusePolicyAfterConflictPolicyTerminate,
109+
enableRequestIdRefLinks: shardContext.GetConfig().EnableRequestIdRefLinks,
108110
}, nil
109111
}
110112

@@ -807,6 +809,9 @@ func (s *Starter) generateStartedEventRefLink(runID string) *commonpb.Link {
807809
}
808810

809811
func (s *Starter) generateRequestIdRefLink(runID string) *commonpb.Link {
812+
if !s.enableRequestIdRefLinks() {
813+
return s.generateStartedEventRefLink(runID)
814+
}
810815
return &commonpb.Link{
811816
Variant: &commonpb.Link_WorkflowEvent_{
812817
WorkflowEvent: &commonpb.Link_WorkflowEvent{

service/history/configs/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type Config struct {
6363
EnableUpdateWorkflowModeIgnoreCurrent dynamicconfig.BoolPropertyFn
6464
EnableTransitionHistory dynamicconfig.BoolPropertyFn
6565
MaxCallbacksPerWorkflow dynamicconfig.IntPropertyFnWithNamespaceFilter
66+
EnableRequestIdRefLinks dynamicconfig.BoolPropertyFn
6667

6768
// EventsCache settings
6869
// Change of these configs require shard restart
@@ -328,7 +329,7 @@ type Config struct {
328329
EnableEagerWorkflowStart dynamicconfig.BoolPropertyFnWithNamespaceFilter
329330
NamespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn
330331

331-
FollowReusePolicyAfterConflictPolicyTerminate dynamicconfig.TypedPropertyFnWithNamespaceFilter[bool]
332+
FollowReusePolicyAfterConflictPolicyTerminate dynamicconfig.BoolPropertyFnWithNamespaceFilter
332333

333334
// ArchivalQueueProcessor settings
334335
ArchivalProcessorSchedulerWorkerCount dynamicconfig.TypedSubscribable[int]
@@ -418,6 +419,7 @@ func NewConfig(
418419
EnableUpdateWorkflowModeIgnoreCurrent: dynamicconfig.EnableUpdateWorkflowModeIgnoreCurrent.Get(dc),
419420
EnableTransitionHistory: dynamicconfig.EnableTransitionHistory.Get(dc),
420421
MaxCallbacksPerWorkflow: dynamicconfig.MaxCallbacksPerWorkflow.Get(dc),
422+
EnableRequestIdRefLinks: dynamicconfig.EnableRequestIdRefLinks.Get(dc),
421423

422424
EventsShardLevelCacheMaxSizeBytes: dynamicconfig.EventsCacheMaxSizeBytes.Get(dc), // 512KB
423425
EventsHostLevelCacheMaxSizeBytes: dynamicconfig.EventsHostLevelCacheMaxSizeBytes.Get(dc), // 256MB

service/history/workflow/mutable_state_impl.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -594,14 +594,16 @@ func (ms *MutableStateImpl) GetNexusCompletion(
594594
},
595595
},
596596
}
597-
requestIDInfo := ms.executionState.RequestIds[requestID]
598-
if requestIDInfo.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED {
599-
// If the callback was attached, then replace with RequestIdReference.
600-
link.Reference = &commonpb.Link_WorkflowEvent_RequestIdRef{
601-
RequestIdRef: &commonpb.Link_WorkflowEvent_RequestIdReference{
602-
RequestId: requestID,
603-
EventType: requestIDInfo.GetEventType(),
604-
},
597+
if ms.config.EnableRequestIdRefLinks() {
598+
requestIDInfo := ms.executionState.RequestIds[requestID]
599+
if requestIDInfo.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED {
600+
// If the callback was attached, then replace with RequestIdReference.
601+
link.Reference = &commonpb.Link_WorkflowEvent_RequestIdRef{
602+
RequestIdRef: &commonpb.Link_WorkflowEvent_RequestIdReference{
603+
RequestId: requestID,
604+
EventType: requestIDInfo.GetEventType(),
605+
},
606+
}
605607
}
606608
}
607609
startLink := nexusoperations.ConvertLinkWorkflowEventToNexusLink(link)

tests/nexus_workflow_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,8 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletion() {
758758
}
759759

760760
func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionBeforeStart() {
761+
s.OverrideDynamicConfig(dynamicconfig.EnableRequestIdRefLinks, true)
762+
761763
ctx := testcore.NewContext()
762764
taskQueues := []string{testcore.RandomizeStr(s.T().Name()), testcore.RandomizeStr(s.T().Name())}
763765
wfRuns := []client.WorkflowRun{}
@@ -2231,7 +2233,7 @@ func (s *NexusWorkflowTestSuite) TestNexusCallbackAfterCallerComplete() {
22312233
})
22322234
require.NoError(ct, err)
22332235
require.Len(ct, resp.Callbacks, 1)
2234-
require.Equal(ct, resp.Callbacks[0].State, enumspb.CALLBACK_STATE_FAILED)
2236+
require.Equal(ct, enumspb.CALLBACK_STATE_FAILED, resp.Callbacks[0].State)
22352237
require.NotNil(ct, resp.Callbacks[0].LastAttemptFailure)
22362238
require.Equal(ct, resp.Callbacks[0].LastAttemptFailure.Message, "handler error (NOT_FOUND): workflow execution already completed")
22372239
}, 3*time.Second, 200*time.Millisecond)

tests/workflow_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ func (s *WorkflowTestSuite) TestStartWorkflowExecution_UseExisting() {
198198
}
199199

200200
func (s *WorkflowTestSuite) TestStartWorkflowExecution_UseExisting_OnConflictOptions() {
201+
s.OverrideDynamicConfig(dynamicconfig.EnableRequestIdRefLinks, true)
201202
s.OverrideDynamicConfig(
202203
callbacks.AllowedAddresses,
203204
[]any{
@@ -440,6 +441,7 @@ func (s *WorkflowTestSuite) TestStartWorkflowExecution_UseExisting_OnConflictOpt
440441
}
441442

442443
func (s *WorkflowTestSuite) TestStartWorkflowExecution_UseExisting_OnConflictOptions_Dedup() {
444+
s.OverrideDynamicConfig(dynamicconfig.EnableRequestIdRefLinks, true)
443445
tv := testvars.New(s.T())
444446
request := &workflowservice.StartWorkflowExecutionRequest{
445447
RequestId: tv.RequestID(),
@@ -570,6 +572,7 @@ func (s *WorkflowTestSuite) TestStartWorkflowExecution_UseExisting_OnConflictOpt
570572
}
571573

572574
func (s *WorkflowTestSuite) TestStartWorkflowExecution_UseExisting_OnConflictOptions_NoDedup() {
575+
s.OverrideDynamicConfig(dynamicconfig.EnableRequestIdRefLinks, true)
573576
tv := testvars.New(s.T())
574577
request := &workflowservice.StartWorkflowExecutionRequest{
575578
RequestId: uuid.New(),

0 commit comments

Comments
 (0)