Skip to content

Commit d252656

Browse files
authored
Move remove signal from mutable state to api package (#3475)
1 parent 740c7a3 commit d252656

File tree

7 files changed

+94
-44
lines changed

7 files changed

+94
-44
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package removesignalmutablestate
26+
27+
import (
28+
"context"
29+
30+
"go.temporal.io/server/api/historyservice/v1"
31+
"go.temporal.io/server/common/definition"
32+
"go.temporal.io/server/common/namespace"
33+
"go.temporal.io/server/service/history/api"
34+
"go.temporal.io/server/service/history/consts"
35+
"go.temporal.io/server/service/history/shard"
36+
)
37+
38+
func Invoke(
39+
ctx context.Context,
40+
req *historyservice.RemoveSignalMutableStateRequest,
41+
shard shard.Context,
42+
workflowConsistencyChecker api.WorkflowConsistencyChecker,
43+
) (resp *historyservice.RemoveSignalMutableStateResponse, retError error) {
44+
_, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()))
45+
if err != nil {
46+
return nil, err
47+
}
48+
49+
err = api.GetAndUpdateWorkflowWithNew(
50+
ctx,
51+
nil,
52+
api.BypassMutableStateConsistencyPredicate,
53+
definition.NewWorkflowKey(
54+
req.NamespaceId,
55+
req.WorkflowExecution.WorkflowId,
56+
req.WorkflowExecution.RunId,
57+
),
58+
func(workflowContext api.WorkflowContext) (*api.UpdateWorkflowAction, error) {
59+
mutableState := workflowContext.GetMutableState()
60+
if !mutableState.IsWorkflowExecutionRunning() {
61+
return nil, consts.ErrWorkflowCompleted
62+
}
63+
64+
mutableState.DeleteSignalRequested(req.GetRequestId())
65+
return &api.UpdateWorkflowAction{
66+
Noop: false,
67+
CreateWorkflowTask: false,
68+
}, nil
69+
},
70+
nil,
71+
shard,
72+
workflowConsistencyChecker,
73+
)
74+
if err != nil {
75+
return nil, err
76+
}
77+
return &historyservice.RemoveSignalMutableStateResponse{}, nil
78+
}

service/history/api/respondactivitytaskcanceled/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
2323
// THE SOFTWARE.
2424

25-
package respondactivitytaskcandeled
25+
package respondactivitytaskcanceled
2626

2727
import (
2828
"context"

service/history/handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -957,12 +957,12 @@ func (h *Handler) RemoveSignalMutableState(ctx context.Context, request *history
957957
return nil, h.convertError(err)
958958
}
959959

960-
err2 := engine.RemoveSignalMutableState(ctx, request)
960+
resp, err2 := engine.RemoveSignalMutableState(ctx, request)
961961
if err2 != nil {
962962
return nil, h.convertError(err2)
963963
}
964964

965-
return &historyservice.RemoveSignalMutableStateResponse{}, nil
965+
return resp, nil
966966
}
967967

968968
// TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event

service/history/historyEngine.go

