Skip to content

Commit 1bb6ad1

Browse files
committed
Overhaul workflow verification
1 parent bdbb7f6 commit 1bb6ad1

14 files changed

+465
-192
lines changed

cmd/cli/run_scenario.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cli
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"os"
78
"strings"
@@ -57,6 +58,7 @@ type scenarioRunConfig struct {
5758
timeout time.Duration
5859
doNotRegisterSearchAttributes bool
5960
ignoreAlreadyStarted bool
61+
continueOnError bool
6062
}
6163

6264
func (r *scenarioRunner) addCLIFlags(fs *pflag.FlagSet) {
@@ -84,6 +86,8 @@ func (r *scenarioRunConfig) addCLIFlags(fs *pflag.FlagSet) {
8486
"If the search attributes are not registed by the scenario they must be registered through some other method")
8587
fs.BoolVar(&r.ignoreAlreadyStarted, "ignore-already-started", false,
8688
"Ignore if a workflow with the same ID already exists. A Scenario may choose to override this behavior.")
89+
fs.BoolVar(&r.continueOnError, "continue-on-error", false,
90+
"Continue running iterations even when an iteration fails after all retries are exhausted")
8791
}
8892

8993
func (r *scenarioRunner) preRun() {
@@ -160,15 +164,33 @@ func (r *scenarioRunner) run(ctx context.Context) error {
160164
Timeout: r.timeout,
161165
DoNotRegisterSearchAttributes: r.doNotRegisterSearchAttributes,
162166
IgnoreAlreadyStarted: r.ignoreAlreadyStarted,
167+
ContinueOnError: r.continueOnError,
163168
},
164169
ScenarioOptions: scenarioOptions,
165170
Namespace: r.clientOptions.Namespace,
166171
RootPath: repoDir,
167172
}
168173
executor := scenario.ExecutorFn()
169-
err = executor.Run(ctx, scenarioInfo)
170-
if err != nil {
171-
return fmt.Errorf("failed scenario: %w", err)
174+
175+
// Phase 1: Run the scenario
176+
r.logger.Info("Running scenario")
177+
scenarioErr := executor.Run(ctx, scenarioInfo)
178+
179+
// Collect all errors
180+
var allErrors []error
181+
if scenarioErr != nil {
182+
allErrors = append(allErrors, fmt.Errorf("scenario execution: %w", scenarioErr))
172183
}
173-
return nil
184+
185+
// Phase 2: Run verifications
186+
if verifiable, ok := executor.(loadgen.Verifyable); ok {
187+
r.logger.Info("Running verifications")
188+
verifyErrs := verifiable.VerifyRun(ctx, scenarioInfo)
189+
for _, err := range verifyErrs {
190+
allErrors = append(allErrors, fmt.Errorf("verification: %w", err))
191+
}
192+
}
193+
194+
// Aggregate all errors
195+
return errors.Join(allErrors...)
174196
}

loadgen/generic_executor.go

Lines changed: 109 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,27 @@ package loadgen
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"sync"
68
"time"
79

10+
"go.temporal.io/api/serviceerror"
811
"go.temporal.io/sdk/client"
912
"go.uber.org/zap"
1013
)
1114

1215
type GenericExecutor struct {
1316
// Function to execute a single iteration of this scenario
1417
Execute func(context.Context, *Run) error
18+
19+
// WorkflowCompletionChecker is optional - when set, enables verification of workflow completions.
20+
// When nil (default), verification is disabled.
21+
WorkflowCompletionChecker *WorkflowCompletionChecker
22+
23+
// State management
24+
mu sync.Mutex
25+
state *ExecutorState
1526
}
1627

