Skip to content

Commit a8a26a4

Browse files
authored
Add --max-iterations-per-second (#132)
<!--- Note to EXTERNAL Contributors --> <!-- Thanks for opening a PR! If it is a significant code change, please **make sure there is an open issue** for this. We work best with you when we have accepted the idea first before you code. --> <!--- For ALL Contributors 👇 --> ## What was changed <!-- Describe what has changed in this PR --> Add RPS parameter for executor. ## Why? <!-- Tell your future self why have you made these changes --> To generate continuous but throttled load. The new parameter allows to create a scenario where a large number of iterations are started one-by-one at a constant rate (e.g. 5/s). This is useful for creating a backlog of workflows (by turning off the workers but keep creating new workflows). ## Checklist <!--- add/delete as needed ---> 1. Closes <!-- add issue number here --> 2. How was this tested: <!--- Please describe how you tested your changes/how we can test them --> 3. Any docs updates needed? <!--- update README if applicable or point out where to update docs.temporal.io -->
1 parent 4d802ca commit a8a26a4

File tree

5 files changed

+43
-0
lines changed

5 files changed

+43
-0
lines changed

cmd/run_scenario.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type scenarioRunConfig struct {
5252
iterations int
5353
duration time.Duration
5454
maxConcurrent int
55+
maxIterationsPerSecond float64
5556
scenarioOptions []string
5657
timeout time.Duration
5758
doNotRegisterSearchAttributes bool
@@ -75,6 +76,8 @@ func (r *scenarioRunConfig) addCLIFlags(fs *pflag.FlagSet) {
7576
fs.IntVar(&r.iterations, "iterations", 0, "Override default iterations for the scenario (cannot be provided with duration)")
7677
fs.DurationVar(&r.duration, "duration", 0, "Override duration for the scenario (cannot be provided with iteration)."+
7778
" This is the amount of time for which we will start new iterations of the scenario.")
79+
fs.Float64Var(&r.maxIterationsPerSecond, "max-iterations-per-second", 0, "Override iterations per second rate limit for the scenario."+
80+
" This is the maximum rate at which we will start new iterations of the scenario.")
7881
fs.DurationVar(&r.timeout, "timeout", 0, "If set, the scenario will stop after this amount of"+
7982
" time has elapsed. Any still-running iterations will be cancelled, and omes will exit nonzero.")
8083
fs.IntVar(&r.maxConcurrent, "max-concurrent", 0, "Override max-concurrent for the scenario")
@@ -136,6 +139,7 @@ func (r *scenarioRunner) run(ctx context.Context) error {
136139
Iterations: r.iterations,
137140
Duration: r.duration,
138141
MaxConcurrent: r.maxConcurrent,
142+
MaxIterationsPerSecond: r.maxIterationsPerSecond,
139143
Timeout: r.timeout,
140144
DoNotRegisterSearchAttributes: r.doNotRegisterSearchAttributes,
141145
},

cmd/run_scenario_with_worker.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"context"
55
"fmt"
6+
67
"github.com/spf13/cobra"
78
"github.com/spf13/pflag"
89
"github.com/temporalio/omes/cmd/cmdoptions"
@@ -64,6 +65,7 @@ func (r *workerWithScenarioRunner) run(ctx context.Context) error {
6465
iterations: r.iterations,
6566
duration: r.duration,
6667
maxConcurrent: r.maxConcurrent,
68+
maxIterationsPerSecond: r.maxIterationsPerSecond,
6769
scenarioOptions: r.scenarioOptions,
6870
timeout: r.timeout,
6971
doNotRegisterSearchAttributes: r.doNotRegisterSearchAttributes,

loadgen/generic_executor.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ func (g *GenericExecutor) newRun(info ScenarioInfo) (*genericRun, error) {
5757
if run.config.Timeout == 0 {
5858
run.config.Timeout = g.DefaultConfiguration.Timeout
5959
}
60+
if run.config.MaxIterationsPerSecond == 0 {
61+
run.config.MaxIterationsPerSecond = g.DefaultConfiguration.MaxIterationsPerSecond
62+
}
6063
run.config.ApplyDefaults()
6164
if run.config.Iterations > 0 && run.config.Duration > 0 {
6265
return nil, fmt.Errorf("invalid scenario: iterations and duration are mutually exclusive")
@@ -113,9 +116,21 @@ func (g *genericRun) Run(ctx context.Context) error {
113116
g.logger.Debugf("Will start iterations for %v", g.config.Duration)
114117
}
115118

119+
var rateLimiter <-chan time.Time
120+
if g.config.MaxIterationsPerSecond > 0 {
121+
g.logger.Debugf("Will run at rate of %v iteration(s) per second", g.config.MaxIterationsPerSecond)
122+
rateLimiter = time.Tick(time.Duration(float64(time.Second) / g.config.MaxIterationsPerSecond))
123+
}
124+
116125
// Run all until we've gotten an error or reached iteration limit or timeout
117126
for i := 0; runErr == nil && durationCtx.Err() == nil &&
118127
(g.config.Iterations == 0 || i < g.config.Iterations); i++ {
128+
129+
// If there is a rate limit, enforce it
130+
if rateLimiter != nil {
131+
<-rateLimiter
132+
}
133+
119134
// If there are already MaxConcurrent running, wait for one
120135
if currentlyRunning >= g.config.MaxConcurrent {
121136
waitOne(durationCtx)

loadgen/generic_executor_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,3 +189,22 @@ func TestRunIterationsWithoutTimeout(t *testing.T) {
189189
require.NoError(t, err)
190190
tracker.assertSeen(t, 5)
191191
}
192+
193+
func TestRunIterationsWithRateLimit(t *testing.T) {
194+
startTime := time.Now()
195+
tracker := newIterationTracker()
196+
err := execute(&GenericExecutor{
197+
Execute: func(ctx context.Context, run *Run) error {
198+
tracker.track(run.Iteration)
199+
return nil
200+
},
201+
DefaultConfiguration: RunConfiguration{
202+
Iterations: 4,
203+
MaxConcurrent: 1,
204+
MaxIterationsPerSecond: 4.0,
205+
},
206+
})
207+
require.NoError(t, err)
208+
require.GreaterOrEqual(t, time.Since(startTime), time.Second)
209+
tracker.assertSeen(t, 4)
210+
}

loadgen/scenario.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ type RunConfiguration struct {
128128
// Maximum number of instances of the Execute method to run concurrently.
129129
// Default is DefaultMaxConcurrent.
130130
MaxConcurrent int
131+
// MaxIterationsPerSecond is the maximum number of iterations to run per second.
132+
// Default is zero, meaning unlimited.
133+
MaxIterationsPerSecond float64
131134
// Timeout is the maximum amount of time we'll wait for the scenario to finish running.
132135
// If the timeout is hit any pending executions will be cancelled and the scenario will exit
133136
// with an error. The default is unlimited.

0 commit comments

Comments
 (0)