
Commit c74551e

Customizable throughputStress Workflow IDs (#134)
## What was changed

Allow customizing the Workflow ID in the throughputStress scenario.

## Why?

For our test pipeline, I need to start multiple Omes runs (since I don't want to abort the entire pipeline run if one fails). But changing the Omes RunID or Scenario is not possible, as those are used to compute the task queue identifier (Workers are deployed separately and rely on that identifier). Thus, generating unique Workflow IDs is needed to avoid Workflow ID conflicts.
1 parent 37b9a5e commit c74551e
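To make the conflict concrete, here is a minimal sketch (the prefixes `stage-a`/`stage-b` and run ID `run1` are hypothetical, not Omes defaults): two runs that share a RunID and scenario also share a task queue, so with the previously hard-coded prefix they would produce identical Workflow IDs for the same iteration, while distinct prefixes keep the IDs unique.

```go
package main

import "fmt"

func main() {
	runID, iteration := "run1", 0

	// Previous behavior: the prefix was hard-coded, so two concurrent runs
	// sharing a RunID collided on the same Workflow ID.
	fmt.Printf("throughputStress-%s-%d\n", runID, iteration) // throughputStress-run1-0

	// With a configurable prefix, each pipeline run chooses its own prefix,
	// keeping Workflow IDs unique even on a shared task queue.
	for _, prefix := range []string{"stage-a", "stage-b"} {
		fmt.Printf("%s-%s-%d\n", prefix, runID, iteration) // stage-a-run1-0, stage-b-run1-0
	}
}
```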

File tree

1 file changed: +8 −3 lines changed

scenarios/throughput_stress.go

Lines changed: 8 additions & 3 deletions
```diff
@@ -1,6 +1,7 @@
 package scenarios
 
 import (
+	"cmp"
 	"context"
 	"errors"
 	"fmt"
@@ -24,9 +25,13 @@ const (
 	SkipSleepFlag = "skip-sleep"
 	CANEventFlag = "continue-as-new-after-event-count"
 	NexusEndpointFlag = "nexus-endpoint"
+	WorkflowIDPrefix = "workflow-id-prefix"
 )
 
-const ThroughputStressScenarioIdSearchAttribute = "ThroughputStressScenarioId"
+const (
+	ThroughputStressScenarioIdSearchAttribute = "ThroughputStressScenarioId"
+	defaultWorkflowIDPrefix = "throughputStress"
+)
 
 type tpsExecutor struct {
 	workflowCount atomic.Uint64
@@ -80,12 +85,13 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
 	internalIterations := run.ScenarioInfo.ScenarioOptionInt(IterFlag, 5)
 	internalIterTimeout := run.ScenarioInfo.ScenarioOptionDuration(IterTimeout, time.Minute)
 	continueAsNewCount := run.ScenarioInfo.ScenarioOptionInt(CANEventFlag, 120)
+	workflowIDPrefix := cmp.Or(run.ScenarioInfo.ScenarioOptions[WorkflowIDPrefix], defaultWorkflowIDPrefix)
 	// Disabled by default.
 	nexusEndpoint := run.ScenarioInfo.ScenarioOptions[NexusEndpointFlag]
 	skipSleep := run.ScenarioInfo.ScenarioOptionBool(SkipSleepFlag, false)
 	timeout := time.Duration(internalIterations) * internalIterTimeout
 
-	wfID := fmt.Sprintf("throughputStress-%s-%d", run.RunID, run.Iteration)
+	wfID := fmt.Sprintf("%s-%s-%d", workflowIDPrefix, run.RunID, run.Iteration)
 	var result throughputstress.WorkflowOutput
 	err := run.ExecuteAnyWorkflow(ctx,
 		client.StartWorkflowOptions{
@@ -129,7 +135,6 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
 		int(totalWorkflowCount),
 		3*time.Minute,
 	)
-
 }
 
 func init() {
```
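For reference, a standalone sketch of the new ID construction (the `workflowID` helper below is illustrative and not part of the Omes codebase): `cmp.Or`, available since Go 1.22, returns its first non-zero-value argument, so an empty `workflow-id-prefix` option falls back to the default `throughputStress` prefix.

```go
package main

import (
	"cmp"
	"fmt"
)

const defaultWorkflowIDPrefix = "throughputStress"

// workflowID mirrors the construction used in the scenario: prefix (with
// fallback), RunID, and iteration number, joined by dashes.
func workflowID(prefixOption, runID string, iteration int) string {
	// cmp.Or returns the first non-zero value, so "" yields the default prefix.
	prefix := cmp.Or(prefixOption, defaultWorkflowIDPrefix)
	return fmt.Sprintf("%s-%s-%d", prefix, runID, iteration)
}

func main() {
	fmt.Println(workflowID("", "run1", 3))           // throughputStress-run1-3
	fmt.Println(workflowID("pipeline-a", "run1", 3)) // pipeline-a-run1-3
}
```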