1728
type genericRun struct {
@@ -24,13 +35,84 @@ type genericRun struct {
2435
}
2536

2637
func (g *GenericExecutor) Run(ctx context.Context, info ScenarioInfo) error {
38+
g.mu.Lock()
39+
if g.state == nil {
40+
g.state = &ExecutorState{}
41+
}
42+
if g.state.StartedAt.IsZero() {
43+
g.state.StartedAt = time.Now()
44+
}
45+
g.mu.Unlock()
46+
2747
r, err := g.newRun(info)
2848
if err != nil {
2949
return err
3050
}
3151
return r.Run(ctx)
3252
}
3353

54+
func (g *GenericExecutor) RecordCompletion() {
55+
g.mu.Lock()
56+
defer g.mu.Unlock()
57+
58+
if g.state != nil {
59+
g.state.CompletedIterations += 1
60+
g.state.LastCompletedAt = time.Now()
61+
}
62+
}
63+
64+
func (g *GenericExecutor) RecordError(err error) {
65+
g.mu.Lock()
66+
defer g.mu.Unlock()
67+
68+
if g.state != nil && err != nil {
69+
g.state.IterationErrors = append(g.state.IterationErrors, err.Error())
70+
}
71+
}
72+
73+
func (g *GenericExecutor) VerifyRun(ctx context.Context, info ScenarioInfo) []error {
74+
g.mu.Lock()
75+
state := g.state
76+
checker := g.WorkflowCompletionChecker
77+
g.mu.Unlock()
78+
79+
if checker == nil {
80+
return nil
81+
}
82+
if err := checker.Verify(ctx, info, state); err != nil {
83+
return []error{err}
84+
}
85+
return nil
86+
}
87+
88+
// GetState returns a copy of the current state
89+
func (g *GenericExecutor) GetState() ExecutorState {
90+
g.mu.Lock()
91+
defer g.mu.Unlock()
92+
93+
if g.state == nil {
94+
return ExecutorState{}
95+
}
96+
return *g.state
97+
}
98+
99+
func (g *GenericExecutor) Snapshot() any {
100+
return g.GetState()
101+
}
102+
103+
func (g *GenericExecutor) LoadState(loader func(any) error) error {
104+
var state ExecutorState
105+
if err := loader(&state); err != nil {
106+
return err
107+
}
108+
109+
g.mu.Lock()
110+
g.state = &state
111+
g.mu.Unlock()
112+
113+
return nil
114+
}
115+
34116
func (g *GenericExecutor) newRun(info ScenarioInfo) (*genericRun, error) {
35117
info.Configuration.ApplyDefaults()
36118
if err := info.Configuration.Validate(); err != nil {
@@ -83,7 +165,12 @@ func (g *genericRun) Run(ctx context.Context) error {
83165
case err := <-doneCh:
84166
currentlyRunning--
85167
if err != nil {
86-
runErr = err
168+
if g.config.ContinueOnError {
169+
g.logger.Warnf("Iteration failed but continuing due to --continue-on-error: %v", err)
170+
g.executor.RecordError(err)
171+
} else {
172+
runErr = err
173+
}
87174
}
88175
case <-contextToWaitOn.Done():
89176
}
@@ -125,6 +212,7 @@ func (g *genericRun) Run(ctx context.Context) error {
125212
run := g.info.NewRun(i + 1)
126213
go func() {
127214
var err error
215+
var shouldRecordCompletion bool
128216
iterStart := time.Now()
129217

130218
defer func() {
@@ -133,8 +221,11 @@ func (g *genericRun) Run(ctx context.Context) error {
133221
select {
134222
case <-ctx.Done():
135223
case doneCh <- err:
136-
if err == nil && g.config.OnCompletion != nil {
137-
g.config.OnCompletion(ctx, run)
224+
if err == nil && shouldRecordCompletion {
225+
g.executor.RecordCompletion()
226+
if g.config.OnCompletion != nil {
227+
g.config.OnCompletion(ctx, run)
228+
}
138229
}
139230
}
140231
}()
@@ -145,7 +236,22 @@ func (g *genericRun) Run(ctx context.Context) error {
145236
if err != nil && g.config.HandleExecuteError != nil {
146237
err = g.config.HandleExecuteError(ctx, run, err)
147238
}
239+
240+
// Check if workflow was already started
241+
if err != nil {
242+
var alreadyStartedErr *serviceerror.WorkflowExecutionAlreadyStarted
243+
if errors.As(err, &alreadyStartedErr) {
244+
if g.config.IgnoreAlreadyStarted {
245+
g.logger.Debugf("Workflow already started, skipping iteration %v", run.Iteration)
246+
err = nil
247+
shouldRecordCompletion = false
248+
break
249+
}
250+
}
251+
}
252+
148253
if err == nil {
254+
shouldRecordCompletion = true
149255
break
150256
}
151257

loadgen/generic_executor_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,3 +258,34 @@ func TestExecutorRetriesLimit(t *testing.T) {
258258
require.Equal(t, []int{1, 1, 1, 1, 1}, totalTracker.seen, "expected 5 attempts")
259259
})
260260
}
261+
262+
func TestExecutorContinuesOnError(t *testing.T) {
263+
synctest.Test(t, func(t *testing.T) {
264+
tracker := newIterationTracker()
265+
executor := &GenericExecutor{
266+
Execute: func(ctx context.Context, run *Run) error {
267+
tracker.track(run.Iteration)
268+
if run.Iteration == 2 || run.Iteration == 4 {
269+
return errors.New("deliberate failure")
270+
}
271+
return nil
272+
},
273+
}
274+
275+
err := execute(executor,
276+
RunConfiguration{
277+
Iterations: 5,
278+
ContinueOnError: true,
279+
},
280+
)
281+
282+
require.NoError(t, err, "executor should complete when ContinueOnError is true")
283+
tracker.assertSeen(t, 5)
284+
285+
state := executor.GetState()
286+
require.Equal(t, 3, state.CompletedIterations)
287+
require.Len(t, state.IterationErrors, 2)
288+
require.Contains(t, state.IterationErrors[0], "deliberate failure")
289+
require.Contains(t, state.IterationErrors[1], "deliberate failure")
290+
})
291+
}

loadgen/kitchen_sink_executor.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
)
99

1010
type KitchenSinkExecutor struct {
11+
GenericExecutor
12+
1113
TestInput *kitchensink.TestInput
1214

1315
// Called once on start
@@ -18,29 +20,28 @@ type KitchenSinkExecutor struct {
1820
UpdateWorkflowOptions func(context.Context, *Run, *KitchenSinkWorkflowOptions) error
1921
}
2022

21-
func (k KitchenSinkExecutor) Run(ctx context.Context, info ScenarioInfo) error {
23+
func (k *KitchenSinkExecutor) Run(ctx context.Context, info ScenarioInfo) error {
2224
if k.PrepareTestInput != nil {
2325
if err := k.PrepareTestInput(ctx, info, k.TestInput); err != nil {
2426
return err
2527
}
2628
}
27-
// Create generic executor and run it
28-
ge := &GenericExecutor{
29-
Execute: func(ctx context.Context, run *Run) error {
30-
options := run.DefaultKitchenSinkWorkflowOptions()
31-
testInputClone, ok := proto.Clone(k.TestInput).(*kitchensink.TestInput)
32-
if !ok {
33-
panic("failed to clone test input")
34-
}
35-
options.Params = testInputClone
36-
if k.UpdateWorkflowOptions != nil {
37-
err := k.UpdateWorkflowOptions(ctx, run, &options)
38-
if err != nil {
39-
return err
40-
}
29+
30+
k.GenericExecutor.Execute = func(ctx context.Context, run *Run) error {
31+
options := run.DefaultKitchenSinkWorkflowOptions()
32+
testInputClone, ok := proto.Clone(k.TestInput).(*kitchensink.TestInput)
33+
if !ok {
34+
panic("failed to clone test input")
35+
}
36+
options.Params = testInputClone
37+
if k.UpdateWorkflowOptions != nil {
38+
err := k.UpdateWorkflowOptions(ctx, run, &options)
39+
if err != nil {
40+
return err
4141
}
42-
return run.ExecuteKitchenSinkWorkflow(ctx, &options)
43-
},
42+
}
43+
return run.ExecuteKitchenSinkWorkflow(ctx, &options)
4444
}
45-
return ge.Run(ctx, info)
45+
46+
return k.GenericExecutor.Run(ctx, info)
4647
}

loadgen/scenario.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,17 @@ type Executor interface {
3030
Run(context.Context, ScenarioInfo) error
3131
}
3232

33+
type ExecutorState struct {
34+
// StartedAt is the timestamp when the executor run started.
35+
StartedAt time.Time `json:"startedAt"`
36+
// CompletedIterations tracks the number of successfully completed iterations.
37+
CompletedIterations int `json:"completedIterations"`
38+
// LastCompletedAt is the timestamp of the last completed workflow.
39+
LastCompletedAt time.Time `json:"lastCompletedAt"`
40+
// IterationErrors tracks errors encountered during iterations (for debugging/resumption)
41+
IterationErrors []string `json:"iterationErrors,omitempty"`
42+
}
43+
3344
// Optional interface that can be implemented by an [Executor] to allow it to be resumable.
3445
type Resumable interface {
3546
// LoadState loads a snapshot into the executor's internal state.
@@ -53,6 +64,12 @@ type Configurable interface {
5364
Configure(ScenarioInfo) error
5465
}
5566

67+
// Verifyable is an optional interface that executors can implement to perform verifications after Run() completes.
68+
type Verifyable interface {
69+
// VerifyRun performs post-execution verifications and returns a list of errors.
70+
VerifyRun(context.Context, ScenarioInfo) []error
71+
}
72+
5673
// ExecutorFunc is an [Executor] implementation for a function
5774
type ExecutorFunc func(context.Context, ScenarioInfo) error
5875

@@ -204,6 +221,9 @@ type RunConfiguration struct {
204221
// IgnoreAlreadyStarted, if set, will not error when a workflow with the same ID already exists.
205222
// Default is false.
206223
IgnoreAlreadyStarted bool
224+
// ContinueOnError, if set, will continue running iterations even after an iteration fails
225+
// (after all retries are exhausted). Default is false.
226+
ContinueOnError bool
207227
// OnCompletion, if set, is invoked after each successful iteration completes.
208228
OnCompletion func(context.Context, *Run)
209229
// HandleExecuteError, if set, is called when Execute returns an error, allowing transformation of errors.

0 commit comments

Comments
 (0)