@@ -2,16 +2,27 @@ package loadgen
22
33import (
44 "context"
5+ "errors"
56 "fmt"
7+ "sync"
68 "time"
79
10+ "go.temporal.io/api/serviceerror"
811 "go.temporal.io/sdk/client"
912 "go.uber.org/zap"
1013)
1114
1215type GenericExecutor struct {
1316 // Function to execute a single iteration of this scenario
1417 Execute func (context.Context , * Run ) error
18+
19+ // WorkflowCompletionChecker is optional - when set, enables verification of workflow completions.
20+ // When nil (default), verification is disabled.
21+ WorkflowCompletionChecker * WorkflowCompletionChecker
22+
23+ // State management
24+ mu sync.Mutex
25+ state * ExecutorState
1526}
1627
1728type genericRun struct {
@@ -24,13 +35,84 @@ type genericRun struct {
2435}
2536
2637func (g * GenericExecutor ) Run (ctx context.Context , info ScenarioInfo ) error {
38+ g .mu .Lock ()
39+ if g .state == nil {
40+ g .state = & ExecutorState {}
41+ }
42+ if g .state .StartedAt .IsZero () {
43+ g .state .StartedAt = time .Now ()
44+ }
45+ g .mu .Unlock ()
46+
2747 r , err := g .newRun (info )
2848 if err != nil {
2949 return err
3050 }
3151 return r .Run (ctx )
3252}
3353
54+ func (g * GenericExecutor ) RecordCompletion () {
55+ g .mu .Lock ()
56+ defer g .mu .Unlock ()
57+
58+ if g .state != nil {
59+ g .state .CompletedIterations += 1
60+ g .state .LastCompletedAt = time .Now ()
61+ }
62+ }
63+
64+ func (g * GenericExecutor ) RecordError (err error ) {
65+ g .mu .Lock ()
66+ defer g .mu .Unlock ()
67+
68+ if g .state != nil && err != nil {
69+ g .state .IterationErrors = append (g .state .IterationErrors , err .Error ())
70+ }
71+ }
72+
73+ func (g * GenericExecutor ) VerifyRun (ctx context.Context , info ScenarioInfo ) []error {
74+ g .mu .Lock ()
75+ state := g .state
76+ checker := g .WorkflowCompletionChecker
77+ g .mu .Unlock ()
78+
79+ if checker == nil {
80+ return nil
81+ }
82+ if err := checker .Verify (ctx , info , state ); err != nil {
83+ return []error {err }
84+ }
85+ return nil
86+ }
87+
88+ // GetState returns a copy of the current state
89+ func (g * GenericExecutor ) GetState () ExecutorState {
90+ g .mu .Lock ()
91+ defer g .mu .Unlock ()
92+
93+ if g .state == nil {
94+ return ExecutorState {}
95+ }
96+ return * g .state
97+ }
98+
99+ func (g * GenericExecutor ) Snapshot () any {
100+ return g .GetState ()
101+ }
102+
103+ func (g * GenericExecutor ) LoadState (loader func (any ) error ) error {
104+ var state ExecutorState
105+ if err := loader (& state ); err != nil {
106+ return err
107+ }
108+
109+ g .mu .Lock ()
110+ g .state = & state
111+ g .mu .Unlock ()
112+
113+ return nil
114+ }
115+
34116func (g * GenericExecutor ) newRun (info ScenarioInfo ) (* genericRun , error ) {
35117 info .Configuration .ApplyDefaults ()
36118 if err := info .Configuration .Validate (); err != nil {
@@ -83,7 +165,12 @@ func (g *genericRun) Run(ctx context.Context) error {
83165 case err := <- doneCh :
84166 currentlyRunning --
85167 if err != nil {
86- runErr = err
168+ if g .config .ContinueOnError {
169+ g .logger .Warnf ("Iteration failed but continuing due to --continue-on-error: %v" , err )
170+ g .executor .RecordError (err )
171+ } else {
172+ runErr = err
173+ }
87174 }
88175 case <- contextToWaitOn .Done ():
89176 }
@@ -125,6 +212,7 @@ func (g *genericRun) Run(ctx context.Context) error {
125212 run := g .info .NewRun (i + 1 )
126213 go func () {
127214 var err error
215+ var shouldRecordCompletion bool
128216 iterStart := time .Now ()
129217
130218 defer func () {
@@ -133,8 +221,11 @@ func (g *genericRun) Run(ctx context.Context) error {
133221 select {
134222 case <- ctx .Done ():
135223 case doneCh <- err :
136- if err == nil && g .config .OnCompletion != nil {
137- g .config .OnCompletion (ctx , run )
224+ if err == nil && shouldRecordCompletion {
225+ g .executor .RecordCompletion ()
226+ if g .config .OnCompletion != nil {
227+ g .config .OnCompletion (ctx , run )
228+ }
138229 }
139230 }
140231 }()
@@ -145,7 +236,22 @@ func (g *genericRun) Run(ctx context.Context) error {
145236 if err != nil && g .config .HandleExecuteError != nil {
146237 err = g .config .HandleExecuteError (ctx , run , err )
147238 }
239+
240+ // Check if workflow was already started
241+ if err != nil {
242+ var alreadyStartedErr * serviceerror.WorkflowExecutionAlreadyStarted
243+ if errors .As (err , & alreadyStartedErr ) {
244+ if g .config .IgnoreAlreadyStarted {
245+ g .logger .Debugf ("Workflow already started, skipping iteration %v" , run .Iteration )
246+ err = nil
247+ shouldRecordCompletion = false
248+ break
249+ }
250+ }
251+ }
252+
148253 if err == nil {
254+ shouldRecordCompletion = true
149255 break
150256 }
151257
0 commit comments