Skip to content

Commit 8033c3a

Browse files
authored
Remove scenario name from task queue name (#197)
<!--- Note to EXTERNAL Contributors --> <!-- Thanks for opening a PR! If it is a significant code change, please **make sure there is an open issue** for this. We work best with you when we have accepted the idea first before you code. --> <!--- For ALL Contributors 👇 --> ## What was changed <!-- Describe what has changed in this PR --> Omes client and worker get their task queue _only_ from the RunID now; instead of also the scenario name. ## Why? <!-- Tell your future self why have you made these changes --> RunID is already expected to be a unique identifier; scenario name is superfluous. When running multiple scenarios, the user can pick a unique, clear RunID to distinguish them. This also simplifies the CICD use case where the workers need to be deployed first before the clients are run. That means the name(s) of the scenario(s) need to known and propagated ahead of time. Now all that's needed is a RunID. ## Checklist <!--- add/delete as needed ---> 1. Closes <!-- add issue number here --> 2. How was this tested: <!--- Please describe how you tested your changes/how we can test them --> 3. Any docs updates needed? <!--- update README if applicable or point out where to update docs.temporal.io -->
1 parent 669a545 commit 8033c3a

File tree

8 files changed

+23
-28
lines changed

8 files changed

+23
-28
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ Notes:
101101
### Run a worker for a specific language SDK
102102

103103
```sh
104-
go run ./cmd run-worker --scenario workflow_with_single_noop_activity --run-id local-test-run --language go
104+
go run ./cmd run-worker --run-id local-test-run --language go
105105
```
106106

107107
Notes:

cmd/cleanup_scenario.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,15 @@ func cleanupScenarioCmd() *cobra.Command {
3939

4040
type scenarioCleaner struct {
4141
logger *zap.SugaredLogger
42-
scenario string
43-
runID string
4442
pollInterval time.Duration
43+
scenario cmdoptions.ScenarioID
4544
clientOptions cmdoptions.ClientOptions
4645
metricsOptions cmdoptions.MetricsOptions
4746
loggingOptions cmdoptions.LoggingOptions
4847
}
4948

5049
func (c *scenarioCleaner) addCLIFlags(fs *pflag.FlagSet) {
51-
fs.StringVar(&c.scenario, "scenario", "", "Scenario name to cleanup")
52-
fs.StringVar(&c.runID, "run-id", "", "Run ID for the run")
50+
c.scenario.AddCLIFlags(fs)
5351
fs.DurationVar(&c.pollInterval, "poll-interval", time.Second, "Interval for polling completion of job")
5452
c.clientOptions.AddCLIFlags(fs)
5553
c.metricsOptions.AddCLIFlags(fs, "")
@@ -58,17 +56,17 @@ func (c *scenarioCleaner) addCLIFlags(fs *pflag.FlagSet) {
5856

5957
func (c *scenarioCleaner) run(ctx context.Context) error {
6058
c.logger = c.loggingOptions.MustCreateLogger()
61-
scenario := loadgen.GetScenario(c.scenario)
59+
scenario := loadgen.GetScenario(c.scenario.Scenario)
6260
if scenario == nil {
6361
return fmt.Errorf("scenario not found")
64-
} else if c.runID == "" {
62+
} else if c.scenario.RunID == "" {
6563
return fmt.Errorf("run ID not found")
6664
}
6765
metrics := c.metricsOptions.MustCreateMetrics(c.logger)
6866
defer metrics.Shutdown(ctx)
6967
client := c.clientOptions.MustDial(metrics, c.logger)
7068
defer client.Close()
71-
taskQueue := loadgen.TaskQueueForRun(c.scenario, c.runID)
69+
taskQueue := loadgen.TaskQueueForRun(c.scenario.RunID)
7270
jobID := "omes-cleanup-" + taskQueue + "-" + uuid.New().String()
7371
username, hostname := "anonymous", "unknown"
7472
if user, err := user.Current(); err == nil {

cmd/run_worker.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ func runWorkerCmd() *cobra.Command {
2929
},
3030
}
3131
r.addCLIFlags(cmd.Flags())
32-
cmd.MarkFlagRequired("scenario")
3332
cmd.MarkFlagRequired("language")
3433
cmd.MarkFlagRequired("run-id")
3534
return cmd
@@ -59,7 +58,7 @@ func (r *workerRunner) addCLIFlags(fs *pflag.FlagSet) {
5958
func (r *workerRunner) preRun() {
6059
r.builder.preRun()
6160
r.Runner.Builder = r.builder.Builder
62-
r.TaskQueueName = loadgen.TaskQueueForRun(r.ScenarioID.Scenario, r.ScenarioID.RunID)
61+
r.TaskQueueName = loadgen.TaskQueueForRun(r.ScenarioID.RunID)
6362
}
6463

6564
func (r *workerRunner) run(ctx context.Context) error {

loadgen/kitchen_sink_executor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -833,7 +833,7 @@ func testSupportedFeature(
833833
}
834834
execErr := env.RunExecutorTest(t, testExecutor, scenarioInfo, sdk)
835835

836-
taskQueueName := TaskQueueForRun(scenarioInfo.ScenarioName, scenarioInfo.RunID)
836+
taskQueueName := TaskQueueForRun(scenarioInfo.RunID)
837837
historyEvents, historyErr := getWorkflowHistory(t, taskQueueName, env.TemporalClient())
838838
if execErr != nil {
839839
if len(historyEvents) > 0 {

loadgen/scenario.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -291,19 +291,19 @@ func (s *ScenarioInfo) RegisterDefaultSearchAttributes(ctx context.Context) erro
291291
return nil
292292
}
293293

294-
// TaskQueueForRun returns a default task queue name for the given scenario name and run ID.
295-
func TaskQueueForRun(scenarioName, runID string) string {
296-
return fmt.Sprintf("%s:%s", scenarioName, runID)
294+
// TaskQueueForRun returns the task queue name for the given run ID.
295+
func TaskQueueForRun(runID string) string {
296+
return "omes-" + runID
297297
}
298298

299299
func (r *Run) TaskQueue() string {
300-
return TaskQueueForRun(r.ScenarioName, r.RunID)
300+
return TaskQueueForRun(r.RunID)
301301
}
302302

303303
// DefaultStartWorkflowOptions gets default start workflow info.
304304
func (r *Run) DefaultStartWorkflowOptions() client.StartWorkflowOptions {
305305
return client.StartWorkflowOptions{
306-
TaskQueue: TaskQueueForRun(r.ScenarioName, r.RunID),
306+
TaskQueue: TaskQueueForRun(r.RunID),
307307
ID: fmt.Sprintf("w-%s-%d", r.RunID, r.Iteration),
308308
WorkflowExecutionErrorWhenAlreadyStarted: true,
309309
}

scenarios/state_transitions_steady.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func (s *stateTransitionsSteady) run(ctx context.Context) error {
134134
&workflowservice.CountWorkflowExecutionsRequest{
135135
Namespace: s.Namespace,
136136
Query: fmt.Sprintf("TaskQueue = %q and ExecutionStatus = 'Running'",
137-
loadgen.TaskQueueForRun(s.ScenarioName, s.RunID)),
137+
loadgen.TaskQueueForRun(s.RunID)),
138138
},
139139
0,
140140
time.Minute,

scenarios/throughput_stress_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func TestThroughputStress(t *testing.T) {
1616

1717
scenarioName := "throughput_stress_test"
1818
runID := fmt.Sprintf("tps-%d", time.Now().Unix())
19-
taskQueueName := loadgen.TaskQueueForRun(scenarioName, runID)
19+
taskQueueName := loadgen.TaskQueueForRun(runID)
2020

2121
env := workers.SetupTestEnvironment(t,
2222
workers.WithExecutorTimeout(1*time.Minute),

workers/test_env.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -178,23 +178,18 @@ func (env *TestEnvironment) RunExecutorTest(
178178
) error {
179179
env.ensureWorkerBuilt(t, sdk)
180180

181-
scenarioID := cmdoptions.ScenarioID{
182-
Scenario: scenarioInfo.ScenarioName,
183-
RunID: scenarioInfo.RunID,
184-
}
185-
taskQueueName := loadgen.TaskQueueForRun(scenarioID.Scenario, scenarioID.RunID)
186-
187181
testCtx, cancelTestCtx := context.WithTimeout(t.Context(), env.executorTimeout)
188182
defer cancelTestCtx()
189183

190-
workerDone := env.startWorker(testCtx, sdk, taskQueueName, scenarioID)
191-
192184
// Update scenario info with test environment details
193185
scenarioInfo.Logger = env.logger.Named("executor")
194186
scenarioInfo.MetricsHandler = client.MetricsNopHandler
195187
scenarioInfo.Client = env.temporalClient
196188
scenarioInfo.Namespace = testNamespace
197189

190+
taskQueueName := loadgen.TaskQueueForRun(scenarioInfo.RunID)
191+
workerDone := env.startWorker(testCtx, sdk, taskQueueName, scenarioInfo)
192+
198193
err := executor.Run(testCtx, scenarioInfo)
199194

200195
// Trigger worker shutdown.
@@ -262,7 +257,7 @@ func (env *TestEnvironment) startWorker(
262257
ctx context.Context,
263258
sdk cmdoptions.Language,
264259
taskQueueName string,
265-
scenarioID cmdoptions.ScenarioID,
260+
scenarioInfo loadgen.ScenarioInfo,
266261
) <-chan error {
267262
workerDone := make(chan error, 1)
268263

@@ -277,7 +272,10 @@ func (env *TestEnvironment) startWorker(
277272
},
278273
TaskQueueName: taskQueueName,
279274
GracefulShutdownDuration: workerShutdownTimeout,
280-
ScenarioID: scenarioID,
275+
ScenarioID: cmdoptions.ScenarioID{
276+
Scenario: scenarioInfo.ScenarioName,
277+
RunID: scenarioInfo.RunID,
278+
},
281279
ClientOptions: cmdoptions.ClientOptions{
282280
Address: env.DevServerAddress(),
283281
Namespace: testNamespace,

0 commit comments

Comments
 (0)