Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 10 additions & 24 deletions scenarios/ebb_and_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,14 @@ func (e *ebbAndFlowExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo)
backlog = started - completed

target = calculateBacklogTarget(elapsed, config.Period, config.MinBacklog, config.MaxBacklog)
rate = calculateSpawnRate(target, backlog, config.MinBacklog, config.MaxBacklog, config.MaxRate)

if rate > 0 {
activities := target - backlog
activities = max(activities, 0)
activities = min(activities, config.MaxRate)
if activities > 0 {
startWG.Add(1)
go func(iter, rate int64) {
defer startWG.Done()
errCh <- e.spawnWorkflowWithActivities(ctx, iter, rate, config.SleepActivityConfig)
errCh <- e.spawnWorkflowWithActivities(ctx, iter, activities, config.SleepActivityConfig)
}(iter, rate)
iter++
}
Expand Down Expand Up @@ -281,11 +282,11 @@ func (e *ebbAndFlowExecutor) LoadState(loader func(any) error) error {

func (e *ebbAndFlowExecutor) spawnWorkflowWithActivities(
ctx context.Context,
iteration, rate int64,
iteration, activities int64,
template *loadgen.SleepActivityConfig,
) error {
// Override activity count to fixed rate.
fixedDist := loadgen.NewFixedDistribution(rate)
fixedDist := loadgen.NewFixedDistribution(activities)
config := loadgen.SleepActivityConfig{
Count: &fixedDist,
Groups: template.Groups,
Expand All @@ -309,15 +310,15 @@ func (e *ebbAndFlowExecutor) spawnWorkflowWithActivities(
if err != nil {
return fmt.Errorf("failed to start ebbAndFlowTrack workflow for iteration %d: %w", iteration, err)
}
e.scheduledActivities.Add(rate)
e.scheduledActivities.Add(activities)

// Wait for workflow completion
var result ebbandflow.WorkflowOutput
err = wf.Get(ctx, &result)
if err != nil {
e.Logger.Errorf("ebbAndFlowTrack workflow failed for iteration %d: %v", iteration, err)
}
e.completedActivities.Add(rate)
e.completedActivities.Add(activities)
e.incrementTotalCompletedWorkflow()

return nil
Expand All @@ -336,23 +337,8 @@ func calculateBacklogTarget(
minBacklog, maxBacklog int64,
) int64 {
periods := elapsed.Seconds() / period.Seconds()
osc := (math.Sin(2*math.Pi*periods) + 1.0) / 2
osc := (math.Sin(2*math.Pi*(periods-0.25)) + 1.0) / 2
backlogRange := float64(maxBacklog - minBacklog)
baseTarget := float64(minBacklog) + osc*backlogRange
return int64(math.Round(baseTarget))
}

func calculateSpawnRate(
target int64,
backlog int64,
minBacklog, maxBacklog int64,
maxRate int64,
) int64 {
backlogDelta := float64(target - backlog) // how far backlog is from target
scaledBacklogDelta := math.Abs(backlogDelta) / float64(maxBacklog-minBacklog) // normalize to 0.0-1.0 range
gain := 1.0 + 2.0*scaledBacklogDelta // smooth gain scheduling: 1.0 (small errors) to 3.0 (large errors)
rate := int64(backlogDelta * gain) // calculate desired spawn rate (workflows/second)
rate = min(maxRate, rate) // cap at maximum allowed rate
rate = max(0, rate)
return rate
}
Loading