diff --git a/cmd/cli/run_scenario.go b/cmd/cli/run_scenario.go index eee8d1b0..a5bde8b6 100644 --- a/cmd/cli/run_scenario.go +++ b/cmd/cli/run_scenario.go @@ -78,7 +78,7 @@ func (r *scenarioRunConfig) addCLIFlags(fs *pflag.FlagSet) { fs.DurationVar(&r.timeout, "timeout", 0, "If set, the scenario will stop after this amount of"+ " time has elapsed. Any still-running iterations will be cancelled, and omes will exit nonzero.") fs.IntVar(&r.maxConcurrent, "max-concurrent", 0, "Override max-concurrent for the scenario") - fs.StringSliceVar(&r.scenarioOptions, "option", nil, "Additional options for the scenario, in key=value format") + fs.StringArrayVar(&r.scenarioOptions, "option", nil, "Additional options for the scenario, in key=value format") fs.BoolVar(&r.doNotRegisterSearchAttributes, "do-not-register-search-attributes", false, "Do not register the default search attributes used by scenarios. "+ "If the search attributes are not registed by the scenario they must be registered through some other method") diff --git a/loadgen/helper_sleepactivity.go b/loadgen/helper_sleepactivity.go index b2efed78..cbfb42c0 100644 --- a/loadgen/helper_sleepactivity.go +++ b/loadgen/helper_sleepactivity.go @@ -39,7 +39,7 @@ type SleepActivityGroupConfig struct { FairnessWeight *DistributionField[float32] `json:"fairnessWeight"` } -func ParseAndValidateSleepActivityConfig(jsonStr string, requireCount bool) (*SleepActivityConfig, error) { +func ParseAndValidateSleepActivityConfig(jsonStr string, requireCount, requireSleepDuration bool) (*SleepActivityConfig, error) { if jsonStr == "" { return nil, nil } @@ -57,7 +57,7 @@ func ParseAndValidateSleepActivityConfig(jsonStr string, requireCount bool) (*Sl if groupConfig.Weight < 0 { return nil, fmt.Errorf("SleepActivityGroupConfig: Group '%s' Weight must be non-negative", groupID) } - if groupConfig.SleepDuration == nil { + if requireSleepDuration && groupConfig.SleepDuration == nil { return nil, fmt.Errorf("SleepActivityGroupConfig: Group '%s' SleepDuration field is required", groupID) } } diff --git a/scenarios/ebb_and_flow.go b/scenarios/ebb_and_flow.go index 35067911..aa6beafb 100644 --- a/scenarios/ebb_and_flow.go +++ b/scenarios/ebb_and_flow.go @@ -121,8 +121,8 @@ func (e *ebbAndFlowExecutor) Configure(info loadgen.ScenarioInfo) error { if sleepActivitiesStr, ok := info.ScenarioOptions[SleepActivityJsonFlag]; ok { var err error - // This scenario overrides "count" so do not require it. - config.SleepActivityConfig, err = loadgen.ParseAndValidateSleepActivityConfig(sleepActivitiesStr, false) + // This scenario overrides "count" and "sleepDuration" so do not require them. + config.SleepActivityConfig, err = loadgen.ParseAndValidateSleepActivityConfig(sleepActivitiesStr, false, false) if err != nil { return fmt.Errorf("invalid %s: %w", SleepActivityJsonFlag, err) } @@ -185,7 +185,7 @@ func (e *ebbAndFlowExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) e.Logger.Infof("Starting ebb and flow scenario: min_backlog=%d, max_backlog=%d, period=%v, duration=%v", config.MinBacklog, config.MaxBacklog, config.Period, e.Configuration.Duration) - var started, completed, backlog, target, rate int64 + var started, completed, backlog, target, activities int64 for elapsed := time.Duration(0); elapsed < e.Configuration.Duration; elapsed = time.Since(e.startTime) { select { @@ -207,20 +207,20 @@ func (e *ebbAndFlowExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) backlog = started - completed target = calculateBacklogTarget(elapsed, config.Period, config.MinBacklog, config.MaxBacklog) - activities := target - backlog + activities = target - backlog activities = max(activities, 0) activities = min(activities, config.MaxRate) if activities > 0 { startWG.Add(1) - go func(iter, rate int64) { + go func(iter, activities int64) { defer startWG.Done() errCh <- e.spawnWorkflowWithActivities(ctx, iter, activities, config.SleepActivityConfig) - }(iter, rate) + }(iter, activities) iter++ } case <-backlogTicker.C: - e.Logger.Debugf("Backlog: %d, target: %d, rate: %d/s, started: %d, completed: %d", - backlog, target, rate, started, completed) + e.Logger.Debugf("Backlog: %d, target: %d, last iter: %d, started: %d, completed: %d", + backlog, target, activities, started, completed) } } diff --git a/scenarios/throughput_stress.go b/scenarios/throughput_stress.go index f93829f1..b73ca13d 100644 --- a/scenarios/throughput_stress.go +++ b/scenarios/throughput_stress.go @@ -150,7 +150,7 @@ func (t *tpsExecutor) Configure(info loadgen.ScenarioInfo) error { if sleepActivitiesStr, ok := info.ScenarioOptions[SleepActivityJsonFlag]; ok { var err error - config.SleepActivities, err = loadgen.ParseAndValidateSleepActivityConfig(sleepActivitiesStr, true) + config.SleepActivities, err = loadgen.ParseAndValidateSleepActivityConfig(sleepActivitiesStr, true, true) if err != nil { return fmt.Errorf("invalid %s: %w", SleepActivityJsonFlag, err) }