Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/cli/run_scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type scenarioRunConfig struct {
timeout time.Duration
doNotRegisterSearchAttributes bool
ignoreAlreadyStarted bool
exportFailedHistories string
}

func (r *scenarioRunner) addCLIFlags(fs *pflag.FlagSet) {
Expand Down Expand Up @@ -84,6 +85,8 @@ func (r *scenarioRunConfig) addCLIFlags(fs *pflag.FlagSet) {
"If the search attributes are not registed by the scenario they must be registered through some other method")
fs.BoolVar(&r.ignoreAlreadyStarted, "ignore-already-started", false,
"Ignore if a workflow with the same ID already exists. A Scenario may choose to override this behavior.")
fs.StringVar(&r.exportFailedHistories, "export-failed-histories", "", "Export failed workflow histories in JSON format. Optionally specify directory (defaults to current directory if flag specified without value)")
fs.Lookup("export-failed-histories").NoOptDefVal = "."
}

func (r *scenarioRunner) preRun() {
Expand Down Expand Up @@ -164,6 +167,9 @@ func (r *scenarioRunner) run(ctx context.Context) error {
ScenarioOptions: scenarioOptions,
Namespace: r.clientOptions.Namespace,
RootPath: repoDir,
ExportOptions: loadgen.ExportOptions{
ExportFailedHistories: r.exportFailedHistories,
},
}
executor := scenario.ExecutorFn()
err = executor.Run(ctx, scenarioInfo)
Expand Down
18 changes: 5 additions & 13 deletions cmd/cli/run_scenario_with_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,11 @@ func (r *workerWithScenarioRunner) run(ctx context.Context) error {

// Run scenario
scenarioRunner := scenarioRunner{
logger: r.Logger,
scenario: r.ScenarioID,
scenarioRunConfig: scenarioRunConfig{
iterations: r.iterations,
duration: r.duration,
maxConcurrent: r.maxConcurrent,
maxIterationsPerSecond: r.maxIterationsPerSecond,
scenarioOptions: r.scenarioOptions,
timeout: r.timeout,
doNotRegisterSearchAttributes: r.doNotRegisterSearchAttributes,
},
clientOptions: r.ClientOptions,
metricsOptions: r.metricsOptions,
logger: r.Logger,
scenario: r.ScenarioID,
scenarioRunConfig: r.scenarioRunConfig,
clientOptions: r.ClientOptions,
metricsOptions: r.metricsOptions,
}
scenarioErr := scenarioRunner.run(ctx)
cancel()
Expand Down
108 changes: 108 additions & 0 deletions loadgen/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,21 @@ package loadgen

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"

"go.temporal.io/api/enums/v1"
"go.temporal.io/api/history/v1"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
)

// InitSearchAttribute ensures that a search attribute is defined in the namespace.
Expand Down Expand Up @@ -132,3 +138,105 @@ func VerifyNoFailedWorkflows(ctx context.Context, info ScenarioInfo, searchAttri
}
return nil
}

// WorkflowHistoryExport represents the exported workflow history data.
// It is the JSON schema written (one file per workflow run) by
// ExportFailedWorkflowHistories for failed/terminated executions.
type WorkflowHistoryExport struct {
	// WorkflowID is the workflow's business identifier.
	WorkflowID string `json:"workflowId"`
	// RunID identifies the specific run of the workflow.
	RunID string `json:"runId"`
	// Status is the execution status reported by visibility
	// (Failed or Terminated for exports produced by this package).
	Status enums.WorkflowExecutionStatus `json:"status"`
	// StartTime is when the execution started.
	StartTime time.Time `json:"startTime"`
	// CloseTime is when the execution closed.
	CloseTime time.Time `json:"closeTime"`
	// Events is the full event history of the run.
	Events []*history.HistoryEvent `json:"events"`
}

// ExportFailedWorkflowHistories exports histories of failed/terminated workflows to JSON files.
//
// It writes one JSON file per workflow run (see WorkflowHistoryExport) into a
// directory named "failed-histories-<RunID>" under
// info.ExportOptions.ExportFailedHistories. Workflows are found by querying
// visibility for executions whose searchAttribute equals info.RunID and whose
// ExecutionStatus is Failed or Terminated. All pages of the visibility result
// are processed; per-workflow export failures are collected and reported
// together rather than aborting the whole export.
//
// Returns an error if the output directory cannot be created, a list call
// fails, or one or more individual exports failed.
func ExportFailedWorkflowHistories(ctx context.Context, info ScenarioInfo, searchAttribute string) error {
	// Create output directory for failed histories
	historiesDir := filepath.Join(info.ExportOptions.ExportFailedHistories, fmt.Sprintf("failed-histories-%s", info.RunID))
	if err := os.MkdirAll(historiesDir, 0755); err != nil {
		return fmt.Errorf("failed to create histories directory: %w", err)
	}

	// Query for failed and terminated workflows
	statusQuery := fmt.Sprintf(
		"%s='%s' and (ExecutionStatus = 'Failed' or ExecutionStatus = 'Terminated')",
		searchAttribute, info.RunID)

	var exportErrors []string
	totalExported := 0

	// Page through all matching workflows; a single ListWorkflow call only
	// returns one page, so loop until NextPageToken is exhausted.
	var nextPageToken []byte
	for {
		resp, err := info.Client.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
			Namespace:     info.Namespace,
			Query:         statusQuery,
			NextPageToken: nextPageToken,
		})
		if err != nil {
			return fmt.Errorf("failed to list failed workflows: %w", err)
		}

		// Export each workflow history on this page.
		for _, execution := range resp.Executions {
			if err := exportSingleWorkflowHistory(ctx, info.Client, execution, historiesDir); err != nil {
				exportErrors = append(exportErrors, fmt.Sprintf("failed to export %s: %v", execution.Execution.WorkflowId, err))
			} else {
				totalExported++
			}
		}

		nextPageToken = resp.NextPageToken
		if len(nextPageToken) == 0 {
			break
		}
	}

	if totalExported > 0 {
		info.Logger.Infof("Exported %d failed workflow histories to %s", totalExported, historiesDir)
	} else {
		info.Logger.Info("No failed workflows found to export")
	}

	if len(exportErrors) > 0 {
		return fmt.Errorf("some exports failed: %s", strings.Join(exportErrors, "; "))
	}
	return nil
}

