Skip to content

Commit b0b3ea6

Browse files
committed
Validation API should use low priority lock (#4140)
* Use low priority lock for VerifyFirstWorkflowTaskScheduled API * Use low priority lock for VerifyChildExecutionCompletionRecorded API * Rename caller type to lock priority since some internal API use "low priority lock"
1 parent 55f1589 commit b0b3ea6

36 files changed

+125
-90
lines changed

service/history/api/consistency_checker.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type (
5959
reqClock *clockspb.VectorClock,
6060
consistencyPredicate MutableStateConsistencyPredicate,
6161
workflowKey definition.WorkflowKey,
62+
lockPriority workflow.LockPriority,
6263
) (WorkflowContext, error)
6364
}
6465

@@ -108,6 +109,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext(
108109
reqClock *clockspb.VectorClock,
109110
consistencyPredicate MutableStateConsistencyPredicate,
110111
workflowKey definition.WorkflowKey,
112+
lockPriority workflow.LockPriority,
111113
) (WorkflowContext, error) {
112114
if reqClock != nil {
113115
currentClock := c.shardContext.CurrentVectorClock()
@@ -117,6 +119,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext(
117119
reqClock,
118120
currentClock,
119121
workflowKey,
122+
lockPriority,
120123
)
121124
}
122125
// request vector clock cannot is not comparable with current shard vector clock
@@ -133,6 +136,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext(
133136
&shardOwnershipAsserted,
134137
consistencyPredicate,
135138
workflowKey,
139+
lockPriority,
136140
)
137141
}
138142
return c.getCurrentWorkflowContext(
@@ -141,6 +145,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext(
141145
consistencyPredicate,
142146
workflowKey.NamespaceID,
143147
workflowKey.WorkflowID,
148+
lockPriority,
144149
)
145150
}
146151

@@ -149,6 +154,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock(
149154
reqClock *clockspb.VectorClock,
150155
currentClock *clockspb.VectorClock,
151156
workflowKey definition.WorkflowKey,
157+
lockPriority workflow.LockPriority,
152158
) (WorkflowContext, error) {
153159
cmpResult, err := vclock.Compare(reqClock, currentClock)
154160
if err != nil {
@@ -170,7 +176,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock(
170176
WorkflowId: workflowKey.WorkflowID,
171177
RunId: workflowKey.RunID,
172178
},
173-
workflow.CallerTypeAPI,
179+
lockPriority,
174180
)
175181
if err != nil {
176182
return nil, err
@@ -189,6 +195,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByCheck(
189195
shardOwnershipAsserted *bool,
190196
consistencyPredicate MutableStateConsistencyPredicate,
191197
workflowKey definition.WorkflowKey,
198+
lockPriority workflow.LockPriority,
192199
) (WorkflowContext, error) {
193200
if len(workflowKey.RunID) == 0 {
194201
return nil, serviceerror.NewInternal(fmt.Sprintf(
@@ -203,7 +210,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByCheck(
203210
WorkflowId: workflowKey.WorkflowID,
204211
RunId: workflowKey.RunID,
205212
},
206-
workflow.CallerTypeAPI,
213+
lockPriority,
207214
)
208215
if err != nil {
209216
return nil, err
@@ -245,6 +252,7 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext(
245252
consistencyPredicate MutableStateConsistencyPredicate,
246253
namespaceID string,
247254
workflowID string,
255+
lockPriority workflow.LockPriority,
248256
) (WorkflowContext, error) {
249257
runID, err := c.getCurrentRunID(
250258
ctx,
@@ -260,6 +268,7 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext(
260268
shardOwnershipAsserted,
261269
consistencyPredicate,
262270
definition.NewWorkflowKey(namespaceID, workflowID, runID),
271+
lockPriority,
263272
)
264273
if err != nil {
265274
return nil, err

service/history/api/consistency_checker_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
115115
WorkflowId: s.workflowID,
116116
RunId: s.currentRunID,
117117
},
118-
workflow.CallerTypeAPI,
118+
workflow.LockPriorityHigh,
119119
).Return(wfContext, releaseFn, nil)
120120
wfContext.EXPECT().LoadMutableState(ctx).Return(mutableState, nil)
121121

@@ -124,6 +124,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
124124
&shardOwnershipAsserted,
125125
BypassMutableStateConsistencyPredicate,
126126
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
127+
workflow.LockPriorityHigh,
127128
)
128129
s.NoError(err)
129130
s.Equal(mutableState, workflowContext.GetMutableState())
@@ -147,7 +148,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
147148
WorkflowId: s.workflowID,
148149
RunId: s.currentRunID,
149150
},
150-
workflow.CallerTypeAPI,
151+
workflow.LockPriorityHigh,
151152
).Return(wfContext, releaseFn, nil)
152153
gomock.InOrder(
153154
wfContext.EXPECT().LoadMutableState(ctx).Return(mutableState1, nil),
@@ -160,6 +161,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
160161
&shardOwnershipAsserted,
161162
FailMutableStateConsistencyPredicate,
162163
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
164+
workflow.LockPriorityHigh,
163165
)
164166
s.NoError(err)
165167
s.Equal(mutableState2, workflowContext.GetMutableState())
@@ -181,7 +183,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
181183
WorkflowId: s.workflowID,
182184
RunId: s.currentRunID,
183185
},
184-
workflow.CallerTypeAPI,
186+
workflow.LockPriorityHigh,
185187
).Return(wfContext, releaseFn, nil)
186188
wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewNotFound(""))
187189

@@ -192,6 +194,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
192194
&shardOwnershipAsserted,
193195
FailMutableStateConsistencyPredicate,
194196
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
197+
workflow.LockPriorityHigh,
195198
)
196199
s.IsType(&serviceerror.NotFound{}, err)
197200
s.Nil(workflowContext)
@@ -213,7 +216,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
213216
WorkflowId: s.workflowID,
214217
RunId: s.currentRunID,
215218
},
216-
workflow.CallerTypeAPI,
219+
workflow.LockPriorityHigh,
217220
).Return(wfContext, releaseFn, nil)
218221
wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewNotFound(""))
219222

