Skip to content

Commit c3ace37

Browse files
committed
Overhaul workflow verification
1 parent 320106c commit c3ace37

17 files changed

+610
-226
lines changed

.github/workflows/docker-images.yml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,19 @@ jobs:
7878
username: ${{ secrets.DOCKER_USERNAME }}
7979
password: ${{ secrets.DOCKER_PAT }}
8080

81+
- name: Extract branch name
82+
id: extract_branch
83+
run: |
84+
BRANCH_NAME="${GITHUB_HEAD_REF:-${GITHUB_REF#refs/heads/}}"
85+
SANITIZED_BRANCH=$(echo "$BRANCH_NAME" | sed 's/\//-/g' | sed 's/[^a-zA-Z0-9._-]/-/g')
86+
echo "branch_name=$SANITIZED_BRANCH" >> $GITHUB_OUTPUT
87+
8188
- name: Build and push to Docker Hub
8289
env:
8390
LANG: ${{ inputs.lang }}
8491
SDK_VERSION: ${{ inputs.sdk-version || 'checked-out-sdk/' }}
85-
IMAGE_TAG_ARGS: ${{ inputs.sdk-repo-ref && format('--image-tag {0}-{1}', inputs.lang, inputs.docker-tag-ext) || ''}}
92+
BRANCH_TAG_COMPONENT: ${{ inputs.lang && format('{0}-{1}', inputs.lang, steps.extract_branch.outputs.branch_name) || format('cli-{0}', steps.extract_branch.outputs.branch_name) }}
93+
IMAGE_TAG_ARGS: ${{ inputs.sdk-repo-ref && format('--image-tag {0}-{1} --image-tag {2}', inputs.lang, inputs.docker-tag-ext, inputs.lang && format('{0}-{1}', inputs.lang, steps.extract_branch.outputs.branch_name) || format('cli-{0}', steps.extract_branch.outputs.branch_name)) || format('--image-tag {0}', inputs.lang && format('{0}-{1}', inputs.lang, steps.extract_branch.outputs.branch_name) || format('cli-{0}', steps.extract_branch.outputs.branch_name)) }}
8694
TAG_LATEST_ARGS: ${{ inputs.as-latest && '--tag-as-latest' || ''}}
8795
LANG_ARGS: ${{ inputs.lang && format('--language {0}', inputs.lang) || '' }}
8896
VERSION_ARGS: ${{ inputs.sdk-version && format('--version {0}', inputs.sdk-version) || '' }}

cmd/cli/run_scenario.go

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package cli
22

