Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd/cli/run_scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type scenarioRunConfig struct {
timeout time.Duration
doNotRegisterSearchAttributes bool
ignoreAlreadyStarted bool
exportFailedHistories string
exportScenarioMetrics string
}

func (r *scenarioRunner) addCLIFlags(fs *pflag.FlagSet) {
Expand Down Expand Up @@ -84,6 +86,11 @@ func (r *scenarioRunConfig) addCLIFlags(fs *pflag.FlagSet) {
"If the search attributes are not registed by the scenario they must be registered through some other method")
fs.BoolVar(&r.ignoreAlreadyStarted, "ignore-already-started", false,
"Ignore if a workflow with the same ID already exists. A Scenario may choose to override this behavior.")

fs.StringVar(&r.exportFailedHistories, "export-failed-histories", "", "Export failed workflow histories in JSON format. Optionally specify directory (defaults to current directory if flag specified without value)")
fs.Lookup("export-failed-histories").NoOptDefVal = "."
fs.StringVar(&r.exportScenarioMetrics, "export-scenario-metrics", "", "Export scenario metrics in JSON format. Optionally specify directory (defaults to current directory if flag specified without value)")
fs.Lookup("export-scenario-metrics").NoOptDefVal = "."
}

func (r *scenarioRunner) preRun() {
Expand Down Expand Up @@ -164,6 +171,10 @@ func (r *scenarioRunner) run(ctx context.Context) error {
ScenarioOptions: scenarioOptions,
Namespace: r.clientOptions.Namespace,
RootPath: repoDir,
ExportOptions: loadgen.ExportOptions{
ExportFailedHistories: r.exportFailedHistories,
ExportMetrics: r.exportScenarioMetrics,
},
}
executor := scenario.ExecutorFn()
err = executor.Run(ctx, scenarioInfo)
Expand Down
18 changes: 5 additions & 13 deletions cmd/cli/run_scenario_with_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,11 @@ func (r *workerWithScenarioRunner) run(ctx context.Context) error {

// Run scenario
scenarioRunner := scenarioRunner{
logger: r.Logger,
scenario: r.ScenarioID,
scenarioRunConfig: scenarioRunConfig{
iterations: r.iterations,
duration: r.duration,
maxConcurrent: r.maxConcurrent,
maxIterationsPerSecond: r.maxIterationsPerSecond,
scenarioOptions: r.scenarioOptions,
timeout: r.timeout,
doNotRegisterSearchAttributes: r.doNotRegisterSearchAttributes,
},
clientOptions: r.ClientOptions,
metricsOptions: r.metricsOptions,
logger: r.Logger,
scenario: r.ScenarioID,
scenarioRunConfig: r.scenarioRunConfig,
clientOptions: r.ClientOptions,
metricsOptions: r.metricsOptions,
}
scenarioErr := scenarioRunner.run(ctx)
cancel()
Expand Down
3 changes: 3 additions & 0 deletions cmd/clioptions/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type WorkerOptions struct {
MaxConcurrentWorkflowTasks int
WorkerActivitiesPerSecond float64
ErrOnUnimplemented bool
ExportMetrics string

fs *pflag.FlagSet
usedPrefix string
Expand All @@ -39,5 +40,7 @@ func (m *WorkerOptions) FlagSet(prefix string) *pflag.FlagSet {
m.fs.IntVar(&m.WorkflowPollerAutoscaleMax, prefix+"workflow-poller-autoscale-max", 0, "Max for workflow poller autoscaling (overrides max-concurrent-workflow-pollers")
m.fs.Float64Var(&m.WorkerActivitiesPerSecond, prefix+"worker-activities-per-second", 0, "Per-worker activity rate limit")
m.fs.BoolVar(&m.ErrOnUnimplemented, prefix+"err-on-unimplemented", false, "Fail on unimplemented actions (currently this only applies to concurrent client actions)")
m.fs.StringVar(&m.ExportMetrics, prefix+"export-metrics", "", "Export worker resource metrics (CPU/memory) in JSON format. Optionally specify directory (defaults to current directory if flag specified without value)")
m.fs.Lookup(prefix + "export-metrics").NoOptDefVal = "."
return m.fs
}
106 changes: 106 additions & 0 deletions loadgen/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,21 @@ package loadgen

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"

"go.temporal.io/api/enums/v1"
"go.temporal.io/api/history/v1"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
)

// InitSearchAttribute ensures that a search attribute is defined in the namespace.
Expand Down Expand Up @@ -132,3 +138,103 @@ func VerifyNoFailedWorkflows(ctx context.Context, info ScenarioInfo, searchAttri
}
return nil
}

// WorkflowHistoryExport represents the exported workflow history data.
// It is the JSON document written per failed/terminated workflow by
// ExportFailedWorkflowHistories.
type WorkflowHistoryExport struct {
	// WorkflowID is the workflow ID of the exported execution.
	WorkflowID string `json:"workflowId"`
	// RunID is the specific run of the workflow that was exported.
	RunID string `json:"runId"`
	// Status is the close status reported by visibility (e.g. Failed, Terminated).
	Status enums.WorkflowExecutionStatus `json:"status"`
	// StartTime is when the execution started, as reported by visibility.
	StartTime time.Time `json:"startTime"`
	// CloseTime is when the execution closed, as reported by visibility.
	CloseTime time.Time `json:"closeTime"`
	// Events is the full event history of the run, in order.
	Events []*history.HistoryEvent `json:"events"`
}

// ExportFailedWorkflowHistories exports histories of failed/terminated workflows to JSON files.
//
// It queries visibility for workflows whose searchAttribute equals runID and
// whose ExecutionStatus is Failed or Terminated, then writes one JSON file per
// workflow into a "failed-histories-<runID>" subdirectory of outputDir.
// Listing follows NextPageToken so all matching workflows are exported, not
// just the first page. Per-workflow export failures are collected and returned
// as a single error after attempting every workflow.
func ExportFailedWorkflowHistories(ctx context.Context, info ScenarioInfo, searchAttribute, runID, outputDir string) error {
	// Create output directory for failed histories.
	historiesDir := filepath.Join(outputDir, fmt.Sprintf("failed-histories-%s", runID))
	if err := os.MkdirAll(historiesDir, 0755); err != nil {
		return fmt.Errorf("failed to create histories directory: %w", err)
	}

	// Query for failed and terminated workflows belonging to this run.
	statusQuery := fmt.Sprintf(
		"%s='%s' and (ExecutionStatus = 'Failed' or ExecutionStatus = 'Terminated')",
		searchAttribute, runID)

	var exportErrors []string
	totalExported := 0
	// Page through all matching workflows; a single ListWorkflow call only
	// returns one page, so loop until the server stops returning a token.
	var nextPageToken []byte
	for {
		resp, err := info.Client.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
			Namespace:     info.Namespace,
			Query:         statusQuery,
			NextPageToken: nextPageToken,
		})
		if err != nil {
			return fmt.Errorf("failed to list failed workflows: %w", err)
		}

		// Export each workflow history; keep going on individual failures.
		for _, execution := range resp.Executions {
			if err := exportSingleWorkflowHistory(ctx, info.Client, execution, historiesDir); err != nil {
				exportErrors = append(exportErrors, fmt.Sprintf("failed to export %s: %v", execution.Execution.WorkflowId, err))
			} else {
				totalExported++
			}
		}

		nextPageToken = resp.NextPageToken
		if len(nextPageToken) == 0 {
			break
		}
	}

	if totalExported > 0 {
		info.Logger.Infof("Exported %d failed workflow histories to %s", totalExported, historiesDir)
	} else {
		info.Logger.Info("No failed workflows found to export")
	}

	if len(exportErrors) > 0 {
		return fmt.Errorf("some exports failed: %s", strings.Join(exportErrors, "; "))
	}
	return nil
}