@@ -224,6 +227,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
224227
&shardOwnershipAsserted,
225228
FailMutableStateConsistencyPredicate,
226229
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
230+
workflow.LockPriorityHigh,
227231
)
228232
s.IsType(&persistence.ShardOwnershipLostError{}, err)
229233
s.Nil(workflowContext)
@@ -245,7 +249,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
245249
WorkflowId: s.workflowID,
246250
RunId: s.currentRunID,
247251
},
248-
workflow.CallerTypeAPI,
252+
workflow.LockPriorityHigh,
249253
).Return(wfContext, releaseFn, nil)
250254
wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewUnavailable(""))
251255

@@ -254,6 +258,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
254258
&shardOwnershipAsserted,
255259
FailMutableStateConsistencyPredicate,
256260
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
261+
workflow.LockPriorityHigh,
257262
)
258263
s.IsType(&serviceerror.Unavailable{}, err)
259264
s.Nil(workflowContext)

service/history/api/deleteworkflow/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func Invoke(
5555
request.WorkflowExecution.WorkflowId,
5656
request.WorkflowExecution.RunId,
5757
),
58+
workflow.LockPriorityLow,
5859
)
5960
if err != nil {
6061
return nil, err

service/history/api/describemutablestate/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func Invoke(
5656
req.Execution.WorkflowId,
5757
req.Execution.RunId,
5858
),
59+
workflow.LockPriorityHigh,
5960
)
6061
if err != nil {
6162
return nil, err

service/history/api/describeworkflow/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ func Invoke(
6868
req.Request.Execution.WorkflowId,
6969
req.Request.Execution.RunId,
7070
),
71+
workflow.LockPriorityHigh,
7172
)
7273
if err != nil {
7374
return nil, err

service/history/api/get_workflow_util.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ func GetMutableState(
162162
nil,
163163
BypassMutableStateConsistencyPredicate,
164164
workflowKey,
165+
workflow.LockPriorityHigh,
165166
)
166167
if err != nil {
167168
return nil, err

service/history/api/queryworkflow/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func Invoke(
8686
nil,
8787
api.BypassMutableStateConsistencyPredicate,
8888
workflowKey,
89+
workflow.LockPriorityHigh,
8990
)
9091
if err != nil {
9192
return nil, err

service/history/api/refreshworkflow/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func Invoke(
5151
nil,
5252
api.BypassMutableStateConsistencyPredicate,
5353
workflowKey,
54+
workflow.LockPriorityLow,
5455
)
5556
if err != nil {
5657
return err

service/history/api/replication/generate_task.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"go.temporal.io/server/service/history/api"
3535
"go.temporal.io/server/service/history/shard"
3636
"go.temporal.io/server/service/history/tasks"
37+
"go.temporal.io/server/service/history/workflow"
3738
)
3839

3940
func GenerateTask(
@@ -57,6 +58,7 @@ func GenerateTask(
5758
request.Execution.WorkflowId,
5859
request.Execution.RunId,
5960
),
61+
workflow.LockPriorityHigh,
6062
)
6163
if err != nil {
6264
return nil, err

service/history/api/resetworkflow/api.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"go.temporal.io/server/service/history/api"
4040
"go.temporal.io/server/service/history/ndc"
4141
"go.temporal.io/server/service/history/shard"
42+
"go.temporal.io/server/service/history/workflow"
4243
)
4344

4445
func Invoke(
@@ -66,6 +67,7 @@ func Invoke(
6667
workflowID,
6768
baseRunID,
6869
),
70+
workflow.LockPriorityHigh,
6971
)
7072
if err != nil {
7173
return nil, err
@@ -104,6 +106,7 @@ func Invoke(
104106
workflowID,
105107
currentRunID,
106108
),
109+
workflow.LockPriorityHigh,
107110
)
108111
if err != nil {
109112
return nil, err

0 commit comments

Comments
 (0)