Skip to content

Commit 72cd2fd

Browse files
authored
use (runID, child initiated event id & version) as unique requestID to start child workflow (#7813)
## What changed? use (runID, child initiated event id & version) as unique requestID to start child workflow. ## Why? For event-based replication, request_id is not replicated to standby clusters. Standby cannot dedup the start workflow request if failover happens. By using (runID, child initiated event id & version) as requestID, it will be unique across clusters. ## How did you test it? - [x] built - [x] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) - [ ] added new functional test(s) ## Potential risks
1 parent 808e144 commit 72cd2fd

11 files changed

+60
-47
lines changed

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,9 +1093,8 @@ func (handler *workflowTaskCompletedHandler) handleCommandStartChildWorkflow(
10931093

10941094
enums.SetDefaultWorkflowIdReusePolicy(&attr.WorkflowIdReusePolicy)
10951095

1096-
requestID := uuid.New()
10971096
event, _, err := handler.mutableState.AddStartChildWorkflowExecutionInitiatedEvent(
1098-
handler.workflowTaskCompletedID, requestID, attr, targetNamespaceID,
1097+
handler.workflowTaskCompletedID, attr, targetNamespaceID,
10991098
)
11001099
if err == nil {
11011100
// Keep track of all child initiated commands in this workflow task to validate request cancel commands

service/history/history_engine2_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2193,7 +2193,7 @@ func (s *engine2Suite) TestRecordChildExecutionCompleted() {
21932193
wt.StartedEventID = workflowTasksStartEvent.GetEventId()
21942194
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
21952195

2196-
initiatedEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(), uuid.New(),
2196+
initiatedEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(),
21972197
tests.ChildNamespace, tests.ChildNamespaceID, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_TERMINATE)
21982198
request.ParentInitiatedId = initiatedEvent.GetEventId()
21992199
request.ParentInitiatedVersion = initiatedEvent.GetVersion()
@@ -2278,7 +2278,7 @@ func (s *engine2Suite) TestRecordChildExecutionCompleted_ChildFirstRunId() {
22782278
wt.StartedEventID = workflowTasksStartEvent.GetEventId()
22792279
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
22802280

2281-
initiatedEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(), uuid.New(),
2281+
initiatedEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(),
22822282
tests.ChildNamespace, tests.ChildNamespaceID, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_TERMINATE)
22832283
request.ParentInitiatedId = initiatedEvent.GetEventId()
22842284
request.ParentInitiatedVersion = initiatedEvent.GetVersion()
@@ -2344,7 +2344,7 @@ func (s *engine2Suite) TestRecordChildExecutionCompleted_MissingChildStartedEven
23442344
wt.StartedEventID = workflowTasksStartEvent.GetEventId()
23452345
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
23462346

2347-
initiatedEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(), uuid.New(),
2347+
initiatedEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(),
23482348
tests.ChildNamespace, tests.ChildNamespaceID, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_TERMINATE)
23492349
request.ParentInitiatedId = initiatedEvent.GetEventId()
23502350
request.ParentInitiatedVersion = initiatedEvent.GetVersion()
@@ -2421,7 +2421,7 @@ func (s *engine2Suite) TestRecordChildExecutionCompleted_MissingChildStartedEven
24212421
wt.StartedEventID = workflowTasksStartEvent.GetEventId()
24222422
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
24232423

2424-
initiatedEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(), uuid.New(),
2424+
initiatedEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(),
24252425
tests.ChildNamespace, tests.ChildNamespaceID, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_TERMINATE)
24262426
request.ParentInitiatedId = initiatedEvent.GetEventId()
24272427
request.ParentInitiatedVersion = initiatedEvent.GetVersion()
@@ -2683,7 +2683,7 @@ func (s *engine2Suite) TestVerifyChildExecutionCompletionRecorded_InitiatedEvent
26832683
workflowTasksStartEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskQueueName, uuid.New())
26842684
wt.StartedEventID = workflowTasksStartEvent.GetEventId()
26852685
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
2686-
initiatedEvent, ci := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(), uuid.New(),
2686+
initiatedEvent, ci := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(),
26872687
tests.ChildNamespace, tests.ChildNamespaceID, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_TERMINATE)
26882688

