Skip to content

Commit f664025

Browse files
authored
Add option to ignore already started error (#227)
<!--- Note to EXTERNAL Contributors --> <!-- Thanks for opening a PR! If it is a significant code change, please **make sure there is an open issue** for this. We work best with you when we have accepted the idea first before you code. --> <!--- For ALL Contributors 👇 --> ## What was changed <!-- Describe what has changed in this PR --> WISOTT ## Why? <!-- Tell your future self why have you made these changes --> When running a scenario it can happen that transient failures lead to an incomplete scenario run and on retry the scenario will then fail because workflows were already created. It's not always possible here to use a new/clean task queue; in those cases, this new flag allows to skip over these errors. ## Checklist <!--- add/delete as needed ---> 1. Closes <!-- add issue number here --> 2. How was this tested: <!--- Please describe how you tested your changes/how we can test them --> 3. Any docs updates needed? <!--- update README if applicable or point out where to update docs.temporal.io -->
1 parent 17ecf2d commit f664025

File tree

3 files changed

+22
-29
lines changed

3 files changed

+22
-29
lines changed

cmd/cli/run_scenario.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type scenarioRunConfig struct {
5656
scenarioOptions []string
5757
timeout time.Duration
5858
doNotRegisterSearchAttributes bool
59+
ignoreAlreadyStarted bool
5960
}
6061

6162
func (r *scenarioRunner) addCLIFlags(fs *pflag.FlagSet) {
@@ -81,6 +82,8 @@ func (r *scenarioRunConfig) addCLIFlags(fs *pflag.FlagSet) {
8182
fs.BoolVar(&r.doNotRegisterSearchAttributes, "do-not-register-search-attributes", false,
8283
"Do not register the default search attributes used by scenarios. "+
8384
"If the search attributes are not registed by the scenario they must be registered through some other method")
85+
fs.BoolVar(&r.ignoreAlreadyStarted, "ignore-already-started", false,
86+
"Ignore if a workflow with the same ID already exists. A Scenario may choose to override this behavior.")
8487
}
8588

8689
func (r *scenarioRunner) preRun() {
@@ -156,6 +159,7 @@ func (r *scenarioRunner) run(ctx context.Context) error {
156159
MaxIterationAttempts: r.maxIterationAttempts,
157160
Timeout: r.timeout,
158161
DoNotRegisterSearchAttributes: r.doNotRegisterSearchAttributes,
162+
IgnoreAlreadyStarted: r.ignoreAlreadyStarted,
159163
},
160164
ScenarioOptions: scenarioOptions,
161165
Namespace: r.clientOptions.Namespace,

loadgen/scenario.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ type RunConfiguration struct {
201201
// cannot use the SDK to register SAs, instead the SAs must be registered through the control plane.
202202
// Default is false.
203203
DoNotRegisterSearchAttributes bool
204+
// IgnoreAlreadyStarted, if set, will not error when a workflow with the same ID already exists.
205+
// Default is false.
206+
IgnoreAlreadyStarted bool
204207
// OnCompletion, if set, is invoked after each successful iteration completes.
205208
OnCompletion func(context.Context, *Run)
206209
// HandleExecuteError, if set, is called when Execute returns an error, allowing transformation of errors.
@@ -305,7 +308,7 @@ func (r *Run) DefaultStartWorkflowOptions() client.StartWorkflowOptions {
305308
return client.StartWorkflowOptions{
306309
TaskQueue: TaskQueueForRun(r.RunID),
307310
ID: fmt.Sprintf("w-%s-%d", r.RunID, r.Iteration),
308-
WorkflowExecutionErrorWhenAlreadyStarted: true,
311+
WorkflowExecutionErrorWhenAlreadyStarted: !r.Configuration.IgnoreAlreadyStarted,
309312
}
310313
}
311314

scenarios/throughput_stress.go

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/temporalio/omes/loadgen"
1515
. "github.com/temporalio/omes/loadgen/kitchensink"
1616
"go.temporal.io/api/common/v1"
17-
"go.temporal.io/api/serviceerror"
1817
"go.temporal.io/api/workflowservice/v1"
1918
"go.temporal.io/sdk/temporal"
2019
"google.golang.org/protobuf/types/known/emptypb"
@@ -201,19 +200,6 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
201200
info.Logger.Debugf("Completed iteration %d", run.Iteration)
202201
}
203202

204-
// When resuming, it can happen that the workflow for the current iteration already exists since the snapshot
205-
// was not up-to-date. In that case, we just skip this iteration and move on.
206-
info.Configuration.HandleExecuteError = func(ctx context.Context, run *loadgen.Run, err error) error {
207-
if isResuming {
208-
var alreadyStartedErr *serviceerror.WorkflowExecutionAlreadyStarted
209-
if errors.As(err, &alreadyStartedErr) {
210-
info.Logger.Warnf("after resume, workflow for iteration %d already exists", run.Iteration)
211-
return nil
212-
}
213-
}
214-
return err
215-
}
216-
217203
// Start the scenario run.
218204
//
219205
// NOTE: When resuming, it can happen that there are no more iterations/time left to run more iterations.
@@ -228,7 +214,11 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
228214
},
229215
},
230216
UpdateWorkflowOptions: func(ctx context.Context, run *loadgen.Run, options *loadgen.KitchenSinkWorkflowOptions) error {
231-
options.StartOptions.ID = workflowID(run.RunID, run.Iteration)
217+
options.StartOptions = run.DefaultStartWorkflowOptions()
218+
if isResuming {
219+
// Enforce to never fail on "workflow already started" when resuming.
220+
options.StartOptions.WorkflowExecutionErrorWhenAlreadyStarted = false
221+
}
232222

233223
// Add search attribute to the workflow options so that it can be used in visibility queries.
234224
options.StartOptions.TypedSearchAttributes = temporal.NewSearchAttributes(
@@ -255,7 +245,7 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
255245
//
256246
// NOTE: No client actions (e.g. Signal) are defined; however, client action activities are.
257247
// That means these client actions are sent from the activity worker instead of Omes.
258-
options.Params.WorkflowInput.InitialActions = t.createActions(run.Iteration)
248+
options.Params.WorkflowInput.InitialActions = t.createActions(run)
259249

260250
return nil
261251
},
@@ -343,17 +333,17 @@ func (t *tpsExecutor) updateStateOnIterationCompletion() {
343333
t.state.LastCompletedIterationAt = time.Now()
344334
}
345335

346-
func (t *tpsExecutor) createActions(iteration int) []*ActionSet {
336+
func (t *tpsExecutor) createActions(run *loadgen.Run) []*ActionSet {
347337
return []*ActionSet{
348338
{
349-
Actions: t.createActionsChunk(iteration, 0, 0, t.config.InternalIterations),
339+
Actions: t.createActionsChunk(run, 0, 0, t.config.InternalIterations),
350340
Concurrent: false,
351341
},
352342
}
353343
}
354344

355345
func (t *tpsExecutor) createActionsChunk(
356-
iteration int,
346+
run *loadgen.Run,
357347
childCount int,
358348
continueAsNewCounter int,
359349
remainingInternalIters int,
@@ -367,7 +357,7 @@ func (t *tpsExecutor) createActionsChunk(
367357
isLastChunk := remainingInternalIters <= itersPerChunk
368358
itersPerChunk = min(itersPerChunk, remainingInternalIters) // cap chunk size to remaining iterations
369359

370-
rng := rand.New(rand.NewSource(t.config.RngSeed + int64(iteration)))
360+
rng := rand.New(rand.NewSource(t.config.RngSeed + int64(run.Iteration)))
371361

372362
// Create actions for the current chunk
373363
for i := 0; i < itersPerChunk; i++ {
@@ -381,7 +371,7 @@ func (t *tpsExecutor) createActionsChunk(
381371

382372
childCount++
383373
asyncActions := []*Action{
384-
t.createChildWorkflowAction(iteration, childCount),
374+
t.createChildWorkflowAction(run, childCount),
385375
PayloadActivity(256, 256, DefaultRemoteActivity),
386376
PayloadActivity(256, 256, DefaultRemoteActivity),
387377
PayloadActivity(0, 256, DefaultLocalActivity),
@@ -445,7 +435,7 @@ func (t *tpsExecutor) createActionsChunk(
445435
InitialActions: []*ActionSet{
446436
{
447437
Actions: t.createActionsChunk(
448-
iteration,
438+
run,
449439
childCount,
450440
continueAsNewCounter+1,
451441
remainingInternalIters-itersPerChunk),
@@ -462,7 +452,7 @@ func (t *tpsExecutor) createActionsChunk(
462452
return chunkActions
463453
}
464454

465-
func (t *tpsExecutor) createChildWorkflowAction(iteration int, childID int) *Action {
455+
func (t *tpsExecutor) createChildWorkflowAction(run *loadgen.Run, childID int) *Action {
466456
return &Action{
467457
Variant: &Action_ExecChildWorkflow{
468458
ExecChildWorkflow: &ExecuteChildWorkflowAction{
@@ -481,7 +471,7 @@ func (t *tpsExecutor) createChildWorkflowAction(iteration int, childID int) *Act
481471
},
482472
}),
483473
},
484-
WorkflowId: fmt.Sprintf("%s/child-%d", workflowID(t.runID, iteration), childID),
474+
WorkflowId: fmt.Sprintf("%s/child-%d", run.DefaultStartWorkflowOptions().ID, childID),
485475
SearchAttributes: map[string]*common.Payload{
486476
ThroughputStressScenarioIdSearchAttribute: &common.Payload{
487477
Metadata: map[string][]byte{"encoding": []byte("json/plain")},
@@ -623,10 +613,6 @@ func (t *tpsExecutor) createNexusWaitForCancelAction() *Action {
623613
}
624614
}
625615

626-
func workflowID(runID string, iteration int) string {
627-
return fmt.Sprintf("throughputStress/%s/iter-%d", runID, iteration)
628-
}
629-
630616
func (t *tpsExecutor) maybeWithStart(likelihood float64) bool {
631617
t.lock.Lock()
632618
defer t.lock.Unlock()

0 commit comments

Comments
 (0)