Skip to content

Commit b292c22

Browse files
authored
Move delete workflow to api package (#3473)
1 parent d252656 commit b292c22

File tree

5 files changed

+135
-80
lines changed

5 files changed

+135
-80
lines changed
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package deleteworkflow
26+
27+
import (
28+
"context"
29+
30+
commonpb "go.temporal.io/api/common/v1"
31+
32+
"go.temporal.io/server/api/historyservice/v1"
33+
"go.temporal.io/server/common/definition"
34+
"go.temporal.io/server/common/namespace"
35+
"go.temporal.io/server/service/history/api"
36+
"go.temporal.io/server/service/history/consts"
37+
"go.temporal.io/server/service/history/shard"
38+
"go.temporal.io/server/service/history/workflow"
39+
)
40+
41+
func Invoke(
42+
ctx context.Context,
43+
request *historyservice.DeleteWorkflowExecutionRequest,
44+
shard shard.Context,
45+
workflowConsistencyChecker api.WorkflowConsistencyChecker,
46+
workflowDeleteManager workflow.DeleteManager,
47+
) (_ *historyservice.DeleteWorkflowExecutionResponse, retError error) {
48+
weCtx, err := workflowConsistencyChecker.GetWorkflowContext(
49+
ctx,
50+
nil,
51+
api.BypassMutableStateConsistencyPredicate,
52+
definition.NewWorkflowKey(
53+
request.NamespaceId,
54+
request.WorkflowExecution.WorkflowId,
55+
request.WorkflowExecution.RunId,
56+
),
57+
)
58+
if err != nil {
59+
return nil, err
60+
}
61+
defer func() { weCtx.GetReleaseFn()(retError) }()
62+
63+
// Open and Close workflow executions are deleted differently.
64+
// Open workflow execution is deleted by terminating with special flag `deleteAfterTerminate` set to true.
65+
// This flag will be carried over with CloseExecutionTask and workflow will be deleted as the last step while processing the task.
66+
//
67+
// Close workflow execution is deleted using DeleteExecutionTask.
68+
//
69+
// DeleteWorkflowExecution is not replicated automatically. Workflow executions must be deleted separately in each cluster.
70+
// Although running workflows in active cluster are terminated first and the termination event might be replicated.
71+
// In passive cluster, workflow executions are just deleted in regardless of its state.
72+
73+
if weCtx.GetMutableState().IsWorkflowExecutionRunning() {
74+
if request.GetClosedWorkflowOnly() {
75+
// skip delete open workflow
76+
return &historyservice.DeleteWorkflowExecutionResponse{}, nil
77+
}
78+
ns, err := shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(request.GetNamespaceId()))
79+
if err != nil {
80+
return nil, err
81+
}
82+
if ns.ActiveInCluster(shard.GetClusterMetadata().GetCurrentClusterName()) {
83+
// If workflow execution is running and in active cluster.
84+
if err := api.UpdateWorkflowWithNew(
85+
shard,
86+
ctx,
87+
weCtx,
88+
func(workflowContext api.WorkflowContext) (*api.UpdateWorkflowAction, error) {
89+
mutableState := workflowContext.GetMutableState()
90+
eventBatchFirstEventID := mutableState.GetNextEventID()
91+
92+
return api.UpdateWorkflowWithoutWorkflowTask, workflow.TerminateWorkflow(
93+
mutableState,
94+
eventBatchFirstEventID,
95+
"Delete workflow execution",
96+
nil,
97+
consts.IdentityHistoryService,
98+
true,
99+
)
100+
},
101+
nil,
102+
); err != nil {
103+
return nil, err
104+
}
105+
return &historyservice.DeleteWorkflowExecutionResponse{}, nil
106+
}
107+
}
108+
109+
// If workflow execution is closed or in passive cluster.
110+
if err := workflowDeleteManager.AddDeleteWorkflowExecutionTask(
111+
ctx,
112+
namespace.ID(request.GetNamespaceId()),
113+
commonpb.WorkflowExecution{
114+
WorkflowId: request.GetWorkflowExecution().GetWorkflowId(),
115+
RunId: request.GetWorkflowExecution().GetRunId(),
116+
},
117+
weCtx.GetMutableState(),
118+
request.GetWorkflowVersion(),
119+
); err != nil {
120+
return nil, err
121+
}
122+
return &historyservice.DeleteWorkflowExecutionResponse{}, nil
123+
}

service/history/handler.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,12 +1023,11 @@ func (h *Handler) DeleteWorkflowExecution(ctx context.Context, request *historys
10231023
return nil, h.convertError(err)
10241024
}
10251025

1026-
err2 := engine.DeleteWorkflowExecution(ctx, request)
1027-
if err2 != nil {
1028-
return nil, h.convertError(err2)
1026+
resp, err := engine.DeleteWorkflowExecution(ctx, request)
1027+
if err != nil {
1028+
return nil, h.convertError(err)
10291029
}
1030-
1031-
return &historyservice.DeleteWorkflowExecutionResponse{}, nil
1030+
return resp, nil
10321031
}
10331032

10341033
// ResetWorkflowExecution reset an existing workflow execution

service/history/historyEngine.go