// exportSingleWorkflowHistory exports a single workflow's history to a JSON file.
//
// The file is written to outputDir as "workflow-<workflowID>-<runID>.json".
// Path separators and other characters that are unsafe in file names are
// replaced in the IDs so a workflow ID containing "/" cannot escape outputDir
// or produce an unwritable path.
func exportSingleWorkflowHistory(
	ctx context.Context,
	c client.Client,
	execution *workflow.WorkflowExecutionInfo,
	outputDir string,
) error {
	workflowID := execution.Execution.WorkflowId
	runID := execution.Execution.RunId

	// Fetch the full workflow history for this specific run.
	historyIter := c.GetWorkflowHistory(ctx, workflowID, runID, false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
	var events []*history.HistoryEvent
	for historyIter.HasNext() {
		event, err := historyIter.Next()
		if err != nil {
			return fmt.Errorf("failed to read history event: %w", err)
		}
		events = append(events, event)
	}

	// Create export structure. AsTime is nil-safe: a missing timestamp
	// becomes the zero time rather than a panic.
	export := WorkflowHistoryExport{
		WorkflowID: workflowID,
		RunID:      runID,
		Status:     execution.Status,
		StartTime:  execution.StartTime.AsTime(),
		CloseTime:  execution.CloseTime.AsTime(),
		Events:     events,
	}

	// Workflow and run IDs may contain characters that are invalid in file
	// names (notably path separators); sanitize before building the path.
	sanitize := strings.NewReplacer("/", "_", "\\", "_", ":", "_").Replace
	filename := filepath.Join(outputDir, fmt.Sprintf("workflow-%s-%s.json", sanitize(workflowID), sanitize(runID)))
	data, err := json.MarshalIndent(export, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal history: %w", err)
	}

	if err := os.WriteFile(filename, data, 0644); err != nil {
		return fmt.Errorf("failed to write history file: %w", err)
	}

	return nil
}
12 changes: 12 additions & 0 deletions loadgen/scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ type ScenarioInfo struct {
Namespace string
// Path to the root of the omes dir
RootPath string
// ExportOptions contains export-related configuration
ExportOptions ExportOptions
}

// ExportOptions contains configuration for exporting scenario data.
// Each field is a target directory; an empty string disables that export.
type ExportOptions struct {
	// ExportFailedHistories is the directory to export failed workflow histories.
	// Empty string means disabled.
	ExportFailedHistories string
	// ExportMetrics is the directory to export scenario metrics.
	// Empty string means disabled.
	ExportMetrics string
}

func (s *ScenarioInfo) ScenarioOptionInt(name string, defaultValue int) int {
Expand Down
68 changes: 65 additions & 3 deletions scenarios/throughput_stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package scenarios
import (
"cmp"
"context"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"math/rand"
"os"
"path/filepath"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -299,6 +302,8 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
return errors.New("No iterations completed. Either the scenario never ran, or it failed to resume correctly.")
}

var tpsErrors []error

// Post-scenario: verify reported workflow completion count from Visibility.
if err := loadgen.MinVisibilityCountEventually(
ctx,
Expand All @@ -311,7 +316,7 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
completedWorkflows,
t.config.VisibilityVerificationTimeout,
); err != nil {
return err
tpsErrors = append(tpsErrors, err)
}

// Post-scenario: check throughput threshold
Expand All @@ -322,18 +327,37 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
// Calculate how many workflows we expected given the duration
expectedWorkflows := int(totalDuration.Hours() * t.config.MinThroughputPerHour)

return fmt.Errorf("insufficient throughput: %.1f workflows/hour < %.1f required "+
err := fmt.Errorf("insufficient throughput: %.1f workflows/hour < %.1f required "+
"(completed %d workflows, expected %d in %v)",
actualThroughputPerHour,
t.config.MinThroughputPerHour,
completedWorkflows,
expectedWorkflows,
totalDuration.Round(time.Second))
tpsErrors = append(tpsErrors, err)
}
}

// Post-scenario: ensure there are no failed or terminated workflows for this run.
return loadgen.VerifyNoFailedWorkflows(ctx, info, ThroughputStressScenarioIdSearchAttribute, info.RunID)
if err := loadgen.VerifyNoFailedWorkflows(ctx, info, ThroughputStressScenarioIdSearchAttribute, info.RunID); err != nil {
tpsErrors = append(tpsErrors, err)
// Export failed workflow histories if requested
if info.ExportOptions.ExportFailedHistories != "" {
info.Logger.Warn("Failed workflows detected, exporting histories...")
if exportErr := loadgen.ExportFailedWorkflowHistories(ctx, info, ThroughputStressScenarioIdSearchAttribute, info.RunID, info.ExportOptions.ExportFailedHistories); exportErr != nil {
info.Logger.Errorf("Failed to export workflow histories: %v", exportErr)
}
}
}

// Export scenario metrics if requested
if info.ExportOptions.ExportMetrics != "" {
if err := t.exportMetrics(info, completedIterations, completedWorkflows, totalDuration); err != nil {
info.Logger.Errorf("failed to export metrics: %w", err)
}
}

return errors.Join(tpsErrors...)
}

