@@ -2,27 +2,32 @@ package verifychildworkflowcompletionrecorded
22
33import (
44 "context"
5+ "errors"
56
7+ commonpb "go.temporal.io/api/common/v1"
8+ "go.temporal.io/api/serviceerror"
9+ "go.temporal.io/server/api/adminservice/v1"
610 enumsspb "go.temporal.io/server/api/enums/v1"
11+ historyspb "go.temporal.io/server/api/history/v1"
712 "go.temporal.io/server/api/historyservice/v1"
13+ persistencespb "go.temporal.io/server/api/persistence/v1"
814 "go.temporal.io/server/common"
915 "go.temporal.io/server/common/definition"
1016 "go.temporal.io/server/common/locks"
1117 "go.temporal.io/server/common/namespace"
18+ "go.temporal.io/server/common/persistence/transitionhistory"
19+ "go.temporal.io/server/common/persistence/versionhistory"
1220 "go.temporal.io/server/service/history/api"
1321 "go.temporal.io/server/service/history/consts"
22+ historyi "go.temporal.io/server/service/history/interfaces"
1423)
1524
16- func Invoke (
25+ func verifyChildExecution (
1726 ctx context.Context ,
18- request * historyservice.VerifyChildExecutionCompletionRecordedRequest ,
1927 workflowConsistencyChecker api.WorkflowConsistencyChecker ,
20- ) (resp * historyservice.VerifyChildExecutionCompletionRecordedResponse , retError error ) {
21- namespaceID := namespace .ID (request .GetNamespaceId ())
22- if err := api .ValidateNamespaceUUID (namespaceID ); err != nil {
23- return nil , err
24- }
25-
28+ request * historyservice.VerifyChildExecutionCompletionRecordedRequest ,
29+ ) (versionedTransition * persistencespb.VersionedTransition ,
30+ versionHistories * historyspb.VersionHistories , retError error ) {
2631 workflowLease , err := workflowConsistencyChecker .GetWorkflowLease (
2732 ctx ,
2833 request .Clock ,
@@ -38,40 +43,123 @@ func Invoke(
3843 locks .PriorityLow ,
3944 )
4045 if err != nil {
41- return nil , err
46+ return nil , nil , err
4247 }
4348 defer func () { workflowLease .GetReleaseFn ()(retError ) }()
4449
4550 mutableState := workflowLease .GetMutableState ()
4651 if ! mutableState .IsWorkflowExecutionRunning () &&
4752 mutableState .GetExecutionState ().State != enumsspb .WORKFLOW_EXECUTION_STATE_ZOMBIE {
4853 // parent has already completed and can't be blocked after failover.
49- return & historyservice. VerifyChildExecutionCompletionRecordedResponse {} , nil
54+ return nil , nil , nil
5055 }
5156
5257 onCurrentBranch , err := api .IsHistoryEventOnCurrentBranch (mutableState , request .ParentInitiatedId , request .ParentInitiatedVersion )
5358 if err != nil {
5459 // initiated event not found on any branch
55- return nil , consts .ErrWorkflowNotReady
60+ return nil , nil , consts .ErrWorkflowNotReady
5661 }
5762
5863 if ! onCurrentBranch {
5964 // due to conflict resolution, the initiated event may be on a different branch of the workflow.
6065 // we don't have to do anything and can simply return not found error. Standby logic
6166 // after seeing this error will give up verification.
62- return nil , consts .ErrChildExecutionNotFound
67+ return nil , nil , consts .ErrChildExecutionNotFound
6368 }
6469
6570 ci , isRunning := mutableState .GetChildExecutionInfo (request .ParentInitiatedId )
6671 if isRunning {
6772 if ci .StartedEventId != common .EmptyEventID &&
6873 ci .GetStartedWorkflowId () != request .ChildExecution .GetWorkflowId () {
6974 // this can happen since we may not have the initiated version
70- return nil , consts .ErrChildExecutionNotFound
75+ return nil , nil , consts .ErrChildExecutionNotFound
76+ }
77+
78+ return nil , nil , consts .ErrWorkflowNotReady
79+ }
80+
81+ versionedTransition = transitionhistory .CopyVersionedTransition (transitionhistory .LastVersionedTransition (mutableState .GetExecutionInfo ().TransitionHistory ))
82+ versionHistories = versionhistory .CopyVersionHistories (mutableState .GetExecutionInfo ().VersionHistories )
83+ return versionedTransition , versionHistories , nil
84+ }
85+
86+ func Invoke (
87+ ctx context.Context ,
88+ request * historyservice.VerifyChildExecutionCompletionRecordedRequest ,
89+ workflowConsistencyChecker api.WorkflowConsistencyChecker ,
90+ shardContext historyi.ShardContext ,
91+ ) (* historyservice.VerifyChildExecutionCompletionRecordedResponse , error ) {
92+ namespaceID := namespace .ID (request .GetNamespaceId ())
93+ if err := api .ValidateNamespaceUUID (namespaceID ); err != nil {
94+ return nil , err
95+ }
96+
97+ resendParent := false
98+ versionedTransition , versionHistories , err := verifyChildExecution (ctx , workflowConsistencyChecker , request )
99+ switch err .(type ) {
100+ case nil :
101+ return & historyservice.VerifyChildExecutionCompletionRecordedResponse {}, nil
102+ case * serviceerror.NotFound , * serviceerror.WorkflowNotReady :
103+ resendParent = request .GetResendParent ()
104+ }
105+ if ! resendParent {
106+ return nil , err
107+ }
108+
109+ // Resend parent workflow from source cluster
110+
111+ clusterMetadata := shardContext .GetClusterMetadata ()
112+ targetClusterInfo := clusterMetadata .GetAllClusterInfo ()[clusterMetadata .GetCurrentClusterName ()]
113+
114+ namespaceEntry , err := shardContext .GetNamespaceRegistry ().GetNamespaceByID (namespace .ID (namespaceID ))
115+ if err != nil {
116+ return nil , err
117+ }
118+
119+ activeClusterName := namespaceEntry .ActiveClusterName ()
120+ if activeClusterName == clusterMetadata .GetCurrentClusterName () {
121+ return nil , errors .New ("namespace becomes active when processing task as standby" )
122+ }
123+
124+ remoteAdminClient , err := shardContext .GetRemoteAdminClient (activeClusterName )
125+ if err != nil {
126+ return nil , err
127+ }
128+
129+ resp , err := remoteAdminClient .SyncWorkflowState (ctx , & adminservice.SyncWorkflowStateRequest {
130+ NamespaceId : request .NamespaceId ,
131+ Execution : & commonpb.WorkflowExecution {
132+ WorkflowId : request .ParentExecution .WorkflowId ,
133+ RunId : request .ParentExecution .RunId ,
134+ },
135+ VersionedTransition : versionedTransition ,
136+ VersionHistories : versionHistories ,
137+ TargetClusterId : int32 (targetClusterInfo .InitialFailoverVersion ),
138+ })
139+
140+ if err != nil {
141+ if common .IsNotFoundError (err ) {
142+ // parent workflow is not found on source cluster,
143+ // we can return empty response to indicate that verification is done
144+ // TODO: add parent workflow to workflowNotFoundCache
145+ return & historyservice.VerifyChildExecutionCompletionRecordedResponse {}, nil
71146 }
147+ return nil , err
148+ }
72149
73- return nil , consts .ErrWorkflowNotReady
150+ engine , err := shardContext .GetEngine (ctx )
151+ if err != nil {
152+ return nil , err
153+ }
154+ err = engine .ReplicateVersionedTransition (ctx , resp .VersionedTransitionArtifact , activeClusterName )
155+ if err != nil {
156+ return nil , err
74157 }
75158
159+ // Verify child execution again after resending parent workflow
160+ _ , _ , err = verifyChildExecution (ctx , workflowConsistencyChecker , request )
161+ if err != nil {
162+ return nil , err
163+ }
76164 return & historyservice.VerifyChildExecutionCompletionRecordedResponse {}, nil
77165}
0 commit comments