Skip to content

Commit 49f1797

Browse files
authored
Implement post reset operations (#7719)
## What changed? This PR adds PostResetOperation support to reset API. With this we will be performing certain operations after reset. In this PR - Added support for performing workflow options update. - Updated WorkflowResetter interface. - Added functional test ## Why? Developers/Operators often want to perform certain operations soon after resetting a workflow. Ex: updating worker version, sending signal to workflow etc. These operations need to be performed atomically along with the reset operation. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [x] added new functional test(s)
1 parent cd00f83 commit 49f1797

File tree

9 files changed

+114
-5
lines changed

9 files changed

+114
-5
lines changed

service/history/api/reapplyevents/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ func Invoke(
138138
toReapplyEvents,
139139
nil,
140140
false, // allowResetWithPendingChildren
141+
nil,
141142
)
142143
switch err.(type) {
143144
case *serviceerror.InvalidArgument:

service/history/api/resetworkflow/api.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,20 @@ package resetworkflow
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/google/uuid"
78
enumspb "go.temporal.io/api/enums/v1"
89
"go.temporal.io/api/serviceerror"
10+
workflowpb "go.temporal.io/api/workflow/v1"
911
"go.temporal.io/server/api/historyservice/v1"
1012
"go.temporal.io/server/common"
1113
"go.temporal.io/server/common/definition"
1214
"go.temporal.io/server/common/locks"
1315
"go.temporal.io/server/common/log/tag"
1416
"go.temporal.io/server/common/namespace"
1517
"go.temporal.io/server/common/persistence/versionhistory"
18+
"go.temporal.io/server/common/worker_versioning"
1619
"go.temporal.io/server/service/history/api"
1720
historyi "go.temporal.io/server/service/history/interfaces"
1821
"go.temporal.io/server/service/history/ndc"
@@ -30,6 +33,10 @@ func Invoke(
3033
return nil, err
3134
}
3235

36+
if err := validatePostResetOperationInputs(resetRequest.ResetRequest.PostResetOperations); err != nil {
37+
return nil, err
38+
}
39+
3340
request := resetRequest.ResetRequest
3441
workflowID := request.WorkflowExecution.GetWorkflowId()
3542
baseRunID := request.WorkflowExecution.GetRunId()
@@ -151,6 +158,7 @@ func Invoke(
151158
nil,
152159
GetResetReapplyExcludeTypes(request.GetResetReapplyExcludeTypes(), request.GetResetReapplyType()),
153160
allowResetWithPendingChildren,
161+
resetRequest.ResetRequest.PostResetOperations,
154162
); err != nil {
155163
return nil, err
156164
}
@@ -186,3 +194,19 @@ func GetResetReapplyExcludeTypes(
186194
}
187195
return exclude
188196
}
197+
198+
// validatePostResetOperationInputs validates the optional post reset operation inputs.
199+
func validatePostResetOperationInputs(postResetOperations []*workflowpb.PostResetOperation) error {
200+
for _, operation := range postResetOperations {
201+
switch op := operation.GetVariant().(type) {
202+
case *workflowpb.PostResetOperation_UpdateWorkflowOptions_:
203+
opts := op.UpdateWorkflowOptions.GetWorkflowExecutionOptions()
204+
if err := worker_versioning.ValidateVersioningOverride(opts.GetVersioningOverride()); err != nil {
205+
return err
206+
}
207+
default:
208+
return serviceerror.NewInvalidArgument(fmt.Sprintf("unsupported post reset operation: %T", op))
209+
}
210+
}
211+
return nil
212+
}

service/history/history_engine_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5291,7 +5291,7 @@ func (s *engineSuite) TestReapplyEvents_ResetWorkflow() {
52915291
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
52925292
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
52935293
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
5294-
gomock.Any(),
5294+
gomock.Any(), gomock.Any(),
52955295
).Return(nil)
52965296

52975297
err = s.historyEngine.ReapplyEvents(

service/history/ndc/transaction_manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ func (r *transactionMgrImpl) backfillWorkflowEventsReapply(
326326
totalEvents,
327327
nil,
328328
false, // allowResetWithPendingChildren
329+
nil,
329330
)
330331
switch err.(type) {
331332
case *serviceerror.InvalidArgument:

service/history/ndc/transaction_manager_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,8 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Active_Closed
226226
workflowEvents.Events,
227227
nil,
228228
false, // allowResetWithPendingChildren
229+
nil, // post reset operations
230+
229231
).Return(nil)
230232

231233
s.mockExecutionMgr.EXPECT().GetCurrentExecution(gomock.Any(), &persistence.GetCurrentExecutionRequest{
@@ -307,6 +309,7 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Closed_ResetF
307309
workflowEvents.Events,
308310
nil,
309311
false, // allowResetWithPendingChildren
312+
nil, // post reset operations
310313
).Return(serviceerror.NewInvalidArgument("reset fail"))
311314

312315
s.mockExecutionMgr.EXPECT().GetCurrentExecution(gomock.Any(), &persistence.GetCurrentExecutionRequest{

service/history/ndc/workflow_resetter.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
enumspb "go.temporal.io/api/enums/v1"
1313
historypb "go.temporal.io/api/history/v1"
1414
"go.temporal.io/api/serviceerror"
15+
workflowpb "go.temporal.io/api/workflow/v1"
1516
"go.temporal.io/api/workflowservice/v1"
1617
"go.temporal.io/server/api/historyservice/v1"
1718
persistencespb "go.temporal.io/server/api/persistence/v1"
@@ -26,6 +27,7 @@ import (
2627
"go.temporal.io/server/common/namespace"
2728
"go.temporal.io/server/common/persistence"
2829
"go.temporal.io/server/common/util"
30+
"go.temporal.io/server/service/history/api/updateworkflowoptions"
2931
"go.temporal.io/server/service/history/consts"
3032
"go.temporal.io/server/service/history/hsm"
3133
historyi "go.temporal.io/server/service/history/interfaces"
@@ -64,6 +66,7 @@ type (
6466
additionalReapplyEvents []*historypb.HistoryEvent,
6567
resetReapplyExcludeTypes map[enumspb.ResetReapplyExcludeType]struct{},
6668
allowResetWithPendingChildren bool,
69+
postResetOperations []*workflowpb.PostResetOperation,
6770
) error
6871
}
6972

@@ -119,6 +122,7 @@ func (r *workflowResetterImpl) ResetWorkflow(
119122
additionalReapplyEvents []*historypb.HistoryEvent,
120123
resetReapplyExcludeTypes map[enumspb.ResetReapplyExcludeType]struct{},
121124
allowResetWithPendingChildren bool,
125+
postResetOperations []*workflowpb.PostResetOperation,
122126
) (retError error) {
123127

124128
namespaceEntry, err := r.namespaceRegistry.GetNamespaceByID(namespaceID)
@@ -225,6 +229,10 @@ func (r *workflowResetterImpl) ResetWorkflow(
225229
return err
226230
}
227231

232+
if err := r.performPostResetOperations(ctx, resetMS, postResetOperations); err != nil {
233+
return err
234+
}
235+
228236
if err := workflow.ScheduleWorkflowTask(resetMS); err != nil {
229237
return err
230238
}
@@ -1136,3 +1144,17 @@ func (r *workflowResetterImpl) shouldExcludeAllReapplyEvents(excludeTypes map[en
11361144
}
11371145
return true
11381146
}
1147+
1148+
// performPostResetOperations performs the optional post reset operations on the reset workflow.
1149+
func (r *workflowResetterImpl) performPostResetOperations(ctx context.Context, resetMS historyi.MutableState, postResetOperations []*workflowpb.PostResetOperation) error {
1150+
for _, operation := range postResetOperations {
1151+
switch op := operation.GetVariant().(type) {
1152+
case *workflowpb.PostResetOperation_UpdateWorkflowOptions_:
1153+
_, _, err := updateworkflowoptions.MergeAndApply(resetMS, op.UpdateWorkflowOptions.GetWorkflowExecutionOptions(), op.UpdateWorkflowOptions.GetUpdateMask())
1154+
if err != nil {
1155+
return err
1156+
}
1157+
}
1158+
}
1159+
return nil
1160+
}

service/history/ndc/workflow_resetter_mock.go

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/transfer_queue_active_task_executor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1615,6 +1615,7 @@ func (t *transferQueueActiveTaskExecutor) resetWorkflow(
16151615
nil,
16161616
nil,
16171617
allowResetWithPendingChildren,
1618+
nil,
16181619
)
16191620

16201621
switch err.(type) {

tests/workflow_reset_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,16 @@ import (
1010
commandpb "go.temporal.io/api/command/v1"
1111
commonpb "go.temporal.io/api/common/v1"
1212
enumspb "go.temporal.io/api/enums/v1"
13+
historypb "go.temporal.io/api/history/v1"
1314
taskqueuepb "go.temporal.io/api/taskqueue/v1"
15+
workflowpb "go.temporal.io/api/workflow/v1"
1416
"go.temporal.io/api/workflowservice/v1"
1517
"go.temporal.io/sdk/client"
1618
"go.temporal.io/server/api/adminservice/v1"
1719
"go.temporal.io/server/common/payloads"
1820
"go.temporal.io/server/tests/testcore"
1921
"google.golang.org/protobuf/types/known/durationpb"
22+
"google.golang.org/protobuf/types/known/fieldmaskpb"
2023
)
2124

2225
// Tests workflow reset feature
@@ -193,6 +196,59 @@ func (s *WorkflowResetSuite) TestOriginalExecutionRunId() {
193196
}
194197
}
195198

199+
// Test that the workflow options are updated when the workflow is reset.
200+
func (s *WorkflowResetSuite) TestResetWorkflowWithOptionsUpdate() {
201+
workflowID := "test-reset" + uuid.NewString()
202+
ctx := testcore.NewContext()
203+
runs := s.setupRuns(ctx, workflowID, 1, true)
204+
currentRunID := runs[0]
205+
206+
// Reset the workflow by providing the explicit runID (base run) to reset.
207+
resp, err := s.FrontendClient().ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{
208+
Namespace: s.Namespace().String(),
209+
WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: workflowID, RunId: currentRunID},
210+
Reason: "testing-reset",
211+
RequestId: uuid.NewString(),
212+
WorkflowTaskFinishEventId: s.getFirstWFTaskCompleteEventID(ctx, workflowID, currentRunID),
213+
PostResetOperations: []*workflowpb.PostResetOperation{
214+
{
215+
Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{
216+
UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{
217+
WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{
218+
VersioningOverride: &workflowpb.VersioningOverride{
219+
Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED,
220+
PinnedVersion: "testing.v.123",
221+
},
222+
},
223+
UpdateMask: &fieldmaskpb.FieldMask{
224+
Paths: []string{
225+
"versioning_override",
226+
},
227+
},
228+
},
229+
},
230+
},
231+
},
232+
})
233+
s.NoError(err)
234+
newRunID := resp.RunId
235+
236+
// assert that the new run has the updated workflow options
237+
var optionsUpdatedEvent *historypb.HistoryEvent
238+
hist := s.SdkClient().GetWorkflowHistory(ctx, workflowID, newRunID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
239+
for hist.HasNext() {
240+
event, err := hist.Next()
241+
s.NoError(err)
242+
if event.EventType == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED {
243+
optionsUpdatedEvent = event
244+
break
245+
}
246+
}
247+
s.NotNil(optionsUpdatedEvent)
248+
s.Equal(optionsUpdatedEvent.GetWorkflowExecutionOptionsUpdatedEventAttributes().GetVersioningOverride().GetBehavior(), enumspb.VERSIONING_BEHAVIOR_PINNED)
249+
s.Equal(optionsUpdatedEvent.GetWorkflowExecutionOptionsUpdatedEventAttributes().GetVersioningOverride().GetPinnedVersion(), "testing.v.123")
250+
}
251+
196252
// Helper methods
197253

198254
// getFirstWFTaskCompleteEventID finds the first event corresponding to workflow task completion. This can be used as a good reset point for tests in this suite.

0 commit comments

Comments
 (0)