func (t *tpsExecutor) verifyFirstRun(ctx context.Context, info loadgen.ScenarioInfo, skipCleanNamespaceCheck bool) error {
Expand Down Expand Up @@ -651,3 +675,41 @@ func (t *tpsExecutor) maybeWithStart(likelihood float64) bool {
defer t.lock.Unlock()
return t.rng.Float64() <= likelihood
}

// exportMetrics exports scenario metrics to a JSON file named
// "scenario-metrics-<runID>.json" in the configured export directory,
// creating the directory if necessary.
func (t *tpsExecutor) exportMetrics(info loadgen.ScenarioInfo, completedIterations, completedWorkflows int, totalDuration time.Duration) error {
	// Ensure the target directory exists (mirrors the failed-history export,
	// which creates its directory before writing).
	outputDir := info.ExportOptions.ExportMetrics
	if err := os.MkdirAll(outputDir, 0755); err != nil {
		return fmt.Errorf("failed to create metrics directory: %w", err)
	}

	// Guard against a zero duration: dividing by it would yield +Inf, which
	// json.MarshalIndent rejects with an UnsupportedValueError.
	throughputPerSecond := 0.0
	if secs := totalDuration.Seconds(); secs > 0 {
		throughputPerSecond = float64(completedWorkflows) / secs
	}

	// Capture "now" once so startTime and endTime are derived from the same instant.
	endTime := time.Now()
	startTime := endTime.Add(-totalDuration)

	metrics := map[string]interface{}{
		"scenario":            "throughput_stress",
		"runId":               info.RunID,
		"startTime":           startTime.Format(time.RFC3339),
		"endTime":             endTime.Format(time.RFC3339),
		"duration":            totalDuration.String(),
		"throughputPerSecond": throughputPerSecond,
		"configuration": map[string]interface{}{
			"iterations":         info.Configuration.Iterations,
			"maxConcurrent":      info.Configuration.MaxConcurrent,
			"internalIterations": t.config.InternalIterations,
			"continueAsNewAfter": t.config.ContinueAsNewAfterIter,
		},
		"results": map[string]interface{}{
			"completedIterations": completedIterations,
			"completedWorkflows":  completedWorkflows,
		},
	}

	// Write to file.
	filename := filepath.Join(outputDir, fmt.Sprintf("scenario-metrics-%s.json", info.RunID))
	data, err := json.MarshalIndent(metrics, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal metrics: %w", err)
	}

	if err := os.WriteFile(filename, data, 0644); err != nil {
		return fmt.Errorf("failed to write metrics file: %w", err)
	}

	info.Logger.Infof("Scenario metrics exported to %s", filename)
	return nil
}
5 changes: 5 additions & 0 deletions workers/dotnet/App.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public static class App
description: "The namespace to use",
getDefaultValue: () => "default");

private static readonly Option<string> runIdOption = new(
name: "--run-id",
description: "Run ID");

private static readonly Option<uint> taskQSuffixStartOption = new(
name: "--task-queue-suffix-index-start",
description: "Inclusive start for task queue suffix range");
Expand Down Expand Up @@ -112,6 +116,7 @@ private static Command CreateCommand()
cmd.Add(serverOption);
cmd.Add(taskQueueOption);
cmd.Add(namespaceOption);
cmd.Add(runIdOption);
cmd.Add(taskQSuffixStartOption);
cmd.Add(taskQSuffixEndOption);
cmd.Add(maxATPollersOption);
Expand Down
8 changes: 8 additions & 0 deletions workers/go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require github.com/temporalio/omes v1.0.0

require (
github.com/nexus-rpc/sdk-go v0.4.0
github.com/shirou/gopsutil/v3 v3.24.5
github.com/spf13/cobra v1.8.0
go.temporal.io/api v1.53.0
go.temporal.io/sdk v1.37.0
Expand All @@ -17,22 +18,29 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/stretchr/testify v1.10.0 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.44.0 // indirect
golang.org/x/sync v0.17.0 // indirect
Expand Down
Loading
Loading