Lines changed: 6 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,13 @@ import (
6060
"go.temporal.io/server/service/history/api/recordactivitytaskheartbeat"
6161
"go.temporal.io/server/service/history/api/recordactivitytaskstarted"
6262
"go.temporal.io/server/service/history/api/recordchildworkflowcompleted"
63+
"go.temporal.io/server/service/history/api/removesignalmutablestate"
6364
replicationapi "go.temporal.io/server/service/history/api/replication"
6465
"go.temporal.io/server/service/history/api/replicationadmin"
6566
"go.temporal.io/server/service/history/api/requestcancelworkflow"
6667
"go.temporal.io/server/service/history/api/resetstickytaskqueue"
6768
"go.temporal.io/server/service/history/api/resetworkflow"
68-
respondactivitytaskcandeled "go.temporal.io/server/service/history/api/respondactivitytaskcanceled"
69+
"go.temporal.io/server/service/history/api/respondactivitytaskcanceled"
6970
"go.temporal.io/server/service/history/api/respondactivitytaskcompleted"
7071
"go.temporal.io/server/service/history/api/respondactivitytaskfailed"
7172
"go.temporal.io/server/service/history/api/signalwithstartworkflow"
@@ -821,7 +822,7 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled(
821822
ctx context.Context,
822823
req *historyservice.RespondActivityTaskCanceledRequest,
823824
) (*historyservice.RespondActivityTaskCanceledResponse, error) {
824-
return respondactivitytaskcandeled.Invoke(ctx, req, e.shard, e.workflowConsistencyChecker)
825+
return respondactivitytaskcanceled.Invoke(ctx, req, e.shard, e.workflowConsistencyChecker)
825826
}
826827

827828
// RecordActivityTaskHeartbeat records an hearbeat for a task.
@@ -869,39 +870,9 @@ func (h *historyEngineImpl) UpdateWorkflow(
869870
// RemoveSignalMutableState remove the signal request id in signal_requested for deduplicate
870871
func (e *historyEngineImpl) RemoveSignalMutableState(
871872
ctx context.Context,
872-
request *historyservice.RemoveSignalMutableStateRequest,
873-
) error {
874-
875-
_, err := e.getActiveNamespaceEntry(namespace.ID(request.GetNamespaceId()))
876-
if err != nil {
877-
return err
878-
}
879-
880-
return api.GetAndUpdateWorkflowWithNew(
881-
ctx,
882-
nil,
883-
api.BypassMutableStateConsistencyPredicate,
884-
definition.NewWorkflowKey(
885-
request.NamespaceId,
886-
request.WorkflowExecution.WorkflowId,
887-
request.WorkflowExecution.RunId,
888-
),
889-
func(workflowContext api.WorkflowContext) (*api.UpdateWorkflowAction, error) {
890-
mutableState := workflowContext.GetMutableState()
891-
if !mutableState.IsWorkflowExecutionRunning() {
892-
return nil, consts.ErrWorkflowCompleted
893-
}
894-
895-
mutableState.DeleteSignalRequested(request.GetRequestId())
896-
return &api.UpdateWorkflowAction{
897-
Noop: false,
898-
CreateWorkflowTask: false,
899-
}, nil
900-
},
901-
nil,
902-
e.shard,
903-
e.workflowConsistencyChecker,
904-
)
873+
req *historyservice.RemoveSignalMutableStateRequest,
874+
) (*historyservice.RemoveSignalMutableStateResponse, error) {
875+
return removesignalmutablestate.Invoke(ctx, req, e.shard, e.workflowConsistencyChecker)
905876
}
906877

907878
func (e *historyEngineImpl) TerminateWorkflowExecution(

service/history/historyEngine_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4893,7 +4893,7 @@ func (s *engineSuite) TestSignalWorkflowExecution_WorkflowTaskBackoff() {
48934893

48944894
func (s *engineSuite) TestRemoveSignalMutableState() {
48954895
removeRequest := &historyservice.RemoveSignalMutableStateRequest{}
4896-
err := s.mockHistoryEngine.RemoveSignalMutableState(context.Background(), removeRequest)
4896+
_, err := s.mockHistoryEngine.RemoveSignalMutableState(context.Background(), removeRequest)
48974897
s.EqualError(err, "Missing namespace UUID.")
48984898

48994899
execution := commonpb.WorkflowExecution{
@@ -4920,7 +4920,7 @@ func (s *engineSuite) TestRemoveSignalMutableState() {
49204920
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil)
49214921
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil)
49224922

4923-
err = s.mockHistoryEngine.RemoveSignalMutableState(context.Background(), removeRequest)
4923+
_, err = s.mockHistoryEngine.RemoveSignalMutableState(context.Background(), removeRequest)
49244924
s.Nil(err)
49254925
}
49264926

service/history/shard/engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type (
6363
RequestCancelWorkflowExecution(ctx context.Context, request *historyservice.RequestCancelWorkflowExecutionRequest) (*historyservice.RequestCancelWorkflowExecutionResponse, error)
6464
SignalWorkflowExecution(ctx context.Context, request *historyservice.SignalWorkflowExecutionRequest) (*historyservice.SignalWorkflowExecutionResponse, error)
6565
SignalWithStartWorkflowExecution(ctx context.Context, request *historyservice.SignalWithStartWorkflowExecutionRequest) (*historyservice.SignalWithStartWorkflowExecutionResponse, error)
66-
RemoveSignalMutableState(ctx context.Context, request *historyservice.RemoveSignalMutableStateRequest) error
66+
RemoveSignalMutableState(ctx context.Context, request *historyservice.RemoveSignalMutableStateRequest) (*historyservice.RemoveSignalMutableStateResponse, error)
6767
TerminateWorkflowExecution(ctx context.Context, request *historyservice.TerminateWorkflowExecutionRequest) (*historyservice.TerminateWorkflowExecutionResponse, error)
6868
DeleteWorkflowExecution(ctx context.Context, deleteRequest *historyservice.DeleteWorkflowExecutionRequest) error
6969
ResetWorkflowExecution(ctx context.Context, request *historyservice.ResetWorkflowExecutionRequest) (*historyservice.ResetWorkflowExecutionResponse, error)

service/history/shard/engine_mock.go

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)