Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions service/history/api/consistency_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type (
reqClock *clockspb.VectorClock,
consistencyPredicate MutableStateConsistencyPredicate,
workflowKey definition.WorkflowKey,
lockPriority workflow.LockPriority,
) (WorkflowContext, error)
}

Expand Down Expand Up @@ -108,6 +109,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext(
reqClock *clockspb.VectorClock,
consistencyPredicate MutableStateConsistencyPredicate,
workflowKey definition.WorkflowKey,
lockPriority workflow.LockPriority,
) (WorkflowContext, error) {
if reqClock != nil {
currentClock := c.shardContext.CurrentVectorClock()
Expand All @@ -117,6 +119,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext(
reqClock,
currentClock,
workflowKey,
lockPriority,
)
}
// request vector clock cannot is not comparable with current shard vector clock
Expand All @@ -133,6 +136,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext(
&shardOwnershipAsserted,
consistencyPredicate,
workflowKey,
lockPriority,
)
}
return c.getCurrentWorkflowContext(
Expand All @@ -141,6 +145,7 @@ func (c *WorkflowConsistencyCheckerImpl) GetWorkflowContext(
consistencyPredicate,
workflowKey.NamespaceID,
workflowKey.WorkflowID,
lockPriority,
)
}

Expand All @@ -149,6 +154,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock(
reqClock *clockspb.VectorClock,
currentClock *clockspb.VectorClock,
workflowKey definition.WorkflowKey,
lockPriority workflow.LockPriority,
) (WorkflowContext, error) {
cmpResult, err := vclock.Compare(reqClock, currentClock)
if err != nil {
Expand All @@ -170,7 +176,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByClock(
WorkflowId: workflowKey.WorkflowID,
RunId: workflowKey.RunID,
},
workflow.CallerTypeAPI,
lockPriority,
)
if err != nil {
return nil, err
Expand All @@ -189,6 +195,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByCheck(
shardOwnershipAsserted *bool,
consistencyPredicate MutableStateConsistencyPredicate,
workflowKey definition.WorkflowKey,
lockPriority workflow.LockPriority,
) (WorkflowContext, error) {
if len(workflowKey.RunID) == 0 {
return nil, serviceerror.NewInternal(fmt.Sprintf(
Expand All @@ -203,7 +210,7 @@ func (c *WorkflowConsistencyCheckerImpl) getWorkflowContextValidatedByCheck(
WorkflowId: workflowKey.WorkflowID,
RunId: workflowKey.RunID,
},
workflow.CallerTypeAPI,
lockPriority,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -245,6 +252,7 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext(
consistencyPredicate MutableStateConsistencyPredicate,
namespaceID string,
workflowID string,
lockPriority workflow.LockPriority,
) (WorkflowContext, error) {
runID, err := c.getCurrentRunID(
ctx,
Expand All @@ -260,6 +268,7 @@ func (c *WorkflowConsistencyCheckerImpl) getCurrentWorkflowContext(
shardOwnershipAsserted,
consistencyPredicate,
definition.NewWorkflowKey(namespaceID, workflowID, runID),
lockPriority,
)
if err != nil {
return nil, err
Expand Down
15 changes: 10 additions & 5 deletions service/history/api/consistency_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
WorkflowId: s.workflowID,
RunId: s.currentRunID,
},
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
wfContext.EXPECT().LoadMutableState(ctx).Return(mutableState, nil)

Expand All @@ -124,6 +124,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
&shardOwnershipAsserted,
BypassMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.NoError(err)
s.Equal(mutableState, workflowContext.GetMutableState())
Expand All @@ -147,7 +148,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
WorkflowId: s.workflowID,
RunId: s.currentRunID,
},
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
gomock.InOrder(
wfContext.EXPECT().LoadMutableState(ctx).Return(mutableState1, nil),
Expand All @@ -160,6 +161,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
&shardOwnershipAsserted,
FailMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.NoError(err)
s.Equal(mutableState2, workflowContext.GetMutableState())
Expand All @@ -181,7 +183,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
WorkflowId: s.workflowID,
RunId: s.currentRunID,
},
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewNotFound(""))

Expand All @@ -192,6 +194,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
&shardOwnershipAsserted,
FailMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.IsType(&serviceerror.NotFound{}, err)
s.Nil(workflowContext)
Expand All @@ -213,7 +216,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
WorkflowId: s.workflowID,
RunId: s.currentRunID,
},
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewNotFound(""))

Expand All @@ -224,6 +227,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
&shardOwnershipAsserted,
FailMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.IsType(&persistence.ShardOwnershipLostError{}, err)
s.Nil(workflowContext)
Expand All @@ -245,7 +249,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
WorkflowId: s.workflowID,
RunId: s.currentRunID,
},
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
).Return(wfContext, releaseFn, nil)
wfContext.EXPECT().LoadMutableState(ctx).Return(nil, serviceerror.NewUnavailable(""))

Expand All @@ -254,6 +258,7 @@ func (s *workflowConsistencyCheckerSuite) TestGetWorkflowContextValidatedByCheck
&shardOwnershipAsserted,
FailMutableStateConsistencyPredicate,
definition.NewWorkflowKey(s.namespaceID, s.workflowID, s.currentRunID),
workflow.LockPriorityHigh,
)
s.IsType(&serviceerror.Unavailable{}, err)
s.Nil(workflowContext)
Expand Down
1 change: 1 addition & 0 deletions service/history/api/deleteworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func Invoke(
request.WorkflowExecution.WorkflowId,
request.WorkflowExecution.RunId,
),
workflow.LockPriorityLow,
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/describemutablestate/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func Invoke(
req.Execution.WorkflowId,
req.Execution.RunId,
),
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/describeworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func Invoke(
req.Request.Execution.WorkflowId,
req.Request.Execution.RunId,
),
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/get_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func GetMutableState(
nil,
BypassMutableStateConsistencyPredicate,
workflowKey,
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/queryworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func Invoke(
nil,
api.BypassMutableStateConsistencyPredicate,
workflowKey,
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/refreshworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func Invoke(
nil,
api.BypassMutableStateConsistencyPredicate,
workflowKey,
workflow.LockPriorityLow,
)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions service/history/api/replication/generate_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
)

func GenerateTask(
Expand All @@ -57,6 +58,7 @@ func GenerateTask(
request.Execution.WorkflowId,
request.Execution.RunId,
),
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions service/history/api/resetworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/ndc"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)

func Invoke(
Expand Down Expand Up @@ -66,6 +67,7 @@ func Invoke(
workflowID,
baseRunID,
),
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -104,6 +106,7 @@ func Invoke(
workflowID,
currentRunID,
),
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions service/history/api/signalwithstartworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)

func Invoke(
Expand All @@ -59,6 +60,7 @@ func Invoke(
signalWithStartRequest.SignalWithStartRequest.WorkflowId,
"",
),
workflow.LockPriorityHigh,
)
switch err.(type) {
case nil:
Expand Down
4 changes: 2 additions & 2 deletions service/history/api/startworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (s *Starter) lockCurrentWorkflowExecution(
ctx,
s.namespace.ID(),
s.request.StartRequest.WorkflowId,
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -437,7 +437,7 @@ func (s *Starter) getMutableStateInfo(ctx context.Context, runID string) (*mutab
ctx,
s.namespace.ID(),
commonpb.WorkflowExecution{WorkflowId: s.request.StartRequest.WorkflowId, RunId: runID},
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/update_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func GetAndUpdateWorkflowWithNew(
reqClock,
consistencyCheckFn,
workflowKey,
workflow.LockPriorityHigh,
)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/updateworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func Invoke(
req.Request.WorkflowExecution.WorkflowId,
req.Request.WorkflowExecution.RunId,
),
workflow.LockPriorityHigh,
)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)

func Invoke(
Expand All @@ -61,6 +62,7 @@ func Invoke(
request.ParentExecution.WorkflowId,
request.ParentExecution.RunId,
),
workflow.LockPriorityLow,
)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func (s *engine2Suite) TestRecordWorkflowTaskStartedSuccess() {
metrics.AddMetricsContext(context.Background()),
tests.NamespaceID,
workflowExecution,
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
s.NoError(err)
loadedMS, err := ctx.LoadMutableState(context.Background())
Expand Down Expand Up @@ -1946,7 +1946,7 @@ func (s *engine2Suite) getMutableState(namespaceID namespace.ID, we commonpb.Wor
metrics.AddMetricsContext(context.Background()),
namespaceID,
we,
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
if err != nil {
return nil
Expand Down
6 changes: 3 additions & 3 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ func (s *engineSuite) TestQueryWorkflow_ConsistentQueryBufferFull() {
context.Background(),
tests.NamespaceID,
execution,
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
s.NoError(err)
loadedMS, err := ctx.LoadMutableState(context.Background())
Expand Down Expand Up @@ -4365,7 +4365,7 @@ func (s *engineSuite) TestRequestCancel_RespondWorkflowTaskCompleted_SuccessWith
context.Background(),
tests.NamespaceID,
we,
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
s.NoError(err)
loadedMS, err := ctx.LoadMutableState(context.Background())
Expand Down Expand Up @@ -5230,7 +5230,7 @@ func (s *engineSuite) getMutableState(testNamespaceID namespace.ID, we commonpb.
context.Background(),
tests.NamespaceID,
we,
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
if err != nil {
return nil
Expand Down
2 changes: 1 addition & 1 deletion service/history/ndc/activity_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (r *ActivityReplicatorImpl) SyncActivity(
ctx,
namespaceID,
execution,
workflow.CallerTypeAPI,
workflow.LockPriorityHigh,
)
if err != nil {
// for get workflow execution context, with valid run id
Expand Down
Loading