@@ -39,6 +39,88 @@ const (
3939 BacklogLogIntervalFlag = "backlog-log-interval"
4040)
4141
42+ type ebbAndFlowConfig struct {
43+ MinBacklog int64
44+ MaxBacklog int64
45+ PhaseTime time.Duration
46+ SleepDuration time.Duration
47+ MaxRate int
48+ ControlInterval time.Duration
49+ MaxConsecutiveErrors int
50+ FairnessReportInterval time.Duration
51+ BacklogLogInterval time.Duration
52+ VisibilityVerificationTimeout time.Duration
53+ SleepActivityConfig * loadgen.SleepActivityConfig
54+ }
55+
56+ type ebbAndFlowExecutor struct {
57+ config * ebbAndFlowConfig
58+ }
59+
60+ var _ loadgen.Configurable = (* ebbAndFlowExecutor )(nil )
61+
62+ func (e * ebbAndFlowExecutor ) Configure (info loadgen.ScenarioInfo ) error {
63+ config := & ebbAndFlowConfig {
64+ SleepDuration : info .ScenarioOptionDuration (SleepDurationFlag , 1 * time .Millisecond ),
65+ MaxRate : info .ScenarioOptionInt (MaxRateFlag , 1000 ),
66+ ControlInterval : info .ScenarioOptionDuration (ControlIntervalFlag , 100 * time .Millisecond ),
67+ MaxConsecutiveErrors : info .ScenarioOptionInt (MaxConsecutiveErrorsFlag , 10 ),
68+ BacklogLogInterval : info .ScenarioOptionDuration (BacklogLogIntervalFlag , 30 * time .Second ),
69+ VisibilityVerificationTimeout : info .ScenarioOptionDuration (VisibilityVerificationTimeoutFlag , 30 * time .Second ),
70+ }
71+
72+ config .MinBacklog = int64 (info .ScenarioOptionInt (MinBacklogFlag , 0 ))
73+ if config .MinBacklog < 0 {
74+ return fmt .Errorf ("min-backlog must be non-negative, got %d" , config .MinBacklog )
75+ }
76+
77+ config .MaxBacklog = int64 (info .ScenarioOptionInt (MaxBacklogFlag , 30 ))
78+ if config .MaxBacklog <= config .MinBacklog {
79+ return fmt .Errorf ("max-backlog must be greater than min-backlog, got max=%d min=%d" , config .MaxBacklog , config .MinBacklog )
80+ }
81+
82+ config .PhaseTime = info .ScenarioOptionDuration (PhaseTimeFlag , 60 * time .Second )
83+ if config .PhaseTime <= 0 {
84+ return fmt .Errorf ("phase-time must be greater than 0, got %v" , config .PhaseTime )
85+ }
86+ config .FairnessReportInterval = info .ScenarioOptionDuration (FairnessReportIntervalFlag , config .PhaseTime ) // default to phase time
87+
88+ if sleepActivitiesStr , ok := info .ScenarioOptions [SleepActivityJsonFlag ]; ok {
89+ var err error
90+ config .SleepActivityConfig , err = loadgen .ParseAndValidateSleepActivityConfig (sleepActivitiesStr )
91+ if err != nil {
92+ return fmt .Errorf ("invalid %s: %w" , SleepActivityJsonFlag , err )
93+ }
94+ }
95+ if config .SleepActivityConfig == nil {
96+ config .SleepActivityConfig = & loadgen.SleepActivityConfig {}
97+ }
98+ if len (config .SleepActivityConfig .Groups ) == 0 {
99+ config .SleepActivityConfig .Groups = map [string ]loadgen.SleepActivityGroupConfig {"default" : {}}
100+ }
101+ for name , group := range config .SleepActivityConfig .Groups {
102+ fixedDist := loadgen .NewFixedDistribution (config .SleepDuration )
103+ group .SleepDuration = & fixedDist
104+ config .SleepActivityConfig .Groups [name ] = group
105+ }
106+
107+ e .config = config
108+ return nil
109+ }
110+
111+ // Run executes the ebb and flow scenario.
112+ func (e * ebbAndFlowExecutor ) Run (ctx context.Context , info loadgen.ScenarioInfo ) error {
113+ if err := e .Configure (info ); err != nil {
114+ return fmt .Errorf ("failed to parse scenario configuration: %w" , err )
115+ }
116+
117+ return (& ebbAndFlow {
118+ ScenarioInfo : info ,
119+ rng : rand .New (rand .NewSource (time .Now ().UnixNano ())),
120+ config : e .config ,
121+ }).run (ctx )
122+ }
123+
42124func init () {
43125 loadgen .MustRegisterScenario (loadgen.Scenario {
44126 Description : "Oscillates backlog between min and max.\n " +
@@ -47,18 +129,14 @@ func init() {
47129 " control-interval, max-consecutive-errors, fairness-report-interval,\n " +
48130 " fairness-threshold, backlog-log-interval.\n " +
49131 "Duration must be set." ,
50- Executor : loadgen .ExecutorFunc (func (ctx context.Context , runOptions loadgen.ScenarioInfo ) error {
51- return (& ebbAndFlow {
52- ScenarioInfo : runOptions ,
53- rng : rand .New (rand .NewSource (time .Now ().UnixNano ())),
54- }).run (ctx )
55- }),
132+ Executor : & ebbAndFlowExecutor {},
56133 })
57134}
58135
59136type ebbAndFlow struct {
60137 loadgen.ScenarioInfo
61- rng * rand.Rand
138+ rng * rand.Rand
139+ config * ebbAndFlowConfig
62140
63141 id string
64142 startTime time.Time
@@ -72,26 +150,10 @@ func (e *ebbAndFlow) run(ctx context.Context) error {
72150 e .id = fmt .Sprintf ("ebb_and_flow_%s" , e .RunID )
73151 e .fairnessTracker = ebbandflow .NewFairnessTracker ()
74152
75- // Parse and validate scenario options.
76- minBacklog := int64 (e .ScenarioOptionInt (MinBacklogFlag , 0 ))
77- maxBacklog := int64 (e .ScenarioOptionInt (MaxBacklogFlag , 30 ))
78- phaseTime := e .ScenarioOptionDuration (PhaseTimeFlag , 60 * time .Second )
79- sleepDuration := e .ScenarioOptionDuration (SleepDurationFlag , 1 * time .Millisecond )
80- maxRate := e .ScenarioOptionInt (MaxRateFlag , 1000 )
81- controlInterval := e .ScenarioOptionDuration (ControlIntervalFlag , 100 * time .Millisecond )
82- maxConsecutiveErrors := e .ScenarioOptionInt (MaxConsecutiveErrorsFlag , 10 )
83- fairnessReportInterval := e .ScenarioOptionDuration (FairnessReportIntervalFlag , phaseTime ) // default to phase time
84- backlogLogInterval := e .ScenarioOptionDuration (BacklogLogIntervalFlag , 30 * time .Second )
85- visibilityVerificationTimeout := e .ScenarioOptionDuration (VisibilityVerificationTimeoutFlag , 30 * time .Second )
86-
87- if minBacklog < 0 {
88- return fmt .Errorf ("min-backlog must be non-negative" )
89- }
90- if maxBacklog <= minBacklog {
91- return fmt .Errorf ("max-backlog must be greater than min-backlog" )
92- }
93- if phaseTime <= 0 {
94- return fmt .Errorf ("phase-time must be greater than 0" )
153+ // Get parsed configuration
154+ config := e .config
155+ if config == nil {
156+ return fmt .Errorf ("configuration not parsed - Parse must be called before run" )
95157 }
96158
97159 // Initialize search attribute for visibility tracking
@@ -104,46 +166,25 @@ func (e *ebbAndFlow) run(ctx context.Context) error {
104166 return fmt .Errorf ("failed to initialize search attribute %s: %w" , EbbAndFlowScenarioIdSearchAttribute , err )
105167 }
106168
107- // Activity config
108- var sleepActivityConfig * loadgen.SleepActivityConfig
109- if sleepActivitiesStr , ok := e .ScenarioOptions [SleepActivityJsonFlag ]; ok {
110- var err error
111- sleepActivityConfig , err = loadgen .ParseAndValidateSleepActivityConfig (sleepActivitiesStr )
112- if err != nil {
113- return fmt .Errorf ("failed to parse %s: %w" , SleepActivityJsonFlag , err )
114- }
115- }
116- if sleepActivityConfig == nil {
117- sleepActivityConfig = & loadgen.SleepActivityConfig {}
118- }
119- if len (sleepActivityConfig .Groups ) == 0 {
120- sleepActivityConfig .Groups = map [string ]loadgen.SleepActivityGroupConfig {"default" : {}}
121- }
122- for name , group := range sleepActivityConfig .Groups {
123- fixedDist := loadgen .NewFixedDistribution (sleepDuration )
124- group .SleepDuration = & fixedDist
125- sleepActivityConfig .Groups [name ] = group
126- }
127-
128169 var consecutiveErrCount int
129170 errCh := make (chan error , 10000 )
130- ticker := time .NewTicker (controlInterval )
171+ ticker := time .NewTicker (config . ControlInterval )
131172 defer ticker .Stop ()
132173
133174 // Setup fairness reporting
134- fairnessTicker := time .NewTicker (fairnessReportInterval )
175+ fairnessTicker := time .NewTicker (config . FairnessReportInterval )
135176 defer fairnessTicker .Stop ()
136177 go e .fairnessReportLoop (ctx , fairnessTicker )
137178
138179 // Setup configurable backlog logging
139- backlogTicker := time .NewTicker (backlogLogInterval )
180+ backlogTicker := time .NewTicker (config . BacklogLogInterval )
140181 defer backlogTicker .Stop ()
141182
142183 var startWG sync.WaitGroup
143184 iter := 1
144185
145186 e .Logger .Infof ("Starting ebb and flow scenario: min_backlog=%d, max_backlog=%d, phase_time=%v, duration=%v" ,
146- minBacklog , maxBacklog , phaseTime , e .Configuration .Duration )
187+ config . MinBacklog , config . MaxBacklog , config . PhaseTime , e .Configuration .Duration )
147188
148189 var rate int
149190 var isDraining bool // true = draining mode, false = growing mode
@@ -158,8 +199,8 @@ func (e *ebbAndFlow) run(ctx context.Context) error {
158199 if err != nil {
159200 e .Logger .Errorf ("Failed to spawn workflow: %v" , err )
160201 consecutiveErrCount ++
161- if consecutiveErrCount >= maxConsecutiveErrors {
162- return fmt .Errorf ("got %v consecutive errors, most recent: %w" , maxConsecutiveErrors , err )
202+ if consecutiveErrCount >= config . MaxConsecutiveErrors {
203+ return fmt .Errorf ("got %v consecutive errors, most recent: %w" , config . MaxConsecutiveErrors , err )
163204 }
164205 } else {
165206 consecutiveErrCount = 0
@@ -170,24 +211,24 @@ func (e *ebbAndFlow) run(ctx context.Context) error {
170211 backlog = generated - processed
171212
172213 // Check if we need to switch modes.
173- if isDraining && backlog <= minBacklog {
214+ if isDraining && backlog <= config . MinBacklog {
174215 e .Logger .Infof ("Backlog reached %d, switching to growing mode" , backlog )
175216 isDraining = false
176217 cycleStartTime = time .Now ()
177- } else if ! isDraining && backlog >= maxBacklog {
218+ } else if ! isDraining && backlog >= config . MaxBacklog {
178219 e .Logger .Infof ("Backlog reached %d, switching to draining mode" , backlog )
179220 isDraining = true
180221 cycleStartTime = time .Now ()
181222 }
182223
183- target = calculateBacklogTarget (isDraining , cycleStartTime , phaseTime , minBacklog , maxBacklog )
184- rate = calculateSpawnRate (target , backlog , minBacklog , maxBacklog , maxRate )
224+ target = calculateBacklogTarget (isDraining , cycleStartTime , config . PhaseTime , config . MinBacklog , config . MaxBacklog )
225+ rate = calculateSpawnRate (target , backlog , config . MinBacklog , config . MaxBacklog , config . MaxRate )
185226
186227 if rate > 0 {
187228 startWG .Add (1 )
188229 go func (iteration , count int ) {
189230 defer startWG .Done ()
190- errCh <- e .spawnWorkflowWithActivities (ctx , iteration , count , sleepActivityConfig )
231+ errCh <- e .spawnWorkflowWithActivities (ctx , iteration , count , config . SleepActivityConfig )
191232 }(iter , rate )
192233 iter ++
193234 }
@@ -217,7 +258,7 @@ func (e *ebbAndFlow) run(ctx context.Context) error {
217258 EbbAndFlowScenarioIdSearchAttribute , e .id ),
218259 },
219260 completedWorkflows ,
220- visibilityVerificationTimeout ,
261+ config . VisibilityVerificationTimeout ,
221262 )
222263}
223264
0 commit comments