@@ -8,11 +8,13 @@ import (
88 commonpb "go.temporal.io/api/common/v1"
99 enumspb "go.temporal.io/api/enums/v1"
1010 "go.temporal.io/api/serviceerror"
11+ enumsspb "go.temporal.io/server/api/enums/v1"
1112 "go.temporal.io/server/api/historyservice/v1"
1213 "go.temporal.io/server/api/matchingservice/v1"
1314 "go.temporal.io/server/common"
1415 "go.temporal.io/server/common/definition"
1516 "go.temporal.io/server/common/locks"
17+ "go.temporal.io/server/common/log/tag"
1618 "go.temporal.io/server/common/namespace"
1719 "go.temporal.io/server/common/persistence/visibility/manager"
1820 "go.temporal.io/server/common/tasktoken"
@@ -22,6 +24,7 @@ import (
2224 "go.temporal.io/server/service/history/api/updateworkflow"
2325 historyi "go.temporal.io/server/service/history/interfaces"
2426 "go.temporal.io/server/service/history/workflow"
27+ "google.golang.org/protobuf/types/known/timestamppb"
2528)
2629
2730var multiOpAbortedErr = serviceerror .NewMultiOperationAborted ("Operation was aborted." )
@@ -43,6 +46,23 @@ type (
4346
4447 updater * updateworkflow.Updater
4548 starter * startworkflow.Starter
49+
50+ // TODO(stephanos): remove again
51+ leaseCreated bool
52+ leaseUpdated bool
53+ applyUpdateSinceUseExisting bool
54+ applyUpdateSinceExists bool
55+ applyUpdateSinceDedup bool
56+ retryWithUnavailable bool
57+ updateErrOnStart bool
58+ startErrOnStart bool
59+ workflowIsRunning bool
60+ startingWorkflow bool
61+ startOutcome startworkflow.StartOutcome
62+ existingWorkflowRunID string
63+ existingWorkflowState enumsspb.WorkflowExecutionState
64+ existingWorkflowStatus enumspb.WorkflowExecutionStatus
65+ existingWorkflowStartedAt * timestamppb.Timestamp
4666 }
4767)
4868
@@ -55,7 +75,7 @@ func Invoke(
5575 visibilityManager manager.VisibilityManager ,
5676 matchingClient matchingservice.MatchingServiceClient ,
5777 testHooks testhooks.TestHooks ,
58- ) (* historyservice.ExecuteMultiOperationResponse , error ) {
78+ ) (_ * historyservice.ExecuteMultiOperationResponse , retErr error ) {
5979 if len (req .Operations ) != 2 {
6080 return nil , serviceerror .NewInvalidArgument ("expected exactly 2 operations" )
6181 }
@@ -79,6 +99,39 @@ func Invoke(
7999 startReq : startReq ,
80100 }
81101
102+ // TODO(stephanos): remove again
103+ defer func () {
104+ if retErr != nil {
105+ if shardContext .GetConfig ().EnableExecuteMultiOperationErrorDebug (updateReq .GetRequest ().Namespace ) {
106+ shardContext .GetLogger ().Warn (
107+ "MultiOperation failed" ,
108+ tag .Error (retErr ),
109+ tag .ServiceErrorType (retErr ),
110+ tag .NewBoolTag ("leaseCreated" , mo .leaseCreated ),
111+ tag .NewBoolTag ("leaseUpdated" , mo .leaseUpdated ),
112+ tag .NewBoolTag ("applyUpdateSinceUseExisting" , mo .applyUpdateSinceUseExisting ),
113+ tag .NewBoolTag ("applyUpdateSinceExists" , mo .applyUpdateSinceExists ),
114+ tag .NewBoolTag ("applyUpdateSinceDedup" , mo .applyUpdateSinceDedup ),
115+ tag .NewBoolTag ("retryWithUnavailable" , mo .retryWithUnavailable ),
116+ tag .NewBoolTag ("updateErrOnStart" , mo .updateErrOnStart ),
117+ tag .NewBoolTag ("startErrOnStart" , mo .startErrOnStart ),
118+ tag .NewBoolTag ("workflowIsRunning" , mo .workflowIsRunning ),
119+ tag .NewBoolTag ("startingWorkflow" , mo .startingWorkflow ),
120+ tag .NewStringTag ("startOutcome" , mo .startOutcome .String ()),
121+ tag .NewStringTag ("existingWorkflowRunID" , mo .existingWorkflowRunID ),
122+ tag .NewStringTag ("existingWorkflowState" , mo .existingWorkflowState .String ()),
123+ tag .NewStringTag ("existingWorkflowStatus" , mo .existingWorkflowStatus .String ()),
124+ tag .NewTimePtrTag ("existingWorkflowStartedAt" , mo .existingWorkflowStartedAt ),
125+ tag .NewStringTag ("reqUpdateID" , updateReq .GetRequest ().GetRequest ().GetMeta ().GetUpdateId ()),
126+ tag .NewStringTag ("reqReusePolicy" , startReq .GetStartRequest ().GetWorkflowIdReusePolicy ().String ()),
127+ tag .NewStringTag ("reqConflictPolicy" , startReq .GetStartRequest ().GetWorkflowIdConflictPolicy ().String ()),
128+ tag .NewStringTag ("reqRunID" , updateReq .GetRequest ().GetWorkflowExecution ().GetRunId ()),
129+ tag .NewStringTag ("reqFirstExecutionRunId" , updateReq .GetRequest ().GetFirstExecutionRunId ()),
130+ tag .WorkflowID (req .WorkflowId ))
131+ }
132+ }
133+ }()
134+
82135 var err error
83136 mo .starter , err = startworkflow .NewStarter (
84137 shardContext ,
@@ -110,6 +163,10 @@ func (mo *multiOp) Invoke(ctx context.Context) (*historyservice.ExecuteMultiOper
110163
111164 // Workflow already exists.
112165 if workflowLease != nil {
166+ mo .existingWorkflowRunID = workflowLease .GetContext ().GetWorkflowKey ().RunID
167+ mo .existingWorkflowState = workflowLease .GetMutableState ().GetExecutionState ().GetState ()
168+ mo .existingWorkflowStatus = workflowLease .GetMutableState ().GetExecutionState ().GetStatus ()
169+ mo .existingWorkflowStartedAt = workflowLease .GetMutableState ().GetExecutionState ().GetStartTime ()
113170 updateID := mo .updateReq .Request .Request .Meta .GetUpdateId ()
114171
115172 // If Update is complete, return it.
@@ -128,18 +185,23 @@ func (mo *multiOp) Invoke(ctx context.Context) (*historyservice.ExecuteMultiOper
128185
129186 // If Workflow is running ...
130187 if workflowLease .GetMutableState ().IsWorkflowExecutionRunning () {
188+ mo .workflowIsRunning = true
189+
131190 // If Start is deduped, apply new/attach to update.
132191 if canDedup (mo .startReq , workflowLease ) {
192+ mo .applyUpdateSinceDedup = true
133193 return mo .updateWorkflow (ctx , workflowLease ) // lease released inside
134194 }
135195
136196 // If Update exists, attach to update.
137197 if upd := workflowLease .GetContext ().UpdateRegistry (ctx ).Find (ctx , updateID ); upd != nil {
198+ mo .applyUpdateSinceExists = true
138199 return mo .updateWorkflow (ctx , workflowLease ) // lease released inside
139200 }
140201
141202 // If conflict policy allows re-using the workflow, apply the update.
142203 if mo .startReq .StartRequest .WorkflowIdConflictPolicy == enumspb .WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING {
204+ mo .applyUpdateSinceUseExisting = true
143205 return mo .updateWorkflow (ctx , workflowLease ) // lease released inside
144206 }
145207 }
@@ -165,6 +227,8 @@ func (mo *multiOp) workflowLeaseCallback(
165227 var res api.WorkflowLease
166228
167229 if existingLease == nil {
230+ mo .leaseCreated = true
231+
168232 // Create a new *locked* workflow context. This is important since without the lock, task processing
169233 // would try to modify the mutable state concurrently. Once the Starter completes, it will release the lock.
170234 //
@@ -184,6 +248,8 @@ func (mo *multiOp) workflowLeaseCallback(
184248 }
185249 res = api .NewWorkflowLease (workflowContext , releaseFunc , ms )
186250 } else {
251+ mo .leaseUpdated = true
252+
187253 // TODO(stephanos): remove this hack
188254 // If the lease already exists, but the update needs to be re-applied since it was aborted due to a conflict.
189255 res = existingLease
@@ -277,16 +343,24 @@ func (mo *multiOp) updateWorkflow(
277343}
278344
279345func (mo * multiOp ) startAndUpdateWorkflow (ctx context.Context ) (* historyservice.ExecuteMultiOperationResponse , error ) {
346+ mo .startingWorkflow = true
347+
280348 startResp , startOutcome , err := mo .starter .Invoke (ctx )
349+ mo .startOutcome = startOutcome
281350 if err != nil {
282351 // An update error occurred.
283352 if errors .As (err , & updateError {}) {
353+ mo .updateErrOnStart = true
284354 return nil , newMultiOpError (multiOpAbortedErr , err )
285355 }
356+
286357 // A start error occurred.
358+ mo .startErrOnStart = true
287359 return nil , newMultiOpError (err , multiOpAbortedErr )
288360 }
289361 if startOutcome != startworkflow .StartNew {
362+ mo .retryWithUnavailable = true
363+
290364 // The workflow was meant to be started - but was actually *not* started.
291365 // The problem is that the update has not been applied.
292366 //
0 commit comments