Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions loadgen/kitchensink/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,37 @@ func NewTimerAction(t time.Duration) *Action {
}
}

func DelayActivity(duration time.Duration, factory ActionFactory[ExecuteActivityAction]) *Action {
activity := &ExecuteActivityAction{
ActivityType: &ExecuteActivityAction_Delay{
Delay: durationpb.New(duration),
},
}
return factory(activity)
}

// DelayActivityWithCancellation creates a delay activity that will be cancelled after it starts
func DelayActivityWithCancellation(duration time.Duration, startToCloseTimeout time.Duration) *Action {
return &Action{
Variant: &Action_ExecActivity{
ExecActivity: &ExecuteActivityAction{
ActivityType: &ExecuteActivityAction_Delay{
Delay: durationpb.New(duration),
},
StartToCloseTimeout: durationpb.New(startToCloseTimeout),
AwaitableChoice: &AwaitableChoice{
Condition: &AwaitableChoice_CancelAfterStarted{
CancelAfterStarted: &emptypb.Empty{},
},
},
Locality: &ExecuteActivityAction_Remote{
Remote: &RemoteActivityOptions{},
},
},
},
}
}

func PayloadActivity(inSize, outSize int, factory ActionFactory[ExecuteActivityAction]) *Action {
activity := &ExecuteActivityAction{
ActivityType: &ExecuteActivityAction_Payload{
Expand All @@ -138,6 +169,57 @@ func GenericActivity(activityType string, factory ActionFactory[ExecuteActivityA
return factory(activity)
}

func RetryableErrorActivity(failAttempts int32, factory ActionFactory[ExecuteActivityAction]) *Action {
activity := &ExecuteActivityAction{
ActivityType: &ExecuteActivityAction_RetryableError{
RetryableError: &ExecuteActivityAction_RetryableErrorActivity{
FailAttempts: failAttempts,
},
},
}
return factory(activity)
}

func TimeoutActivity(failAttempts int32, successDuration time.Duration, failureDuration time.Duration, startToCloseTimeout time.Duration, maxAttempts int32, initialInterval time.Duration, backoffCoefficient float64) *Action {
if successDuration >= startToCloseTimeout {
panic(fmt.Sprintf("successDuration (%v) must be < startToCloseTimeout (%v)", successDuration, startToCloseTimeout))
}
if failureDuration <= startToCloseTimeout {
panic(fmt.Sprintf("failureDuration (%v) must be > startToCloseTimeout (%v)", failureDuration, startToCloseTimeout))
}
activity := &ExecuteActivityAction{
ActivityType: &ExecuteActivityAction_Timeout{
Timeout: &ExecuteActivityAction_TimeoutActivity{
FailAttempts: failAttempts,
SuccessDuration: &durationpb.Duration{Seconds: int64(successDuration.Seconds())},
FailureDuration: &durationpb.Duration{Seconds: int64(failureDuration.Seconds())},
},
},
}
factory := RemoteActivityWithRetry(startToCloseTimeout, maxAttempts, initialInterval, backoffCoefficient)
return factory(activity)
}

func HeartbeatActivity(failAttempts int32, successDuration time.Duration, failureDuration time.Duration, startToCloseTimeout, heartbeatTimeout time.Duration, maxAttempts int32, initialInterval time.Duration, backoffCoefficient float64) *Action {
if successDuration >= heartbeatTimeout {
panic(fmt.Sprintf("successDuration (%v) must be < heartbeatTimeout (%v)", successDuration, heartbeatTimeout))
}
if failureDuration <= heartbeatTimeout {
panic(fmt.Sprintf("failureDuration (%v) must be > heartbeatTimeout (%v)", failureDuration, heartbeatTimeout))
}
activity := &ExecuteActivityAction{
ActivityType: &ExecuteActivityAction_Heartbeat{
Heartbeat: &ExecuteActivityAction_HeartbeatTimeoutActivity{
FailAttempts: failAttempts,
SuccessDuration: &durationpb.Duration{Seconds: int64(successDuration.Seconds())},
FailureDuration: &durationpb.Duration{Seconds: int64(failureDuration.Seconds())},
},
},
}
factory := RemoteActivityWithHeartbeat(startToCloseTimeout, heartbeatTimeout, maxAttempts, initialInterval, backoffCoefficient)
return factory(activity)
}

func DefaultRemoteActivity(activity *ExecuteActivityAction) *Action {
activity.StartToCloseTimeout = &durationpb.Duration{Seconds: 60}
activity.Locality = &ExecuteActivityAction_Remote{
Expand Down Expand Up @@ -167,6 +249,47 @@ func DefaultLocalActivity(activity *ExecuteActivityAction) *Action {
}
}

// RemoteActivityWithRetry creates a remote activity with custom retry configuration
func RemoteActivityWithRetry(startToCloseTimeout time.Duration, maxAttempts int32, initialInterval time.Duration, backoffCoefficient float64) ActionFactory[ExecuteActivityAction] {
return func(activity *ExecuteActivityAction) *Action {
activity.StartToCloseTimeout = durationpb.New(startToCloseTimeout)
activity.RetryPolicy = &common.RetryPolicy{
MaximumAttempts: maxAttempts,
InitialInterval: durationpb.New(initialInterval),
BackoffCoefficient: backoffCoefficient,
}
activity.Locality = &ExecuteActivityAction_Remote{
Remote: &RemoteActivityOptions{},
}
return &Action{
Variant: &Action_ExecActivity{
ExecActivity: activity,
},
}
}
}

// RemoteActivityWithHeartbeat creates a remote activity with heartbeat timeout and retry configuration
func RemoteActivityWithHeartbeat(startToCloseTimeout, heartbeatTimeout time.Duration, maxAttempts int32, initialInterval time.Duration, backoffCoefficient float64) ActionFactory[ExecuteActivityAction] {
return func(activity *ExecuteActivityAction) *Action {
activity.StartToCloseTimeout = durationpb.New(startToCloseTimeout)
activity.HeartbeatTimeout = durationpb.New(heartbeatTimeout)
activity.RetryPolicy = &common.RetryPolicy{
MaximumAttempts: maxAttempts,
InitialInterval: durationpb.New(initialInterval),
BackoffCoefficient: backoffCoefficient,
}
activity.Locality = &ExecuteActivityAction_Remote{
Remote: &RemoteActivityOptions{},
}
return &Action{
Variant: &Action_ExecActivity{
ExecActivity: activity,
},
}
}
}

func NewSetWorkflowStateAction(key, value string) *Action {
return &Action{
Variant: &Action_SetWorkflowState{
Expand Down
Loading
Loading