33
import (
44
"context"
5+
"crypto/rand"
6+
"encoding/hex"
7+
"errors"
58
"fmt"
69
"os"
710
"strings"
@@ -56,7 +59,7 @@ type scenarioRunConfig struct {
5659
scenarioOptions []string
5760
timeout time.Duration
5861
doNotRegisterSearchAttributes bool
59-
ignoreAlreadyStarted bool
62+
continueOnError bool
6063
}
6164

6265
func (r *scenarioRunner) addCLIFlags(fs *pflag.FlagSet) {
@@ -82,8 +85,9 @@ func (r *scenarioRunConfig) addCLIFlags(fs *pflag.FlagSet) {
8285
fs.BoolVar(&r.doNotRegisterSearchAttributes, "do-not-register-search-attributes", false,
8386
"Do not register the default search attributes used by scenarios. "+
8487
"If the search attributes are not registed by the scenario they must be registered through some other method")
85-
fs.BoolVar(&r.ignoreAlreadyStarted, "ignore-already-started", false,
86-
"Ignore if a workflow with the same ID already exists. A Scenario may choose to override this behavior.")
88+
fs.BoolVar(&r.continueOnError, "continue-on-error", false,
89+
"Continue running even when any iterations fail after all retries are exhausted. "+
90+
"In case of any errors, Omes will exit nonzero and log the errors.")
8791
}
8892

8993
func (r *scenarioRunner) preRun() {
@@ -145,9 +149,16 @@ func (r *scenarioRunner) run(ctx context.Context) error {
145149
return fmt.Errorf("failed to get root directory: %w", err)
146150
}
147151

152+
// Generate a random execution ID to ensure no two executions with the same RunID collide
153+
executionID, err := generateExecutionID()
154+
if err != nil {
155+
return fmt.Errorf("failed to generate execution ID: %w", err)
156+
}
157+
148158
scenarioInfo := loadgen.ScenarioInfo{
149159
ScenarioName: r.scenario.Scenario,
150160
RunID: r.scenario.RunID,
161+
ExecutionID: executionID,
151162
Logger: r.logger,
152163
MetricsHandler: metrics.NewHandler(),
153164
Client: client,
@@ -159,16 +170,41 @@ func (r *scenarioRunner) run(ctx context.Context) error {
159170
MaxIterationAttempts: r.maxIterationAttempts,
160171
Timeout: r.timeout,
161172
DoNotRegisterSearchAttributes: r.doNotRegisterSearchAttributes,
162-
IgnoreAlreadyStarted: r.ignoreAlreadyStarted,
173+
ContinueOnError: r.continueOnError,
163174
},
164175
ScenarioOptions: scenarioOptions,
165176
Namespace: r.clientOptions.Namespace,
166177
RootPath: repoDir,
167178
}
168179
executor := scenario.ExecutorFn()
169-
err = executor.Run(ctx, scenarioInfo)
170-
if err != nil {
171-
return fmt.Errorf("failed scenario: %w", err)
180+
181+
// 1. Run the scenario
182+
scenarioErr := executor.Run(ctx, scenarioInfo)
183+
184+
// Collect all errors
185+
var allErrors []error
186+
if scenarioErr != nil {
187+
allErrors = append(allErrors, fmt.Errorf("scenario execution: %w", scenarioErr))
188+
}
189+
190+
// 2. Run verifications
191+
if verifiable, ok := executor.(loadgen.Verifyable); ok {
192+
verifyErrs := verifiable.VerifyRun(ctx, scenarioInfo)
193+
for _, err := range verifyErrs {
194+
allErrors = append(allErrors, fmt.Errorf("post-scenario verification: %w", err))
195+
}
196+
}
197+
198+
// Aggregate all errors
199+
return errors.Join(allErrors...)
200+
}
201+
202+
// generateExecutionID returns a random identifier for one particular execution
// of a scenario, so that two executions sharing the same RunID do not collide.
//
// The identifier is 8 bytes read from crypto/rand, rendered as 16 lowercase
// hexadecimal characters. A non-nil error is returned (with an empty string)
// only if the random source cannot be read.
func generateExecutionID() (string, error) {
	// 8 random bytes encode to 16 hex characters.
	const executionIDBytes = 8

	// Named "raw" rather than "bytes" so the local does not shadow the
	// standard library package of the same name.
	raw := make([]byte, executionIDBytes)
	if _, err := rand.Read(raw); err != nil {
		return "", err
	}
	return hex.EncodeToString(raw), nil
}

loadgen/generic_executor.go

Lines changed: 139 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,28 @@ package loadgen
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"sync"
68
"time"
79

10+
"go.temporal.io/api/serviceerror"
811
"go.temporal.io/sdk/client"
912
"go.uber.org/zap"
1013
)
1114

15+
// skipIterationErr is a sentinel error indicating that the iteration
16+
// should be skipped and not recorded as a completion or failure.
17+
var skipIterationErr = errors.New("skip iteration")
18+
1219
// GenericExecutor runs a scenario by invoking Execute for each iteration and
// keeps per-execution progress in an ExecutorState.
type GenericExecutor struct {
1320
// Function to execute a single iteration of this scenario
1421
Execute func(context.Context, *Run) error
22+
23+
// State management
24+
// mu guards state and workflowCompletionChecker; the methods shown alongside
// this type take it before reading or writing either field.
mu sync.Mutex
25+
// state is created lazily by Run, or replaced wholesale by LoadState.
state *ExecutorState
26+
// workflowCompletionChecker is set by EnableWorkflowCompletionCheck; while it
// is nil, VerifyRun performs no checks.
workflowCompletionChecker *WorkflowCompletionChecker
1527
}
1628

1729
type genericRun struct {
@@ -24,13 +36,109 @@ type genericRun struct {
2436
}
2537

2638
func (g *GenericExecutor) Run(ctx context.Context, info ScenarioInfo) error {
39+
g.mu.Lock()
40+
if g.state == nil {
41+
g.state = &ExecutorState{
42+
ExecutionID: info.ExecutionID,
43+
}
44+
}
45+
if g.state.StartedAt.IsZero() {
46+
g.state.StartedAt = time.Now()
47+
}
48+
g.mu.Unlock()
49+
2750
r, err := g.newRun(info)
2851
if err != nil {
2952
return err
3053
}
3154
return r.Run(ctx)
3255
}
3356

57+
func (g *GenericExecutor) RecordCompletion() {
58+
g.mu.Lock()
59+
defer g.mu.Unlock()
60+
61+
if g.state != nil {
62+
g.state.CompletedIterations += 1
63+
g.state.LastCompletedAt = time.Now()
64+
}
65+
}
66+
67+
func (g *GenericExecutor) RecordError(err error) {
68+
g.mu.Lock()
69+
defer g.mu.Unlock()
70+
71+
if g.state != nil && err != nil {
72+
g.state.IterationErrors = append(g.state.IterationErrors, err.Error())
73+
}
74+
}
75+
76+
func (g *GenericExecutor) VerifyRun(ctx context.Context, info ScenarioInfo) []error {
77+
g.mu.Lock()
78+
state := *g.state
79+
checker := g.workflowCompletionChecker
80+
g.mu.Unlock()
81+
82+
if checker == nil {
83+
return nil
84+
}
85+
if err := checker.Verify(ctx, state); err != nil {
86+
return []error{err}
87+
}
88+
return nil
89+
}
90+
91+
// EnableWorkflowCompletionCheck enables workflow completion verification for this executor.
92+
// It initializes a checker with the given timeout and registers the required search attributes.
93+
// The timeout specifies how long to wait for workflow completion verification (defaults to 30 seconds if zero).
94+
// The expectedWorkflowCount function, if provided, calculates the expected number of workflows from the ExecutorState.
95+
// If nil, defaults to using state.CompletedIterations.
96+
// Returns an error if search attribute registration fails.
97+
func (g *GenericExecutor) EnableWorkflowCompletionCheck(ctx context.Context, info ScenarioInfo, timeout time.Duration, expectedWorkflowCount func(ExecutorState) int) error {
98+
checker, err := NewWorkflowCompletionChecker(ctx, info, timeout)
99+
if err != nil {
100+
return err
101+
}
102+
103+
if expectedWorkflowCount != nil {
104+
checker.SetExpectedWorkflowCount(expectedWorkflowCount)
105+
}
106+
107+
g.mu.Lock()
108+
g.workflowCompletionChecker = checker
109+
g.mu.Unlock()
110+
111+
return nil
112+
}
113+
114+
// GetState returns a copy of the current state
115+
func (g *GenericExecutor) GetState() ExecutorState {
116+
g.mu.Lock()
117+
defer g.mu.Unlock()
118+
119+
if g.state == nil {
120+
return ExecutorState{}
121+
}
122+
return *g.state
123+
}
124+
125+
func (g *GenericExecutor) Snapshot() any {
126+
return g.GetState()
127+
}
128+
129+
func (g *GenericExecutor) LoadState(loader func(any) error) error {
130+
var state ExecutorState
131+
if err := loader(&state); err != nil {
132+
return err
133+
}
134+
135+
g.mu.Lock()
136+
g.state = &state
137+
g.mu.Unlock()
138+
139+
return nil
140+
}
141+
34142
func (g *GenericExecutor) newRun(info ScenarioInfo) (*genericRun, error) {
35143
info.Configuration.ApplyDefaults()
36144
if err := info.Configuration.Validate(); err != nil {
@@ -83,7 +191,12 @@ func (g *genericRun) Run(ctx context.Context) error {
83191
case err := <-doneCh:
84192
currentlyRunning--
85193
if err != nil {
86-
runErr = err
194+
if g.config.ContinueOnError {
195+
g.logger.Warnf("Iteration failed but continuing due to --continue-on-error: %v", err)
196+
g.executor.RecordError(err)
197+
} else {
198+
runErr = err
199+
}
87200
}
88201
case <-contextToWaitOn.Done():
89202
}
@@ -130,25 +243,48 @@ func (g *genericRun) Run(ctx context.Context) error {
130243
defer func() {
131244
g.executeTimer.Record(time.Since(iterStart))
132245

246+
// Check if this is the special "skip iteration" error
247+
isSkipIteration := errors.Is(err, skipIterationErr)
248+
if isSkipIteration {
249+
err = nil // Don't propagate this as an actual error
250+
}
251+
133252
select {
134253
case <-ctx.Done():
135254
case doneCh <- err:
136-
if err == nil && g.config.OnCompletion != nil {
137-
g.config.OnCompletion(ctx, run)
255+
if err == nil && !isSkipIteration {
256+
g.executor.RecordCompletion()
257+
if g.config.OnCompletion != nil {
258+
g.config.OnCompletion(ctx, run)
259+
}
138260
}
139261
}
140262
}()
141263

142264
retryLoop:
143265
for {
144266
err = g.executor.Execute(ctx, run)
267+
268+
// Skip if workflow was already started.
269+
if err != nil {
270+
var alreadyStartedErr *serviceerror.WorkflowExecutionAlreadyStarted
271+
if errors.As(err, &alreadyStartedErr) {
272+
g.logger.Debugf("Workflow already started, skipping iteration %v", run.Iteration)
273+
err = skipIterationErr
274+
break
275+
}
276+
}
277+
278+
// If defined, invoke user-defined error handler.
145279
if err != nil && g.config.HandleExecuteError != nil {
146280
err = g.config.HandleExecuteError(ctx, run, err)
147281
}
282+
148283
if err == nil {
149284
break
150285
}
151286

287+
// Attempt to retry.
152288
backoff, retry := run.ShouldRetry(err)
153289
if retry {
154290
err = fmt.Errorf("iteration %v encountered error: %w", run.Iteration, err)

loadgen/generic_executor_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func execute(executor *GenericExecutor, runConfig RunConfiguration) error {
4242
info := ScenarioInfo{
4343
MetricsHandler: client.MetricsNopHandler,
4444
Logger: logger.Sugar(),
45+
ExecutionID: "test-exec-id",
4546
Configuration: runConfig,
4647
}
4748
return executor.Run(context.Background(), info)
@@ -258,3 +259,34 @@ func TestExecutorRetriesLimit(t *testing.T) {
258259
require.Equal(t, []int{1, 1, 1, 1, 1}, totalTracker.seen, "expected 5 attempts")
259260
})
260261
}
262+
263+
func TestExecutorContinuesOnError(t *testing.T) {
264+
synctest.Test(t, func(t *testing.T) {
265+
tracker := newIterationTracker()
266+
executor := &GenericExecutor{
267+
Execute: func(ctx context.Context, run *Run) error {
268+
tracker.track(run.Iteration)
269+
if run.Iteration == 2 || run.Iteration == 4 {
270+
return errors.New("deliberate failure")
271+
}
272+
return nil
273+
},
274+
}
275+
276+
err := execute(executor,
277+
RunConfiguration{
278+
Iterations: 5,
279+
ContinueOnError: true,
280+
},
281+
)
282+
283+
require.NoError(t, err, "executor should complete when ContinueOnError is true")
284+
tracker.assertSeen(t, 5)
285+
286+
state := executor.GetState()
287+
require.Equal(t, 3, state.CompletedIterations)
288+
require.Len(t, state.IterationErrors, 2)
289+
require.Contains(t, state.IterationErrors[0], "deliberate failure")
290+
require.Contains(t, state.IterationErrors[1], "deliberate failure")
291+
})
292+
}

0 commit comments

Comments
 (0)