26892689
request := &historyservice.VerifyChildExecutionCompletionRecordedRequest{

service/history/history_engine_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6611,7 +6611,6 @@ func addSignaledEvent(
66116611
func addStartChildWorkflowExecutionInitiatedEvent(
66126612
ms historyi.MutableState,
66136613
workflowTaskCompletedID int64,
6614-
createRequestID string,
66156614
namespace namespace.Name,
66166615
namespaceID namespace.ID,
66176616
workflowID, workflowType, taskQueue string,
@@ -6620,7 +6619,7 @@ func addStartChildWorkflowExecutionInitiatedEvent(
66206619
parentClosePolicy enumspb.ParentClosePolicy,
66216620
) (*historypb.HistoryEvent, *persistencespb.ChildExecutionInfo) {
66226621

6623-
event, cei, _ := ms.AddStartChildWorkflowExecutionInitiatedEvent(workflowTaskCompletedID, createRequestID,
6622+
event, cei, _ := ms.AddStartChildWorkflowExecutionInitiatedEvent(workflowTaskCompletedID,
66246623
&commandpb.StartChildWorkflowExecutionCommandAttributes{
66256624
Namespace: namespace.String(),
66266625
WorkflowId: workflowID,

service/history/interfaces/mutable_state.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ type (
8282
AddSignalExternalWorkflowExecutionInitiatedEvent(int64, string, *commandpb.SignalExternalWorkflowExecutionCommandAttributes, namespace.ID) (*historypb.HistoryEvent, *persistencespb.SignalInfo, error)
8383
AddSignalRequested(requestID string)
8484
AddStartChildWorkflowExecutionFailedEvent(int64, enumspb.StartChildWorkflowExecutionFailedCause, *historypb.StartChildWorkflowExecutionInitiatedEventAttributes) (*historypb.HistoryEvent, error)
85-
AddStartChildWorkflowExecutionInitiatedEvent(int64, string, *commandpb.StartChildWorkflowExecutionCommandAttributes, namespace.ID) (*historypb.HistoryEvent, *persistencespb.ChildExecutionInfo, error)
85+
AddStartChildWorkflowExecutionInitiatedEvent(int64, *commandpb.StartChildWorkflowExecutionCommandAttributes, namespace.ID) (*historypb.HistoryEvent, *persistencespb.ChildExecutionInfo, error)
8686
AddTimeoutWorkflowEvent(int64, enumspb.RetryState, string) (*historypb.HistoryEvent, error)
8787
AddTimerCanceledEvent(int64, *commandpb.CancelTimerCommandAttributes, string) (*historypb.HistoryEvent, error)
8888
AddTimerFiredEvent(string) (*historypb.HistoryEvent, error)
@@ -241,7 +241,7 @@ type (
241241
ApplySignalExternalWorkflowExecutionFailedEvent(*historypb.HistoryEvent) error
242242
ApplySignalExternalWorkflowExecutionInitiatedEvent(int64, *historypb.HistoryEvent, string) (*persistencespb.SignalInfo, error)
243243
ApplyStartChildWorkflowExecutionFailedEvent(*historypb.HistoryEvent) error
244-
ApplyStartChildWorkflowExecutionInitiatedEvent(int64, *historypb.HistoryEvent, string) (*persistencespb.ChildExecutionInfo, error)
244+
ApplyStartChildWorkflowExecutionInitiatedEvent(int64, *historypb.HistoryEvent) (*persistencespb.ChildExecutionInfo, error)
245245
ApplyTimerCanceledEvent(*historypb.HistoryEvent) error
246246
ApplyTimerFiredEvent(*historypb.HistoryEvent) error
247247
ApplyTimerStartedEvent(*historypb.HistoryEvent) (*persistencespb.TimerInfo, error)

service/history/interfaces/mutable_state_mock.go

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/transfer_queue_active_task_executor_test.go

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen
845845
},
846846
}, defaultWorkflowTaskCompletionLimits)
847847

848-
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), uuid.New(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
848+
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
849849
Namespace: "child namespace1",
850850
WorkflowId: "child workflow1",
851851
WorkflowType: &commonpb.WorkflowType{
@@ -856,7 +856,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen
856856
ParentClosePolicy: parentClosePolicy1,
857857
}, "child namespace1-ID")
858858
s.Nil(err)
859-
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), uuid.New(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
859+
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
860860
Namespace: "child namespace2",
861861
WorkflowId: "child workflow2",
862862
WorkflowType: &commonpb.WorkflowType{
@@ -867,7 +867,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen
867867
ParentClosePolicy: parentClosePolicy2,
868868
}, "child namespace2-ID")
869869
s.Nil(err)
870-
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), uuid.New(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
870+
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
871871
Namespace: "child namespace3",
872872
WorkflowId: "child workflow3",
873873
WorkflowType: &commonpb.WorkflowType{
@@ -970,7 +970,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen
970970
}, defaultWorkflowTaskCompletionLimits)
971971

972972
for i := 0; i < 10; i++ {
973-
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), uuid.New(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
973+
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
974974
WorkflowId: "child workflow" + convert.IntToString(i),
975975
WorkflowType: &commonpb.WorkflowType{
976976
Name: "child workflow type",
@@ -1067,7 +1067,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_ParentW
10671067
}, defaultWorkflowTaskCompletionLimits)
10681068

10691069
for i := 0; i < 10; i++ {
1070-
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), uuid.New(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
1070+
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
10711071
WorkflowId: "child workflow" + convert.IntToString(i),
10721072
WorkflowType: &commonpb.WorkflowType{
10731073
Name: "child workflow type",
@@ -1157,7 +1157,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen
11571157
}, defaultWorkflowTaskCompletionLimits)
11581158

11591159
for i := 0; i < 10; i++ {
1160-
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), uuid.New(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
1160+
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
11611161
WorkflowId: "child workflow" + convert.IntToString(i),
11621162
WorkflowType: &commonpb.WorkflowType{
11631163
Name: "child workflow type",
@@ -1254,7 +1254,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen
12541254
},
12551255
}, defaultWorkflowTaskCompletionLimits)
12561256

1257-
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), uuid.New(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
1257+
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
12581258
Namespace: "child namespace1",
12591259
WorkflowId: "child workflow1",
12601260
WorkflowType: &commonpb.WorkflowType{
@@ -1266,7 +1266,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen
12661266
}, "child namespace1-ID")
12671267
s.NoError(err)
12681268

1269-
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), uuid.New(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
1269+
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(event.GetEventId(), &commandpb.StartChildWorkflowExecutionCommandAttributes{
12701270
Namespace: "child namespace1",
12711271
WorkflowId: "child workflow2",
12721272
WorkflowType: &commonpb.WorkflowType{
@@ -1938,7 +1938,6 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su
19381938
event, ci := addStartChildWorkflowExecutionInitiatedEvent(
19391939
mutableState,
19401940
event.GetEventId(),
1941-
uuid.New(),
19421941
s.childNamespace,
19431942
s.childNamespaceID,
19441943
childWorkflowID,
@@ -2053,7 +2052,6 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Re
20532052
childInitEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(
20542053
mutableState,
20552054
1111,
2056-
uuid.New(),
20572055
s.childNamespace,
20582056
s.childNamespaceID,
20592057
childWorkflowID,
@@ -2177,7 +2175,6 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Fa
21772175
event, ci := addStartChildWorkflowExecutionInitiatedEvent(
21782176
mutableState,
21792177
event.GetEventId(),
2180-
uuid.New(),
21812178
s.childNamespace,
21822179
s.childNamespaceID,
21832180
childWorkflowID,
@@ -2267,7 +2264,6 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Fa
22672264
event, _ = addStartChildWorkflowExecutionInitiatedEvent(
22682265
mutableState,
22692266
event.GetEventId(),
2270-
uuid.New(),
22712267
s.namespace,
22722268
s.namespaceID,
22732269
childWorkflowID,
@@ -2342,7 +2338,6 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su
23422338
event, ci := addStartChildWorkflowExecutionInitiatedEvent(
23432339
mutableState,
23442340
event.GetEventId(),
2345-
uuid.New(),
23462341
s.childNamespace,
23472342
s.childNamespaceID,
23482343
childWorkflowID,
@@ -2446,7 +2441,6 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Du
24462441
event, ci := addStartChildWorkflowExecutionInitiatedEvent(
24472442
mutableState,
24482443
event.GetEventId(),
2449-
uuid.New(),
24502444
s.childNamespace,
24512445
s.childNamespaceID,
24522446
childExecution.GetWorkflowId(),
@@ -2530,7 +2524,6 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessorStartChildExecution_
25302524
event, ci := addStartChildWorkflowExecutionInitiatedEvent(
25312525
mutableState,
25322526
event.GetEventId(),
2533-
uuid.New(),
25342527
s.childNamespace,
25352528
s.childNamespaceID,
25362529
childExecution.GetWorkflowId(),

service/history/transfer_queue_standby_task_executor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -954,7 +954,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessStartChildExecution_P
954954
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
955955

956956
taskID := s.mustGenerateTaskID()
957-
event, _ = addStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(),
957+
event, _ = addStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(),
958958
tests.ChildNamespace, tests.ChildNamespaceID, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_ABANDON)
959959

960960
now := time.Now().UTC()
@@ -1064,7 +1064,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessStartChildExecution_S
10641064
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
10651065

10661066
taskID := s.mustGenerateTaskID()
1067-
event, childInfo := addStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(),
1067+
event, childInfo := addStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(),
10681068
tests.ChildNamespace, tests.ChildNamespaceID, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_ABANDON)
10691069

10701070
now := time.Now().UTC()

0 commit comments

Comments
 (0)