
Commit dbd8ca2

Added include-retry-scenarios option to throughput_stress scenario (#229)

## What was changed

Added an `include-retry-scenarios` option to the `throughput_stress` scenario. Setting `--option include-retry-scenarios=true` adds activities that fail and retry as actions in `throughput_stress`.

**NOTE**: enabling this option increases the runtime of a single iteration from ~23s to ~44s!

New activity actions were added for this fail-and-retry coverage:

- retryable activity
- timeout activity
- heartbeat timeout activity

The added proto definitions are the same; I've separated them because they cover different activity failure cases, which I think is neater to handle separately in the workers than as a single failure activity. It also allows each to be expanded separately and more specifically.

I've added these actions to the existing throughput stress scenario, along with a cancellable activity action, and added corresponding logic to each worker to handle the new activity actions.

## Why?

To provide additional activity coverage, particularly for cases where the activity does not succeed on the first attempt.

## How was this tested?

Ran the existing `throughput_stress_test.go` test, and ran:

```
go run ./cmd run-scenario-with-worker \
  --scenario throughput_stress \
  --language go \
  --iterations 10 \
  --option internal-iterations=10 \
  --option include-retry-scenarios=true
```

I'm having trouble running throughput stress on the other language workers (even before this change); I'll address those small fixes in subsequent PRs.

## Any docs updates needed?

No.
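
For illustration, here is a minimal sketch of how the extra fail-and-retry actions could be gated on the option and built from the helper constructors added in `loadgen/kitchensink/helpers.go` (diff below). The wrapper function, package, import path, and parameter values are assumptions for this sketch, not the actual scenario wiring.

```go
// Hypothetical sketch only: the option handling and parameter values are
// illustrative; the helper constructors are the ones added in
// loadgen/kitchensink/helpers.go.
package throughputstress // hypothetical package for this sketch

import (
	"time"

	"github.com/temporalio/omes/loadgen/kitchensink" // assumed import path
)

// retryScenarioActions returns the extra actions when
// --option include-retry-scenarios=true is set.
func retryScenarioActions(includeRetryScenarios bool) []*kitchensink.Action {
	if !includeRetryScenarios {
		return nil
	}
	return []*kitchensink.Action{
		// Presumably fails its first two attempts with a retryable error, then succeeds.
		kitchensink.RetryableErrorActivity(2, kitchensink.DefaultRemoteActivity),
		// Presumably exceeds the 3s start-to-close timeout on the first attempt, then finishes in 1s.
		kitchensink.TimeoutActivity(1, 1*time.Second, 5*time.Second, 3*time.Second, 3, time.Second, 2.0),
		// Presumably misses the 3s heartbeat timeout on the first attempt, then finishes in 1s.
		kitchensink.HeartbeatActivity(1, 1*time.Second, 5*time.Second, 10*time.Second, 3*time.Second, 3, time.Second, 2.0),
		// Delay activity that is cancelled after it starts.
		kitchensink.DelayActivityWithCancellation(30*time.Second, 60*time.Second),
	}
}
```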
1 parent: 3e8f9d6

File tree

19 files changed: +7078, -1804 lines


loadgen/kitchensink/helpers.go

Lines changed: 123 additions & 0 deletions

@@ -115,6 +115,37 @@ func NewTimerAction(t time.Duration) *Action {

```go
	}
}

func DelayActivity(duration time.Duration, factory ActionFactory[ExecuteActivityAction]) *Action {
	activity := &ExecuteActivityAction{
		ActivityType: &ExecuteActivityAction_Delay{
			Delay: durationpb.New(duration),
		},
	}
	return factory(activity)
}

// DelayActivityWithCancellation creates a delay activity that will be cancelled after it starts
func DelayActivityWithCancellation(duration time.Duration, startToCloseTimeout time.Duration) *Action {
	return &Action{
		Variant: &Action_ExecActivity{
			ExecActivity: &ExecuteActivityAction{
				ActivityType: &ExecuteActivityAction_Delay{
					Delay: durationpb.New(duration),
				},
				StartToCloseTimeout: durationpb.New(startToCloseTimeout),
				AwaitableChoice: &AwaitableChoice{
					Condition: &AwaitableChoice_CancelAfterStarted{
						CancelAfterStarted: &emptypb.Empty{},
					},
				},
				Locality: &ExecuteActivityAction_Remote{
					Remote: &RemoteActivityOptions{},
				},
			},
		},
	}
}

func PayloadActivity(inSize, outSize int, factory ActionFactory[ExecuteActivityAction]) *Action {
	activity := &ExecuteActivityAction{
		ActivityType: &ExecuteActivityAction_Payload{
```
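
As an illustrative usage of the two delay constructors above (durations are arbitrary, not taken from the scenario), assuming `DefaultRemoteActivity` satisfies `ActionFactory[ExecuteActivityAction]` as its signature suggests:

```go
// Illustrative usage within package kitchensink; durations are arbitrary.
func exampleDelayActions() []*Action {
	return []*Action{
		// Plain delay, built through the existing DefaultRemoteActivity factory.
		DelayActivity(2*time.Second, DefaultRemoteActivity),
		// Remote delay with a 60s start-to-close timeout, cancelled once it has started.
		DelayActivityWithCancellation(30*time.Second, 60*time.Second),
	}
}
```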

@@ -138,6 +169,57 @@ func GenericActivity(activityType string, factory ActionFactory[ExecuteActivityA

```go
	return factory(activity)
}

func RetryableErrorActivity(failAttempts int32, factory ActionFactory[ExecuteActivityAction]) *Action {
	activity := &ExecuteActivityAction{
		ActivityType: &ExecuteActivityAction_RetryableError{
			RetryableError: &ExecuteActivityAction_RetryableErrorActivity{
				FailAttempts: failAttempts,
			},
		},
	}
	return factory(activity)
}

func TimeoutActivity(failAttempts int32, successDuration time.Duration, failureDuration time.Duration, startToCloseTimeout time.Duration, maxAttempts int32, initialInterval time.Duration, backoffCoefficient float64) *Action {
	if successDuration >= startToCloseTimeout {
		panic(fmt.Sprintf("successDuration (%v) must be < startToCloseTimeout (%v)", successDuration, startToCloseTimeout))
	}
	if failureDuration <= startToCloseTimeout {
		panic(fmt.Sprintf("failureDuration (%v) must be > startToCloseTimeout (%v)", failureDuration, startToCloseTimeout))
	}
	activity := &ExecuteActivityAction{
		ActivityType: &ExecuteActivityAction_Timeout{
			Timeout: &ExecuteActivityAction_TimeoutActivity{
				FailAttempts:    failAttempts,
				SuccessDuration: &durationpb.Duration{Seconds: int64(successDuration.Seconds())},
				FailureDuration: &durationpb.Duration{Seconds: int64(failureDuration.Seconds())},
			},
		},
	}
	factory := RemoteActivityWithRetry(startToCloseTimeout, maxAttempts, initialInterval, backoffCoefficient)
	return factory(activity)
}

func HeartbeatActivity(failAttempts int32, successDuration time.Duration, failureDuration time.Duration, startToCloseTimeout, heartbeatTimeout time.Duration, maxAttempts int32, initialInterval time.Duration, backoffCoefficient float64) *Action {
	if successDuration >= heartbeatTimeout {
		panic(fmt.Sprintf("successDuration (%v) must be < heartbeatTimeout (%v)", successDuration, heartbeatTimeout))
	}
	if failureDuration <= heartbeatTimeout {
		panic(fmt.Sprintf("failureDuration (%v) must be > heartbeatTimeout (%v)", failureDuration, heartbeatTimeout))
	}
	activity := &ExecuteActivityAction{
		ActivityType: &ExecuteActivityAction_Heartbeat{
			Heartbeat: &ExecuteActivityAction_HeartbeatTimeoutActivity{
				FailAttempts:    failAttempts,
				SuccessDuration: &durationpb.Duration{Seconds: int64(successDuration.Seconds())},
				FailureDuration: &durationpb.Duration{Seconds: int64(failureDuration.Seconds())},
			},
		},
	}
	factory := RemoteActivityWithHeartbeat(startToCloseTimeout, heartbeatTimeout, maxAttempts, initialInterval, backoffCoefficient)
	return factory(activity)
}

func DefaultRemoteActivity(activity *ExecuteActivityAction) *Action {
	activity.StartToCloseTimeout = &durationpb.Duration{Seconds: 60}
	activity.Locality = &ExecuteActivityAction_Remote{
```
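
To make the panic guards above concrete, here is an illustrative set of arguments that satisfies them (values are not taken from the scenario): the success duration must sit below the relevant timeout and the failure duration above it, so failing attempts presumably time out while a later attempt completes in time.

```go
// Illustrative arguments chosen to satisfy the guards above; not scenario values.
func exampleTimeoutAndHeartbeatActions() []*Action {
	return []*Action{
		// Failing attempt presumably runs for failureDuration (5s) and exceeds the 3s
		// start-to-close timeout; a retry runs for 1s and succeeds. Up to 3 attempts,
		// 1s initial retry interval, 2.0 backoff coefficient.
		TimeoutActivity(1, 1*time.Second, 5*time.Second, 3*time.Second, 3, 1*time.Second, 2.0),
		// Failing attempt presumably goes 5s without heartbeating and exceeds the 3s
		// heartbeat timeout (10s start-to-close); a retry heartbeats within 1s and succeeds.
		HeartbeatActivity(1, 1*time.Second, 5*time.Second, 10*time.Second, 3*time.Second, 3, 1*time.Second, 2.0),
	}
}
```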

@@ -167,6 +249,47 @@ func DefaultLocalActivity(activity *ExecuteActivityAction) *Action {

```go
	}
}

// RemoteActivityWithRetry creates a remote activity with custom retry configuration
func RemoteActivityWithRetry(startToCloseTimeout time.Duration, maxAttempts int32, initialInterval time.Duration, backoffCoefficient float64) ActionFactory[ExecuteActivityAction] {
	return func(activity *ExecuteActivityAction) *Action {
		activity.StartToCloseTimeout = durationpb.New(startToCloseTimeout)
		activity.RetryPolicy = &common.RetryPolicy{
			MaximumAttempts:    maxAttempts,
			InitialInterval:    durationpb.New(initialInterval),
			BackoffCoefficient: backoffCoefficient,
		}
		activity.Locality = &ExecuteActivityAction_Remote{
			Remote: &RemoteActivityOptions{},
		}
		return &Action{
			Variant: &Action_ExecActivity{
				ExecActivity: activity,
			},
		}
	}
}

// RemoteActivityWithHeartbeat creates a remote activity with heartbeat timeout and retry configuration
func RemoteActivityWithHeartbeat(startToCloseTimeout, heartbeatTimeout time.Duration, maxAttempts int32, initialInterval time.Duration, backoffCoefficient float64) ActionFactory[ExecuteActivityAction] {
	return func(activity *ExecuteActivityAction) *Action {
		activity.StartToCloseTimeout = durationpb.New(startToCloseTimeout)
		activity.HeartbeatTimeout = durationpb.New(heartbeatTimeout)
		activity.RetryPolicy = &common.RetryPolicy{
			MaximumAttempts:    maxAttempts,
			InitialInterval:    durationpb.New(initialInterval),
			BackoffCoefficient: backoffCoefficient,
		}
		activity.Locality = &ExecuteActivityAction_Remote{
			Remote: &RemoteActivityOptions{},
		}
		return &Action{
			Variant: &Action_ExecActivity{
				ExecActivity: activity,
			},
		}
	}
}

func NewSetWorkflowStateAction(key, value string) *Action {
	return &Action{
		Variant: &Action_SetWorkflowState{
```
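
Both helpers return an `ActionFactory[ExecuteActivityAction]`, so they compose with the activity constructors earlier in the file; a small illustrative sketch (values are arbitrary):

```go
// Illustrative composition; values are arbitrary, not scenario defaults.
func exampleComposedRetryAction() *Action {
	// Remote activity with a 10s start-to-close timeout, up to 5 attempts,
	// a 1s initial retry interval, and a 2.0 backoff coefficient.
	withRetry := RemoteActivityWithRetry(10*time.Second, 5, 1*time.Second, 2.0)
	// The activity fails its first two attempts and then succeeds under that policy.
	return RetryableErrorActivity(2, withRetry)
}
```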