Lines changed: 3 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ import (
5555
"go.temporal.io/server/common/sdk"
5656
"go.temporal.io/server/common/searchattribute"
5757
"go.temporal.io/server/service/history/api"
58+
"go.temporal.io/server/service/history/api/deleteworkflow"
5859
"go.temporal.io/server/service/history/api/describeworkflow"
5960
"go.temporal.io/server/service/history/api/reapplyevents"
6061
"go.temporal.io/server/service/history/api/recordactivitytaskheartbeat"
@@ -885,77 +886,8 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(
885886
func (e *historyEngineImpl) DeleteWorkflowExecution(
886887
ctx context.Context,
887888
request *historyservice.DeleteWorkflowExecutionRequest,
888-
) (retError error) {
889-
890-
weCtx, err := e.workflowConsistencyChecker.GetWorkflowContext(
891-
ctx,
892-
nil,
893-
api.BypassMutableStateConsistencyPredicate,
894-
definition.NewWorkflowKey(
895-
request.NamespaceId,
896-
request.WorkflowExecution.WorkflowId,
897-
request.WorkflowExecution.RunId,
898-
),
899-
)
900-
if err != nil {
901-
return err
902-
}
903-
defer func() { weCtx.GetReleaseFn()(retError) }()
904-
905-
// Open and Close workflow executions are deleted differently.
906-
// Open workflow execution is deleted by terminating with special flag `deleteAfterTerminate` set to true.
907-
// This flag will be carried over with CloseExecutionTask and workflow will be deleted as the last step while processing the task.
908-
//
909-
// Close workflow execution is deleted using DeleteExecutionTask.
910-
//
911-
// DeleteWorkflowExecution is not replicated automatically. Workflow executions must be deleted separately in each cluster.
912-
// Although running workflows in active cluster are terminated first and the termination event might be replicated.
913-
// In passive cluster, workflow executions are just deleted in regardless of its state.
914-
915-
if weCtx.GetMutableState().IsWorkflowExecutionRunning() {
916-
if request.GetClosedWorkflowOnly() {
917-
// skip delete open workflow
918-
return nil
919-
}
920-
ns, err := e.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(request.GetNamespaceId()))
921-
if err != nil {
922-
return err
923-
}
924-
if ns.ActiveInCluster(e.shard.GetClusterMetadata().GetCurrentClusterName()) {
925-
// If workflow execution is running and in active cluster.
926-
return api.UpdateWorkflowWithNew(
927-
e.shard,
928-
ctx,
929-
weCtx,
930-
func(workflowContext api.WorkflowContext) (*api.UpdateWorkflowAction, error) {
931-
mutableState := workflowContext.GetMutableState()
932-
eventBatchFirstEventID := mutableState.GetNextEventID()
933-
934-
return api.UpdateWorkflowWithoutWorkflowTask, workflow.TerminateWorkflow(
935-
mutableState,
936-
eventBatchFirstEventID,
937-
"Delete workflow execution",
938-
nil,
939-
consts.IdentityHistoryService,
940-
true,
941-
)
942-
},
943-
nil,
944-
)
945-
}
946-
}
947-
948-
// If workflow execution is closed or in passive cluster.
949-
return e.workflowDeleteManager.AddDeleteWorkflowExecutionTask(
950-
ctx,
951-
namespace.ID(request.GetNamespaceId()),
952-
commonpb.WorkflowExecution{
953-
WorkflowId: request.GetWorkflowExecution().GetWorkflowId(),
954-
RunId: request.GetWorkflowExecution().GetRunId(),
955-
},
956-
weCtx.GetMutableState(),
957-
request.GetWorkflowVersion(),
958-
)
889+
) (*historyservice.DeleteWorkflowExecutionResponse, error) {
890+
return deleteworkflow.Invoke(ctx, request, e.shard, e.workflowConsistencyChecker, e.workflowDeleteManager)
959891
}
960892

961893
// RecordChildExecutionCompleted records the completion of child execution into parent execution history

service/history/shard/engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ type (
6565
SignalWithStartWorkflowExecution(ctx context.Context, request *historyservice.SignalWithStartWorkflowExecutionRequest) (*historyservice.SignalWithStartWorkflowExecutionResponse, error)
6666
RemoveSignalMutableState(ctx context.Context, request *historyservice.RemoveSignalMutableStateRequest) (*historyservice.RemoveSignalMutableStateResponse, error)
6767
TerminateWorkflowExecution(ctx context.Context, request *historyservice.TerminateWorkflowExecutionRequest) (*historyservice.TerminateWorkflowExecutionResponse, error)
68-
DeleteWorkflowExecution(ctx context.Context, deleteRequest *historyservice.DeleteWorkflowExecutionRequest) error
68+
DeleteWorkflowExecution(ctx context.Context, deleteRequest *historyservice.DeleteWorkflowExecutionRequest) (*historyservice.DeleteWorkflowExecutionResponse, error)
6969
ResetWorkflowExecution(ctx context.Context, request *historyservice.ResetWorkflowExecutionRequest) (*historyservice.ResetWorkflowExecutionResponse, error)
7070
ScheduleWorkflowTask(ctx context.Context, request *historyservice.ScheduleWorkflowTaskRequest) error
7171
VerifyFirstWorkflowTaskScheduled(ctx context.Context, request *historyservice.VerifyFirstWorkflowTaskScheduledRequest) error

service/history/shard/engine_mock.go

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)