diff --git a/scenarios/ebb_and_flow.go b/scenarios/ebb_and_flow.go index 063f6523..35067911 100644 --- a/scenarios/ebb_and_flow.go +++ b/scenarios/ebb_and_flow.go @@ -207,13 +207,14 @@ func (e *ebbAndFlowExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) backlog = started - completed target = calculateBacklogTarget(elapsed, config.Period, config.MinBacklog, config.MaxBacklog) - rate = calculateSpawnRate(target, backlog, config.MinBacklog, config.MaxBacklog, config.MaxRate) - - if rate > 0 { + activities := target - backlog + activities = max(activities, 0) + activities = min(activities, config.MaxRate) + if activities > 0 { startWG.Add(1) go func(iter, rate int64) { defer startWG.Done() - errCh <- e.spawnWorkflowWithActivities(ctx, iter, rate, config.SleepActivityConfig) + errCh <- e.spawnWorkflowWithActivities(ctx, iter, activities, config.SleepActivityConfig) }(iter, rate) iter++ } @@ -281,11 +282,11 @@ func (e *ebbAndFlowExecutor) LoadState(loader func(any) error) error { func (e *ebbAndFlowExecutor) spawnWorkflowWithActivities( ctx context.Context, - iteration, rate int64, + iteration, activities int64, template *loadgen.SleepActivityConfig, ) error { // Override activity count to fixed rate. - fixedDist := loadgen.NewFixedDistribution(rate) + fixedDist := loadgen.NewFixedDistribution(activities) config := loadgen.SleepActivityConfig{ Count: &fixedDist, Groups: template.Groups, @@ -309,7 +310,7 @@ func (e *ebbAndFlowExecutor) spawnWorkflowWithActivities( if err != nil { return fmt.Errorf("failed to start ebbAndFlowTrack workflow for iteration %d: %w", iteration, err) } - e.scheduledActivities.Add(rate) + e.scheduledActivities.Add(activities) // Wait for workflow completion var result ebbandflow.WorkflowOutput @@ -317,7 +318,7 @@ func (e *ebbAndFlowExecutor) spawnWorkflowWithActivities( if err != nil { e.Logger.Errorf("ebbAndFlowTrack workflow failed for iteration %d: %v", iteration, err) } - e.completedActivities.Add(rate) + e.completedActivities.Add(activities) e.incrementTotalCompletedWorkflow() return nil @@ -336,23 +337,8 @@ func calculateBacklogTarget( minBacklog, maxBacklog int64, ) int64 { periods := elapsed.Seconds() / period.Seconds() - osc := (math.Sin(2*math.Pi*periods) + 1.0) / 2 + osc := (math.Sin(2*math.Pi*(periods-0.25)) + 1.0) / 2 backlogRange := float64(maxBacklog - minBacklog) baseTarget := float64(minBacklog) + osc*backlogRange return int64(math.Round(baseTarget)) } - -func calculateSpawnRate( - target int64, - backlog int64, - minBacklog, maxBacklog int64, - maxRate int64, -) int64 { - backlogDelta := float64(target - backlog) // how far backlog is from target - scaledBacklogDelta := math.Abs(backlogDelta) / float64(maxBacklog-minBacklog) // normalize to 0.0-1.0 range - gain := 1.0 + 2.0*scaledBacklogDelta // smooth gain scheduling: 1.0 (small errors) to 3.0 (large errors) - rate := int64(backlogDelta * gain) // calculate desired spawn rate (workflows/second) - rate = min(maxRate, rate) // cap at maximum allowed rate - rate = max(0, rate) - return rate -}