
Commit 669a545: Fix throughput_stress resume (#195)
## What was changed

Fix resume bug.

## Why?

The `OnCompletion` hook wasn't always invoked properly, leading to under-counting of the completed iterations.
1 parent 8f5b185 · commit 669a545
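The fix in `loadgen/generic_executor.go` below buffers the iteration-result channel and moves the completion reporting (timer record, `doneCh` send, `OnCompletion` call) into a `defer`, so it runs on every exit path of the iteration goroutine rather than only after the retry loop. A minimal, runnable sketch of that pattern; the names `runIteration` and `work` are illustrative and not taken from the repo:

```go
package main

import (
	"context"
	"fmt"
)

// runIteration mirrors the defer-based completion reporting used by the fix.
// All names here are illustrative; this is not the omes executor code.
func runIteration(ctx context.Context, doneCh chan error, work func(context.Context) error, onCompletion func()) {
	go func() {
		var err error
		// Deferring the report guarantees the iteration is accounted for
		// (and onCompletion fired on success) no matter how the goroutine exits.
		defer func() {
			select {
			case <-ctx.Done():
			case doneCh <- err:
				if err == nil && onCompletion != nil {
					onCompletion()
				}
			}
		}()
		err = work(ctx) // retry/backoff loop elided
	}()
}

func main() {
	ctx := context.Background()
	doneCh := make(chan error, 1) // buffered, like doneCh in the fixed executor
	runIteration(ctx, doneCh, func(context.Context) error { return nil }, func() {
		fmt.Println("iteration completed")
	})
	if err := <-doneCh; err != nil {
		fmt.Println("iteration failed:", err)
	}
}
```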

File tree: 9 files changed (+105 -63 lines)

loadgen/generic_executor.go

Lines changed: 21 additions & 14 deletions
```diff
@@ -76,7 +76,7 @@ func (g *genericRun) Run(ctx context.Context) error {
 
 	startTime := time.Now()
 	var runErr error
-	doneCh := make(chan error)
+	doneCh := make(chan error, g.config.MaxConcurrent)
 	var currentlyRunning int
 	waitOne := func(contextToWaitOn context.Context) {
 		select {
@@ -125,11 +125,26 @@ func (g *genericRun) Run(ctx context.Context) error {
 		run := g.info.NewRun(i + 1)
 		go func() {
 			var err error
-			startTime := time.Now()
+			iterStart := time.Now()
+
+			defer func() {
+				g.executeTimer.Record(time.Since(iterStart))
+
+				select {
+				case <-ctx.Done():
+				case doneCh <- err:
+					if err == nil && g.config.OnCompletion != nil {
+						g.config.OnCompletion(ctx, run)
+					}
+				}
+			}()
 
 		retryLoop:
 			for {
 				err = g.executor.Execute(ctx, run)
+				if err != nil && g.config.HandleExecuteError != nil {
+					err = g.config.HandleExecuteError(ctx, run, err)
+				}
 				if err == nil {
 					break
 				}
@@ -146,18 +161,9 @@ func (g *genericRun) Run(ctx context.Context) error {
 
 				select {
 				case <-time.After(backoff):
+					// wait for backoff, then try again
 				case <-ctx.Done():
-					break retryLoop // just fall through to next select
-				}
-			}
-
-			select {
-			case <-ctx.Done():
-			case doneCh <- err:
-				g.executeTimer.Record(time.Since(startTime))
-
-				if err == nil && g.config.OnCompletion != nil {
-					g.config.OnCompletion(ctx, run)
+					break retryLoop
 				}
 			}
 		}()
@@ -166,7 +172,8 @@ func (g *genericRun) Run(ctx context.Context) error {
 	// Wait for all to be done or an error to occur. We will wait past the overall duration for
 	// executions to complete. It is expected that whatever is running omes may choose to enforce
 	// a hard timeout if waiting for started executions to complete exceeds a certain threshold.
-	g.logger.Info("Run cooldown: stopped starting new iterations; waiting for running ones to complete")
+	g.logger.Infof("Run cooldown: stopped starting new iterations and waiting for %d iterations to complete",
+		currentlyRunning)
 	for runErr == nil && currentlyRunning > 0 {
 		waitOne(ctx)
 		if ctx.Err() != nil {
```

loadgen/helpers.go

Lines changed: 33 additions & 17 deletions
```diff
@@ -20,8 +20,6 @@ func InitSearchAttribute(
 	info ScenarioInfo,
 	attributeName string,
 ) error {
-	info.Logger.Infof("Initialising Search Attribute %q", attributeName)
-
 	_, err := info.Client.OperatorService().AddSearchAttributes(ctx,
 		&operatorservice.AddSearchAttributesRequest{
 			Namespace: info.Namespace,
@@ -32,9 +30,9 @@ func InitSearchAttribute(
 	var deniedErr *serviceerror.PermissionDenied
 	var alreadyErr *serviceerror.AlreadyExists
 	if errors.As(err, &alreadyErr) {
-		info.Logger.Infof("Search Attribute %q already exists", attributeName)
+		info.Logger.Infof("Search Attribute %q not added: already exists", attributeName)
 	} else if err != nil {
-		info.Logger.Warnf("Failed to add Search Attribute %q: %v", attributeName, err)
+		info.Logger.Warnf("Search Attribute %q not added: %v", attributeName, err)
 		if !errors.As(err, &deniedErr) {
 			return err
 		}
@@ -45,8 +43,6 @@ func InitSearchAttribute(
 	return nil
 }
 
-// MinVisibilityCountEventually checks that the given visibility query returns at least the expected
-// number of workflows. It repeatedly queries until it either finds the expected count or times out.
 func MinVisibilityCountEventually(
 	ctx context.Context,
 	info ScenarioInfo,
@@ -64,24 +60,44 @@ func MinVisibilityCountEventually(
 	defer printTicker.Stop()
 
 	var lastVisibilityCount int64
-	for {
+	done := false
+
+	check := func() error {
+		visibilityCount, err := info.Client.CountWorkflow(timeoutCtx, request)
+		if err != nil {
+			return fmt.Errorf("failed to count workflows in visibility: %w", err)
+		}
+		lastVisibilityCount = visibilityCount.Count
+		if lastVisibilityCount >= int64(minCount) {
+			done = true
+		}
+		return nil
+	}
+
+	// Initial check before entering the loop.
+	if err := check(); err != nil {
+		return err
+	}
+
+	// Loop until we reach the desired count or timeout.
+	for !done {
 		select {
 		case <-timeoutCtx.Done():
-			return fmt.Errorf("expected at least %d workflows in visibility, got %d after waiting %v",
-				minCount, lastVisibilityCount, waitAtMost)
+			return fmt.Errorf(
+				"expected at least %d workflows in visibility, got %d after waiting %v",
+				minCount, lastVisibilityCount, waitAtMost,
+			)
 
 		case <-printTicker.C:
-			info.Logger.Infof("current visibility count: %d (expected at least: %d)\n", lastVisibilityCount, minCount)
+			info.Logger.Infof("current visibility count: %d (expected at least: %d)\n",
+				lastVisibilityCount, minCount)
 
 		case <-countTicker.C:
-			visibilityCount, err := info.Client.CountWorkflow(ctx, request)
-			if err != nil {
-				return fmt.Errorf("failed to count workflows in visibility: %w", err)
-			}
-			lastVisibilityCount = visibilityCount.Count
-			if lastVisibilityCount >= int64(minCount) {
-				return nil
+			if err := check(); err != nil {
+				return err
			}
 		}
 	}
+
+	return nil
 }
```

loadgen/kitchen_sink_executor.go

Lines changed: 0 additions & 6 deletions
```diff
@@ -16,8 +16,6 @@ type KitchenSinkExecutor struct {
 	// Called for each iteration. TestInput is copied entirely into KitchenSinkWorkflowOptions on
 	// each iteration.
 	UpdateWorkflowOptions func(context.Context, *Run, *KitchenSinkWorkflowOptions) error
-
-	DefaultConfiguration RunConfiguration
 }
 
 func (k KitchenSinkExecutor) Run(ctx context.Context, info ScenarioInfo) error {
@@ -46,7 +44,3 @@ func (k KitchenSinkExecutor) Run(ctx context.Context, info ScenarioInfo) error {
 	}
 	return ge.Run(ctx, info)
 }
-
-func (k KitchenSinkExecutor) GetDefaultConfiguration() RunConfiguration {
-	return k.DefaultConfiguration
-}
```

loadgen/kitchen_sink_executor_test.go

Lines changed: 5 additions & 7 deletions
```diff
@@ -784,15 +784,13 @@ func testForSDK(
 
 	executor := &KitchenSinkExecutor{
 		TestInput: tc.testInput,
-		DefaultConfiguration: RunConfiguration{
-			Iterations: 1,
-		},
 	}
-
 	scenarioInfo := ScenarioInfo{
-		ScenarioName:  "kitchenSinkTest",
-		RunID:         fmt.Sprintf("%s-%d", t.Name(), time.Now().Unix()),
-		Configuration: executor.DefaultConfiguration,
+		ScenarioName: "kitchenSinkTest",
+		RunID:        fmt.Sprintf("%s-%d", t.Name(), time.Now().Unix()),
+		Configuration: RunConfiguration{
+			Iterations: 1,
+		},
 	}
 
 	if expectedErr, expectUnsupported := tc.expectedUnsupportedErrs[sdk]; expectUnsupported {
```

loadgen/scenario.go

Lines changed: 2 additions & 0 deletions
```diff
@@ -203,6 +203,8 @@ type RunConfiguration struct {
 	DoNotRegisterSearchAttributes bool
 	// OnCompletion, if set, is invoked after each successful iteration completes.
 	OnCompletion func(context.Context, *Run)
+	// HandleExecuteError, if set, is called when Execute returns an error, allowing transformation of errors.
+	HandleExecuteError func(context.Context, *Run, error) error
 }
 
 func (r *RunConfiguration) ApplyDefaults() {
```
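The new `HandleExecuteError` hook lets a scenario transform (or swallow) an `Execute` error before the executor treats the iteration as failed. A hedged sketch of how a scenario might wire it up, consistent with the throughput_stress usage further down; the helper name `tolerateAlreadyStarted` is illustrative and not from the repo:

```go
package scenarios

import (
	"context"
	"errors"

	"github.com/temporalio/omes/loadgen"
	"go.temporal.io/api/serviceerror"
)

// tolerateAlreadyStarted is an illustrative helper: it configures a scenario so
// that a "workflow execution already started" error is treated as success,
// which lets a resumed run skip iterations whose workflows already exist.
func tolerateAlreadyStarted(info *loadgen.ScenarioInfo) {
	info.Configuration.HandleExecuteError = func(ctx context.Context, run *loadgen.Run, err error) error {
		var alreadyStarted *serviceerror.WorkflowExecutionAlreadyStarted
		if errors.As(err, &alreadyStarted) {
			info.Logger.Warnf("workflow for iteration %d already exists, skipping", run.Iteration)
			return nil // iteration is counted as completed
		}
		return err // any other error still fails the iteration
	}
}
```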

scenarios/fixed_resource_consumption.go

Lines changed: 4 additions & 7 deletions
```diff
@@ -1,13 +1,14 @@
 package scenarios
 
 import (
+	"math"
+	"math/rand"
+	"time"
+
 	"github.com/temporalio/omes/loadgen"
 	"github.com/temporalio/omes/loadgen/kitchensink"
 	"go.temporal.io/api/common/v1"
 	"google.golang.org/protobuf/types/known/durationpb"
-	"math"
-	"math/rand"
-	"time"
 )
 
 // This scenario is meant to be adjusted and run manually to evaluate the performance of different
@@ -62,10 +63,6 @@ func init() {
 	loadgen.MustRegisterScenario(loadgen.Scenario{
 		Description: "Used for testing slot provider performance. Runs activities that consume certain amounts of resources.",
 		Executor: loadgen.KitchenSinkExecutor{
-			DefaultConfiguration: loadgen.RunConfiguration{
-				Iterations:    1,
-				MaxConcurrent: 1,
-			},
 			TestInput: &kitchensink.TestInput{
 				WorkflowInput: &kitchensink.WorkflowInput{
 					InitialActions: []*kitchensink.ActionSet{
```

scenarios/throughput_stress.go

Lines changed: 19 additions & 3 deletions
```diff
@@ -14,6 +14,7 @@ import (
 	. "github.com/temporalio/omes/loadgen/kitchensink"
 	"go.temporal.io/api/common/v1"
 	"go.temporal.io/api/enums/v1"
+	"go.temporal.io/api/serviceerror"
 	"go.temporal.io/api/workflowservice/v1"
 	"go.temporal.io/sdk/temporal"
 	"google.golang.org/protobuf/types/known/emptypb"
@@ -191,13 +192,27 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
 	// Listen to iteration completion events to update the state.
 	info.Configuration.OnCompletion = func(ctx context.Context, run *loadgen.Run) {
 		t.updateStateOnIterationCompletion()
+		info.Logger.Debugf("Completed iteration %d", run.Iteration)
+	}
+
+	// When resuming, it can happen that the workflow for the current iteration already exists since the snapshot
+	// was not up-to-date. In that case, we just skip this iteration and move on.
+	info.Configuration.HandleExecuteError = func(ctx context.Context, run *loadgen.Run, err error) error {
+		if isResuming {
+			var alreadyStartedErr *serviceerror.WorkflowExecutionAlreadyStarted
+			if errors.As(err, &alreadyStartedErr) {
+				info.Logger.Warnf("after resume, workflow for iteration %d already exists", run.Iteration)
+				return nil
+			}
+		}
+		return err
 	}
 
 	// Start the scenario run.
 	//
-	// NOTE: When resuming, it can happen that there is no more time left to run more iterations. In that case,
-	// we skip the executor run and go straight to the post-scenario verification.
-	if isResuming && info.Configuration.Duration <= 0 {
+	// NOTE: When resuming, it can happen that there are no more iterations/time left to run more iterations.
+	// In that case, we skip the executor run and go straight to the post-scenario verification.
+	if isResuming && info.Configuration.Duration <= 0 && info.Configuration.Iterations == 0 {
 		info.Logger.Info("Skipping executor run: out of time")
 	} else {
 		ksExec := &loadgen.KitchenSinkExecutor{
@@ -316,6 +331,7 @@ func (t *tpsExecutor) updateStateOnIterationCompletion() {
 	defer t.lock.Unlock()
 	t.state.CompletedIterations += 1
 	t.state.LastCompletedIterationAt = time.Now()
+	fmt.Println("Updating state on iteration completion", t.state.CompletedIterations)
 }
 
 func (t *tpsExecutor) createActions(iteration int) []*ActionSet {
```

scenarios/throughput_stress_test.go

Lines changed: 18 additions & 6 deletions
```diff
@@ -19,14 +19,14 @@ func TestThroughputStress(t *testing.T) {
 	taskQueueName := loadgen.TaskQueueForRun(scenarioName, runID)
 
 	env := workers.SetupTestEnvironment(t,
-		workers.WithExecutorTimeout(2*time.Minute),
+		workers.WithExecutorTimeout(1*time.Minute),
 		workers.WithNexusEndpoint(taskQueueName))
 
 	scenarioInfo := loadgen.ScenarioInfo{
 		ScenarioName: scenarioName,
 		RunID:        runID,
 		Configuration: loadgen.RunConfiguration{
-			Iterations: 1,
+			Iterations: 2,
 		},
 		ScenarioOptions: map[string]string{
 			IterFlag: "2",
@@ -44,17 +44,29 @@ func TestThroughputStress(t *testing.T) {
 	require.NoError(t, err, "Executor should complete successfully")
 
 	state := executor.Snapshot().(tpsState)
-	require.Equal(t, state.CompletedIterations, 1)
+	require.Equal(t, state.CompletedIterations, 2)
 
-	t.Log("Start the executor again, pretending to resume")
+	t.Log("Start the executor again, resuming from middle")
 
 	err = executor.LoadState(func(v any) error {
 		s := v.(*tpsState)
-		s.CompletedIterations = state.CompletedIterations
+		s.CompletedIterations = 0 // execution will start from iteration 1
 		return nil
 	})
 	require.NoError(t, err)
 
 	err = env.RunExecutorTest(t, executor, scenarioInfo, cmdoptions.LangGo)
-	require.NoError(t, err, "Executor should complete successfully again")
+	require.NoError(t, err, "Executor should complete successfully when resuming from middle")
+
+	t.Log("Start the executor again, resuming from end")
+
+	err = executor.LoadState(func(v any) error {
+		s := v.(*tpsState)
+		s.CompletedIterations = s.CompletedIterations
+		return nil
+	})
+	require.NoError(t, err)
+
+	err = env.RunExecutorTest(t, executor, scenarioInfo, cmdoptions.LangGo)
+	require.NoError(t, err, "Executor should complete successfully when resuming from end")
 }
```

workers/test_env.go

Lines changed: 3 additions & 3 deletions
```diff
@@ -187,7 +187,7 @@ func (env *TestEnvironment) RunExecutorTest(
 	testCtx, cancelTestCtx := context.WithTimeout(t.Context(), env.executorTimeout)
 	defer cancelTestCtx()
 
-	workerDone := env.startWorker(t, sdk, taskQueueName, scenarioID)
+	workerDone := env.startWorker(testCtx, sdk, taskQueueName, scenarioID)
 
 	// Update scenario info with test environment details
 	scenarioInfo.Logger = env.logger.Named("executor")
@@ -259,7 +259,7 @@ func (env *TestEnvironment) ensureWorkerBuilt(t *testing.T, sdk cmdoptions.Langu
 }
 
 func (env *TestEnvironment) startWorker(
-	t *testing.T,
+	ctx context.Context,
 	sdk cmdoptions.Language,
 	taskQueueName string,
 	scenarioID cmdoptions.ScenarioID,
@@ -283,7 +283,7 @@ func (env *TestEnvironment) startWorker(
 				Namespace: testNamespace,
 			},
 		}
-		workerDone <- runner.Run(t.Context(), baseDir)
+		workerDone <- runner.Run(ctx, baseDir)
 	}()
 
 	return workerDone
```
