diff --git a/cmd/cli/run_scenario.go b/cmd/cli/run_scenario.go index eee8d1b0..a4ca6e2f 100644 --- a/cmd/cli/run_scenario.go +++ b/cmd/cli/run_scenario.go @@ -57,6 +57,7 @@ type scenarioRunConfig struct { timeout time.Duration doNotRegisterSearchAttributes bool ignoreAlreadyStarted bool + exportFailedHistories string } func (r *scenarioRunner) addCLIFlags(fs *pflag.FlagSet) { @@ -84,6 +85,8 @@ func (r *scenarioRunConfig) addCLIFlags(fs *pflag.FlagSet) { "If the search attributes are not registed by the scenario they must be registered through some other method") fs.BoolVar(&r.ignoreAlreadyStarted, "ignore-already-started", false, "Ignore if a workflow with the same ID already exists. A Scenario may choose to override this behavior.") + fs.StringVar(&r.exportFailedHistories, "export-failed-histories", "", "Export failed workflow histories in JSON format. Optionally specify directory (defaults to current directory if flag specified without value)") + fs.Lookup("export-failed-histories").NoOptDefVal = "." 
} func (r *scenarioRunner) preRun() { @@ -164,6 +167,9 @@ func (r *scenarioRunner) run(ctx context.Context) error { ScenarioOptions: scenarioOptions, Namespace: r.clientOptions.Namespace, RootPath: repoDir, + ExportOptions: loadgen.ExportOptions{ + ExportFailedHistories: r.exportFailedHistories, + }, } executor := scenario.ExecutorFn() err = executor.Run(ctx, scenarioInfo) diff --git a/cmd/cli/run_scenario_with_worker.go b/cmd/cli/run_scenario_with_worker.go index 327e6485..98d8055b 100644 --- a/cmd/cli/run_scenario_with_worker.go +++ b/cmd/cli/run_scenario_with_worker.go @@ -71,19 +71,11 @@ func (r *workerWithScenarioRunner) run(ctx context.Context) error { // Run scenario scenarioRunner := scenarioRunner{ - logger: r.Logger, - scenario: r.ScenarioID, - scenarioRunConfig: scenarioRunConfig{ - iterations: r.iterations, - duration: r.duration, - maxConcurrent: r.maxConcurrent, - maxIterationsPerSecond: r.maxIterationsPerSecond, - scenarioOptions: r.scenarioOptions, - timeout: r.timeout, - doNotRegisterSearchAttributes: r.doNotRegisterSearchAttributes, - }, - clientOptions: r.ClientOptions, - metricsOptions: r.metricsOptions, + logger: r.Logger, + scenario: r.ScenarioID, + scenarioRunConfig: r.scenarioRunConfig, + clientOptions: r.ClientOptions, + metricsOptions: r.metricsOptions, } scenarioErr := scenarioRunner.run(ctx) cancel() diff --git a/loadgen/helpers.go b/loadgen/helpers.go index da1d9e64..f7ba9cbd 100644 --- a/loadgen/helpers.go +++ b/loadgen/helpers.go @@ -2,15 +2,21 @@ package loadgen import ( "context" + "encoding/json" "errors" "fmt" + "os" + "path/filepath" "strings" "time" "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/client" ) // InitSearchAttribute ensures that a search attribute is defined in the namespace. 
@@ -132,3 +138,129 @@ func VerifyNoFailedWorkflows(ctx context.Context, info ScenarioInfo, searchAttri
 	}
 	return nil
 }
+
+// WorkflowHistoryExport represents the exported workflow history data.
+type WorkflowHistoryExport struct {
+	WorkflowID string                        `json:"workflowId"`
+	RunID      string                        `json:"runId"`
+	Status     enums.WorkflowExecutionStatus `json:"status"`
+	StartTime  time.Time                     `json:"startTime"`
+	CloseTime  time.Time                     `json:"closeTime"`
+	Events     []*history.HistoryEvent       `json:"events"`
+}
+
+// ExportFailedWorkflowHistories exports histories of failed/terminated workflows to JSON files.
+//
+// Files are written to a "failed-histories-<RunID>" directory created under
+// info.ExportOptions.ExportFailedHistories. Every page of the visibility listing is followed via
+// NextPageToken, so workflows beyond the first page are exported as well. A failure to export one
+// workflow does not stop the others; all such failures are joined into the returned error.
+func ExportFailedWorkflowHistories(ctx context.Context, info ScenarioInfo, searchAttribute string) error {
+	// Create output directory for failed histories
+	historiesDir := filepath.Join(info.ExportOptions.ExportFailedHistories, fmt.Sprintf("failed-histories-%s", info.RunID))
+	if err := os.MkdirAll(historiesDir, 0755); err != nil {
+		return fmt.Errorf("failed to create histories directory: %w", err)
+	}
+
+	// Query for failed and terminated workflows
+	statusQuery := fmt.Sprintf(
+		"%s='%s' and (ExecutionStatus = 'Failed' or ExecutionStatus = 'Terminated')",
+		searchAttribute, info.RunID)
+
+	var exportErrors []error
+	totalFound, totalExported := 0, 0
+	var nextPageToken []byte
+	for {
+		// List one page of failed/terminated workflows
+		resp, err := info.Client.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
+			Namespace:     info.Namespace,
+			Query:         statusQuery,
+			NextPageToken: nextPageToken,
+		})
+		if err != nil {
+			// Report the listing failure together with any export failures from earlier pages.
+			exportErrors = append(exportErrors, fmt.Errorf("failed to list failed workflows: %w", err))
+			return errors.Join(exportErrors...)
+		}
+
+		// Export each workflow history on this page
+		for _, execution := range resp.GetExecutions() {
+			totalFound++
+			if err := exportSingleWorkflowHistory(ctx, info.Client, execution, historiesDir); err != nil {
+				exportErrors = append(exportErrors,
+					fmt.Errorf("failed to export %s: %w", execution.GetExecution().GetWorkflowId(), err))
+				continue
+			}
+			totalExported++
+		}
+
+		// An empty token means the listing is exhausted.
+		nextPageToken = resp.GetNextPageToken()
+		if len(nextPageToken) == 0 {
+			break
+		}
+	}
+
+	if totalFound == 0 {
+		info.Logger.Info("No failed workflows found to export")
+	} else {
+		// Report found vs. exported so that export failures are not mistaken for "nothing found".
+		info.Logger.Infof("Exported %d of %d failed workflow histories to %s", totalExported, totalFound, historiesDir)
+	}
+
+	if len(exportErrors) > 0 {
+		return fmt.Errorf("some exports failed: %w", errors.Join(exportErrors...))
+	}
+	return nil
+}
+
+// exportSingleWorkflowHistory exports a single workflow's history to a JSON file.
+//
+// The file is named "workflow-<workflowID>-<runID>.json" inside outputDir, with path separators in
+// the workflow ID replaced by underscores.
+func exportSingleWorkflowHistory(
+	ctx context.Context,
+	c client.Client,
+	execution *workflow.WorkflowExecutionInfo,
+	outputDir string,
+) error {
+	// Generated getters are nil-safe, so a listing entry without an Execution cannot panic here.
+	workflowID := execution.GetExecution().GetWorkflowId()
+	runID := execution.GetExecution().GetRunId()
+
+	// Fetch full workflow history
+	historyIter := c.GetWorkflowHistory(ctx, workflowID, runID, false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
+	var events []*history.HistoryEvent
+	for historyIter.HasNext() {
+		event, err := historyIter.Next()
+		if err != nil {
+			return fmt.Errorf("failed to read history event: %w", err)
+		}
+		events = append(events, event)
+	}
+
+	// Create export structure
+	export := WorkflowHistoryExport{
+		WorkflowID: workflowID,
+		RunID:      runID,
+		Status:     execution.GetStatus(),
+		StartTime:  execution.GetStartTime().AsTime(),
+		CloseTime:  execution.GetCloseTime().AsTime(),
+		Events:     events,
+	}
+
+	// Write to JSON file
+	// Sanitize workflow ID to handle IDs with path separators (e.g., parent/child-10)
+	safeWorkflowID := strings.NewReplacer("/", "_", "\\", "_").Replace(workflowID)
+	filename := filepath.Join(outputDir, fmt.Sprintf("workflow-%s-%s.json", safeWorkflowID, runID))
+	data, err := json.MarshalIndent(export, "", " ")
+	if err != nil {
+		return fmt.Errorf("failed to marshal history: %w", err)
+	}
+
+	if err := os.WriteFile(filename, data, 0644); err != nil {
+		return fmt.Errorf("failed to write history file: %w", err)
+	}
+
+	return nil
+}
diff --git a/loadgen/scenario.go b/loadgen/scenario.go
index 800bf550..d2774a2f 100644
--- a/loadgen/scenario.go
+++ b/loadgen/scenario.go
@@ -118,6 +118,15 @@ type ScenarioInfo struct {
 	Namespace string
 	// Path to the root of the omes dir
 	RootPath string
+	// ExportOptions contains export-related configuration
+	ExportOptions
ExportOptions +} + +// ExportOptions contains configuration for exporting scenario data. +type ExportOptions struct { + // ExportFailedHistories is the directory to export failed workflow histories. + // Empty string means disabled. + ExportFailedHistories string } func (s *ScenarioInfo) ScenarioOptionInt(name string, defaultValue int) int { diff --git a/scenarios/throughput_stress.go b/scenarios/throughput_stress.go index a4944463..3d06b176 100644 --- a/scenarios/throughput_stress.go +++ b/scenarios/throughput_stress.go @@ -305,6 +305,8 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error return errors.New("No iterations completed. Either the scenario never ran, or it failed to resume correctly.") } + var tpsErrors []error + // Post-scenario: verify reported workflow completion count from Visibility. if err := loadgen.MinVisibilityCountEventually( ctx, @@ -317,7 +319,7 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error completedWorkflows, t.config.VisibilityVerificationTimeout, ); err != nil { - return err + tpsErrors = append(tpsErrors, err) } // Post-scenario: check throughput threshold @@ -327,19 +329,30 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error if actualThroughputPerHour < t.config.MinThroughputPerHour { // Calculate how many workflows we expected given the duration expectedWorkflows := int(totalDuration.Hours() * t.config.MinThroughputPerHour) - - return fmt.Errorf("insufficient throughput: %.1f workflows/hour < %.1f required "+ + err := fmt.Errorf("insufficient throughput: %.1f workflows/hour < %.1f required "+ "(completed %d workflows, expected %d in %v)", actualThroughputPerHour, t.config.MinThroughputPerHour, completedWorkflows, expectedWorkflows, totalDuration.Round(time.Second)) + tpsErrors = append(tpsErrors, err) } } // Post-scenario: ensure there are no failed or terminated workflows for this run. 
- return loadgen.VerifyNoFailedWorkflows(ctx, info, ThroughputStressScenarioIdSearchAttribute, info.RunID) + if err := loadgen.VerifyNoFailedWorkflows(ctx, info, ThroughputStressScenarioIdSearchAttribute, info.RunID); err != nil { + tpsErrors = append(tpsErrors, err) + // Export failed workflow histories if requested + if info.ExportOptions.ExportFailedHistories != "" { + info.Logger.Warn("Failed workflows detected, exporting histories...") + if exportErr := loadgen.ExportFailedWorkflowHistories(ctx, info, ThroughputStressScenarioIdSearchAttribute); exportErr != nil { + info.Logger.Errorf("Failed to export workflow histories: %v", exportErr) + } + } + } + + return errors.Join(tpsErrors...) } func (t *tpsExecutor) verifyFirstRun(ctx context.Context, info loadgen.ScenarioInfo, skipCleanNamespaceCheck bool) error {