// exportSingleWorkflowHistory exports a single workflow's history to a JSON file.
//
// It fetches the run's complete event history via GetWorkflowHistory, wraps it
// together with execution metadata in a WorkflowHistoryExport, and writes it as
// indented JSON to "<outputDir>/workflow-<workflowID>-<runID>.json". Path
// separator characters in the workflow ID are replaced so IDs such as
// "parent/child-10" produce a single valid filename.
//
// Returns an error if reading the history, marshaling, or writing the file fails.
func exportSingleWorkflowHistory(
	ctx context.Context,
	c client.Client,
	execution *workflow.WorkflowExecutionInfo,
	outputDir string,
) error {
	workflowID := execution.Execution.WorkflowId
	runID := execution.Execution.RunId

	// Fetch full workflow history
	historyIter := c.GetWorkflowHistory(ctx, workflowID, runID, false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
	var events []*history.HistoryEvent
	for historyIter.HasNext() {
		event, err := historyIter.Next()
		if err != nil {
			return fmt.Errorf("failed to read history event: %w", err)
		}
		events = append(events, event)
	}

	// Create export structure
	export := WorkflowHistoryExport{
		WorkflowID: workflowID,
		RunID:      runID,
		Status:     execution.Status,
		StartTime:  execution.StartTime.AsTime(),
		CloseTime:  execution.CloseTime.AsTime(),
		Events:     events,
	}

	// Write to JSON file.
	// Sanitize the workflow ID so IDs containing path separators on any OS
	// (e.g. parent/child-10, or IDs with backslashes on Windows) cannot
	// escape outputDir or produce an invalid path.
	safeWorkflowID := strings.NewReplacer("/", "_", "\\", "_").Replace(workflowID)
	filename := filepath.Join(outputDir, fmt.Sprintf("workflow-%s-%s.json", safeWorkflowID, runID))
	data, err := json.MarshalIndent(export, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal history: %w", err)
	}

	if err := os.WriteFile(filename, data, 0644); err != nil {
		return fmt.Errorf("failed to write history file: %w", err)
	}

	return nil
}
9 changes: 9 additions & 0 deletions loadgen/scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ type ScenarioInfo struct {
Namespace string
// Path to the root of the omes dir
RootPath string
// ExportOptions contains export-related configuration
ExportOptions ExportOptions
}

// ExportOptions contains configuration for exporting scenario data.
type ExportOptions struct {
	// ExportFailedHistories is the directory to export failed workflow histories.
	// Empty string means disabled. The CLI's --export-failed-histories flag
	// defaults this to "." (current directory) when given without a value.
	ExportFailedHistories string
}

func (s *ScenarioInfo) ScenarioOptionInt(name string, defaultValue int) int {
Expand Down
21 changes: 17 additions & 4 deletions scenarios/throughput_stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
return errors.New("No iterations completed. Either the scenario never ran, or it failed to resume correctly.")
}

var tpsErrors []error

// Post-scenario: verify reported workflow completion count from Visibility.
if err := loadgen.MinVisibilityCountEventually(
ctx,
Expand All @@ -317,7 +319,7 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
completedWorkflows,
t.config.VisibilityVerificationTimeout,
); err != nil {
return err
tpsErrors = append(tpsErrors, err)
}

// Post-scenario: check throughput threshold
Expand All @@ -327,19 +329,30 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error
if actualThroughputPerHour < t.config.MinThroughputPerHour {
// Calculate how many workflows we expected given the duration
expectedWorkflows := int(totalDuration.Hours() * t.config.MinThroughputPerHour)

return fmt.Errorf("insufficient throughput: %.1f workflows/hour < %.1f required "+
err := fmt.Errorf("insufficient throughput: %.1f workflows/hour < %.1f required "+
"(completed %d workflows, expected %d in %v)",
actualThroughputPerHour,
t.config.MinThroughputPerHour,
completedWorkflows,
expectedWorkflows,
totalDuration.Round(time.Second))
tpsErrors = append(tpsErrors, err)
}
}

// Post-scenario: ensure there are no failed or terminated workflows for this run.
return loadgen.VerifyNoFailedWorkflows(ctx, info, ThroughputStressScenarioIdSearchAttribute, info.RunID)
if err := loadgen.VerifyNoFailedWorkflows(ctx, info, ThroughputStressScenarioIdSearchAttribute, info.RunID); err != nil {
tpsErrors = append(tpsErrors, err)
// Export failed workflow histories if requested
if info.ExportOptions.ExportFailedHistories != "" {
info.Logger.Warn("Failed workflows detected, exporting histories...")
if exportErr := loadgen.ExportFailedWorkflowHistories(ctx, info, ThroughputStressScenarioIdSearchAttribute); exportErr != nil {
info.Logger.Errorf("Failed to export workflow histories: %v", exportErr)
}
}
}

return errors.Join(tpsErrors...)
}

func (t *tpsExecutor) verifyFirstRun(ctx context.Context, info loadgen.ScenarioInfo, skipCleanNamespaceCheck bool) error {
Expand Down
Loading