From 592ee81c8b90b8d98f07aec9b33560f8809cc615 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Mon, 4 Aug 2025 14:46:37 -0600 Subject: [PATCH 01/13] improvement: remove waits before fetching activities --- service/worker/batcher/activities.go | 566 +++++++++++++++++++++++---- service/worker/batcher/workflow.go | 13 +- 2 files changed, 511 insertions(+), 68 deletions(-) diff --git a/service/worker/batcher/activities.go b/service/worker/batcher/activities.go index 1181adcdd69..56ac82850e5 100644 --- a/service/worker/batcher/activities.go +++ b/service/worker/batcher/activities.go @@ -41,6 +41,197 @@ var ( errNamespaceMismatch = errors.New("namespace mismatch") ) +// batchProcessorConfig holds the configuration for batch processing +type batchProcessorConfig struct { + namespace string + adjustedQuery string + rps float64 + concurrency int + initialPageToken []byte + initialExecutions []*commonpb.WorkflowExecution +} + +// batchWorkerProcessor defines the interface for different worker processor types +type batchWorkerProcessor func( + ctx context.Context, + taskCh chan taskDetail, + respCh chan taskResponse, + rateLimiter *rate.Limiter, + sdkClient sdkclient.Client, + frontendClient workflowservice.WorkflowServiceClient, + metricsHandler metrics.Handler, + logger log.Logger, +) + +// processWorkflowsWithProactiveFetching handles the core logic for both batch activity functions +// nolint:revive,cognitive-complexity +func (a *activities) processWorkflowsWithProactiveFetching( + ctx context.Context, + config batchProcessorConfig, + startWorkerProcessor batchWorkerProcessor, + sdkClient sdkclient.Client, + metricsHandler metrics.Handler, + logger log.Logger, + hbd HeartBeatDetails, +) (HeartBeatDetails, error) { + rateLimit := rate.Limit(config.rps) + burstLimit := int(math.Ceil(config.rps)) // should never be zero because everything would be rejected + rateLimiter := rate.NewLimiter(rateLimit, burstLimit) + taskCh := make(chan taskDetail, pageSize) + respCh := make(chan taskResponse, pageSize) + + // Start worker processors + for i := 0; i < config.concurrency; i++ { + go startWorkerProcessor(ctx, taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger) + } + + // Track worker state for proactive page fetching + activeWorkers := 0 + totalTasksSent := 0 + totalTasksCompleted := 0 + currentPageToken := config.initialPageToken + hasMorePages := true + + // Track pending tasks per page for proper heartbeating + currentPageNumber := hbd.CurrentPage + pendingTasksPerPage := make(map[int]int) + + // Initial page processing + executions := config.initialExecutions + if len(config.adjustedQuery) > 0 { + resp, err := sdkClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + PageSize: int32(pageSize), + NextPageToken: currentPageToken, + Query: config.adjustedQuery, + }) + if err != nil { + metrics.BatcherOperationFailures.With(metricsHandler).Record(1) + logger.Error("Failed to list workflow executions", tag.Error(err)) + return HeartBeatDetails{}, err + } + currentPageToken = resp.NextPageToken + hasMorePages = len(currentPageToken) > 0 + for _, wf := range resp.Executions { + executions = append(executions, wf.Execution) + } + } else { + hasMorePages = false + } + + // Send initial tasks + for _, wf := range executions { + if activeWorkers >= config.concurrency { + break + } + taskCh <- taskDetail{ + execution: wf, + attempts: 1, + hbd: hbd, + pageNumber: currentPageNumber, + } + activeWorkers++ + totalTasksSent++ + pendingTasksPerPage[currentPageNumber]++ + } + + // Track remaining executions from current page + executionIndex := activeWorkers + + // Main processing loop with proactive page fetching + for totalTasksCompleted < totalTasksSent || hasMorePages { + select { + case taskResult := <-respCh: + activeWorkers-- + totalTasksCompleted++ + + if taskResult.err == nil { + hbd.SuccessCount++ + } else { + hbd.ErrorCount++ + } + + // Track page completion + pageNum := taskResult.pageNumber + pendingTasksPerPage[pageNum]-- + if pendingTasksPerPage[pageNum] == 0 { + // Page is fully completed, record heartbeat + delete(pendingTasksPerPage, pageNum) + activity.RecordHeartbeat(ctx, hbd) + } + + // Try to send more tasks from current page or fetch next page + tasksSentInIteration := 0 + + // First, send remaining tasks from current page + hasCapacity := activeWorkers < config.concurrency + for executionIndex < len(executions) && hasCapacity { + taskCh <- taskDetail{ + execution: executions[executionIndex], + attempts: 1, + hbd: hbd, + pageNumber: currentPageNumber, + } + activeWorkers++ + totalTasksSent++ + executionIndex++ + tasksSentInIteration++ + pendingTasksPerPage[currentPageNumber]++ + } + + // If current page is exhausted and we have capacity, fetch next page + pageExhausted := executionIndex >= len(executions) + if pageExhausted && hasMorePages && hasCapacity { + resp, err := sdkClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + PageSize: int32(pageSize), + NextPageToken: currentPageToken, + Query: config.adjustedQuery, + }) + if err != nil { + metrics.BatcherOperationFailures.With(metricsHandler).Record(1) + logger.Error("Failed to list workflow executions", tag.Error(err)) + return HeartBeatDetails{}, err + } + + // Update page state + hbd.CurrentPage++ + hbd.PageToken = resp.NextPageToken + currentPageToken = resp.NextPageToken + hasMorePages = len(currentPageToken) > 0 + currentPageNumber = hbd.CurrentPage + + // Reset executions for new page + executions = nil + for _, wf := range resp.Executions { + executions = append(executions, wf.Execution) + } + executionIndex = 0 + + // Send tasks from new page + for executionIndex < len(executions) && activeWorkers < config.concurrency { + taskCh <- taskDetail{ + execution: executions[executionIndex], + attempts: 1, + hbd: hbd, + pageNumber: currentPageNumber, + } + activeWorkers++ + totalTasksSent++ + executionIndex++ + tasksSentInIteration++ + pendingTasksPerPage[currentPageNumber]++ + } + } + + case <-ctx.Done(): + metrics.BatcherOperationFailures.With(metricsHandler).Record(1) + logger.Error("Failed to complete batch operation", tag.Error(ctx.Err())) + return HeartBeatDetails{}, ctx.Err() + } + } + + return hbd, nil +} + type activities struct { activityDeps namespace namespace.Name @@ -129,83 +320,103 @@ func (a *activities) BatchActivity(ctx context.Context, batchParams BatchParams) } hbd.TotalEstimate = estimateCount } - rps := a.getOperationRPS(batchParams.RPS) - rateLimit := rate.Limit(rps) - burstLimit := int(math.Ceil(rps)) // should never be zero because everything would be rejected - rateLimiter := rate.NewLimiter(rateLimit, burstLimit) - taskCh := make(chan taskDetail, pageSize) - respCh := make(chan error, pageSize) - for i := 0; i < a.getOperationConcurrency(batchParams.Concurrency); i++ { - go startTaskProcessor(ctx, batchParams, taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger) + + // Prepare configuration for shared processing function + config := batchProcessorConfig{ + namespace: batchParams.Namespace, + adjustedQuery: adjustedQuery, + rps: a.getOperationRPS(batchParams.RPS), + concurrency: a.getOperationConcurrency(batchParams.Concurrency), + initialPageToken: hbd.PageToken, + initialExecutions: batchParams.Executions, } - for { - executions := batchParams.Executions - pageToken := hbd.PageToken + // Create a wrapper for the batch params specific worker processor + workerProcessor := func( + ctx context.Context, + taskCh chan taskDetail, + respCh chan taskResponse, + rateLimiter *rate.Limiter, + sdkClient sdkclient.Client, + frontendClient workflowservice.WorkflowServiceClient, + metricsHandler metrics.Handler, + logger log.Logger, + ) { + startTaskProcessor(ctx, batchParams, taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger) + } + + return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, sdkClient, metricsHandler, logger, hbd) +} + +// BatchActivityWithProtobuf is an activity for processing batch operations using protobuf as the input type. +// nolint:revive,cognitive-complexity +func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams *batchspb.BatchOperationInput) (HeartBeatDetails, error) { + logger := a.getActivityLogger(ctx) + hbd := HeartBeatDetails{} + metricsHandler := a.MetricsHandler.WithTags(metrics.OperationTag(metrics.BatcherScope), metrics.NamespaceIDTag(batchParams.NamespaceId)) + + if err := a.checkNamespaceID(batchParams.NamespaceId); err != nil { + metrics.BatcherOperationFailures.With(metricsHandler).Record(1) + logger.Error("Failed to run batch operation due to namespace mismatch", tag.Error(err)) + return hbd, err + } + + sdkClient := a.ClientFactory.NewClient(sdkclient.Options{ + Namespace: a.namespace.String(), + DataConverter: sdk.PreferProtoDataConverter, + }) + startOver := true + if activity.HasHeartbeatDetails(ctx) { + if err := activity.GetHeartbeatDetails(ctx, &hbd); err == nil { + startOver = false + } else { + logger.Error("Failed to recover from last heartbeat, start over from beginning", tag.Error(err)) + } + } + + adjustedQuery := a.adjustQueryBatchTypeEnum(batchParams.Request.VisibilityQuery, batchParams.BatchType) + + if startOver { + estimateCount := int64(len(batchParams.Request.Executions)) if len(adjustedQuery) > 0 { - resp, err := sdkClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ - PageSize: int32(pageSize), - NextPageToken: pageToken, - Query: adjustedQuery, + resp, err := sdkClient.CountWorkflow(ctx, &workflowservice.CountWorkflowExecutionsRequest{ + Query: adjustedQuery, }) if err != nil { metrics.BatcherOperationFailures.With(metricsHandler).Record(1) - logger.Error("Failed to list workflow executions", tag.Error(err)) + logger.Error("Failed to get estimate workflow count", tag.Error(err)) return HeartBeatDetails{}, err } - pageToken = resp.NextPageToken - for _, wf := range resp.Executions { - executions = append(executions, wf.Execution) - } - } - - batchCount := len(executions) - if batchCount <= 0 { - break - } - // send all tasks - for _, wf := range executions { - taskCh <- taskDetail{ - execution: wf, - attempts: 1, - hbd: hbd, - } - } - - succCount := 0 - errCount := 0 - // wait for counters indicate this batch is done - Loop: - for { - select { - case err := <-respCh: - if err == nil { - succCount++ - } else { - errCount++ - } - if succCount+errCount == batchCount { - break Loop - } - case <-ctx.Done(): - metrics.BatcherOperationFailures.With(metricsHandler).Record(1) - logger.Error("Failed to complete batch operation", tag.Error(ctx.Err())) - return HeartBeatDetails{}, ctx.Err() - } + estimateCount = resp.GetCount() } + hbd.TotalEstimate = estimateCount + } - hbd.CurrentPage++ - hbd.PageToken = pageToken - hbd.SuccessCount += succCount - hbd.ErrorCount += errCount - activity.RecordHeartbeat(ctx, hbd) + // Prepare configuration for shared processing function + config := batchProcessorConfig{ + namespace: batchParams.Request.Namespace, + adjustedQuery: adjustedQuery, + rps: a.getOperationRPS(batchParams.Rps), + concurrency: a.getOperationConcurrency(int(batchParams.Concurrency)), + initialPageToken: hbd.PageToken, + initialExecutions: batchParams.Request.Executions, + } - if len(hbd.PageToken) == 0 { - break - } + // Create a wrapper for the protobuf specific worker processor + workerProcessor := func( + ctx context.Context, + taskCh chan taskDetail, + respCh chan taskResponse, + rateLimiter *rate.Limiter, + sdkClient sdkclient.Client, + frontendClient workflowservice.WorkflowServiceClient, + metricsHandler metrics.Handler, + logger log.Logger, + ) { + startTaskProcessorProtobuf(ctx, batchParams, batchParams.Request.Namespace, taskCh, respCh, rateLimiter, sdkClient, frontendClient, metricsHandler, logger) } - return hbd, nil + return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, sdkClient, metricsHandler, logger, hbd) } // BatchActivityWithProtobuf is an activity for processing batch operations using protobuf as the input type. @@ -387,7 +598,7 @@ func startTaskProcessor( ctx context.Context, batchParams BatchParams, taskCh chan taskDetail, - respCh chan error, + respCh chan taskResponse, limiter *rate.Limiter, sdkClient sdkclient.Client, frontendClient workflowservice.WorkflowServiceClient, @@ -597,7 +808,7 @@ func startTaskProcessor( _, ok := batchParams._nonRetryableErrors[err.Error()] if ok || task.attempts > batchParams.AttemptsOnRetryableError { - respCh <- err + respCh <- taskResponse{err: err, pageNumber: task.pageNumber} } else { // put back to the channel if less than attemptsOnError task.attempts++ @@ -605,7 +816,228 @@ func startTaskProcessor( } } else { metrics.BatcherProcessorSuccess.With(metricsHandler).Record(1) - respCh <- nil + respCh <- taskResponse{err: nil, pageNumber: task.pageNumber} + } + } + } +} + +// nolint:revive,cognitive-complexity +func startTaskProcessorProtobuf( + ctx context.Context, + batchOperation *batchspb.BatchOperationInput, + namespace string, + taskCh chan taskDetail, + respCh chan taskResponse, + limiter *rate.Limiter, + sdkClient sdkclient.Client, + frontendClient workflowservice.WorkflowServiceClient, + metricsHandler metrics.Handler, + logger log.Logger, +) { + for { + select { + case <-ctx.Done(): + return + case task := <-taskCh: + if isDone(ctx) { + return + } + var err error + + switch operation := batchOperation.Request.Operation.(type) { + case *workflowservice.StartBatchOperationRequest_TerminationOperation: + err = processTask(ctx, limiter, task, + func(workflowID, runID string) error { + return sdkClient.TerminateWorkflow(ctx, workflowID, runID, batchOperation.Request.Reason) + }) + case *workflowservice.StartBatchOperationRequest_CancellationOperation: + err = processTask(ctx, limiter, task, + func(workflowID, runID string) error { + return sdkClient.CancelWorkflow(ctx, workflowID, runID) + }) + case *workflowservice.StartBatchOperationRequest_SignalOperation: + err = processTask(ctx, limiter, task, + func(workflowID, runID string) error { + _, err := frontendClient.SignalWorkflowExecution(ctx, &workflowservice.SignalWorkflowExecutionRequest{ + Namespace: namespace, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + SignalName: operation.SignalOperation.GetSignal(), + Input: operation.SignalOperation.GetInput(), + Identity: operation.SignalOperation.GetIdentity(), + }) + return err + }) + case *workflowservice.StartBatchOperationRequest_DeletionOperation: + err = processTask(ctx, limiter, task, + func(workflowID, runID string) error { + _, err := frontendClient.DeleteWorkflowExecution(ctx, &workflowservice.DeleteWorkflowExecutionRequest{ + Namespace: namespace, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + }) + return err + }) + case *workflowservice.StartBatchOperationRequest_ResetOperation: + err = processTask(ctx, limiter, task, + func(workflowID, runID string) error { + workflowExecution := &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + } + var eventId int64 + var err error + //nolint:staticcheck // SA1019: worker versioning v0.31 + var resetReapplyType enumspb.ResetReapplyType + var resetReapplyExcludeTypes []enumspb.ResetReapplyExcludeType + if operation.ResetOperation.Options != nil { + // Using ResetOptions + // Note: getResetEventIDByOptions may modify workflowExecution.RunId, if reset should be to a prior run + //nolint:staticcheck // SA1019: worker versioning v0.31 + eventId, err = getResetEventIDByOptions(ctx, operation.ResetOperation.Options, namespace, workflowExecution, frontendClient, logger) + //nolint:staticcheck // SA1019: worker versioning v0.31 + resetReapplyType = operation.ResetOperation.Options.ResetReapplyType + //nolint:staticcheck // SA1019: worker versioning v0.31 + resetReapplyExcludeTypes = operation.ResetOperation.Options.ResetReapplyExcludeTypes + } else { + // Old fields + //nolint:staticcheck // SA1019: worker versioning v0.31 + eventId, err = getResetEventIDByType(ctx, operation.ResetOperation.ResetType, batchOperation.Request.Namespace, workflowExecution, frontendClient, logger) + //nolint:staticcheck // SA1019: worker versioning v0.31 + resetReapplyType = operation.ResetOperation.ResetReapplyType + } + if err != nil { + return err + } + _, err = frontendClient.ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{ + Namespace: namespace, + WorkflowExecution: workflowExecution, + Reason: batchOperation.Request.Reason, + RequestId: uuid.New(), + WorkflowTaskFinishEventId: eventId, + ResetReapplyType: resetReapplyType, + ResetReapplyExcludeTypes: resetReapplyExcludeTypes, + PostResetOperations: operation.ResetOperation.PostResetOperations, + Identity: operation.ResetOperation.Identity, + }) + return err + }) + case *workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation: + err = processTask(ctx, limiter, task, + func(workflowID, runID string) error { + unpauseRequest := &workflowservice.UnpauseActivityRequest{ + Namespace: namespace, + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + Identity: operation.UnpauseActivitiesOperation.Identity, + ResetAttempts: !operation.UnpauseActivitiesOperation.ResetAttempts, + ResetHeartbeat: operation.UnpauseActivitiesOperation.ResetHeartbeat, + Jitter: operation.UnpauseActivitiesOperation.Jitter, + } + + switch ao := operation.UnpauseActivitiesOperation.GetActivity().(type) { + case *batchpb.BatchOperationUnpauseActivities_Type: + unpauseRequest.Activity = &workflowservice.UnpauseActivityRequest_Type{ + Type: ao.Type, + } + case *batchpb.BatchOperationUnpauseActivities_MatchAll: + unpauseRequest.Activity = &workflowservice.UnpauseActivityRequest_UnpauseAll{UnpauseAll: true} + } + + _, err = frontendClient.UnpauseActivity(ctx, unpauseRequest) + return err + }) + + case *workflowservice.StartBatchOperationRequest_UpdateWorkflowOptionsOperation: + err = processTask(ctx, limiter, task, + func(workflowID, runID string) error { + var err error + _, err = frontendClient.UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: namespace, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + WorkflowExecutionOptions: operation.UpdateWorkflowOptionsOperation.WorkflowExecutionOptions, + UpdateMask: &fieldmaskpb.FieldMask{Paths: operation.UpdateWorkflowOptionsOperation.UpdateMask.Paths}, + }) + return err + }) + case *workflowservice.StartBatchOperationRequest_ResetActivitiesOperation: + err = processTask(ctx, limiter, task, + func(workflowID, runID string) error { + resetRequest := &workflowservice.ResetActivityRequest{ + Namespace: namespace, + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + Identity: operation.ResetActivitiesOperation.Identity, + ResetHeartbeat: operation.ResetActivitiesOperation.ResetHeartbeat, + Jitter: operation.ResetActivitiesOperation.Jitter, + KeepPaused: operation.ResetActivitiesOperation.KeepPaused, + RestoreOriginalOptions: operation.ResetActivitiesOperation.RestoreOriginalOptions, + } + + switch ao := operation.ResetActivitiesOperation.GetActivity().(type) { + case *batchpb.BatchOperationResetActivities_Type: + resetRequest.Activity = &workflowservice.ResetActivityRequest_Type{Type: ao.Type} + case *batchpb.BatchOperationResetActivities_MatchAll: + resetRequest.Activity = &workflowservice.ResetActivityRequest_MatchAll{MatchAll: true} + } + + _, err = frontendClient.ResetActivity(ctx, resetRequest) + return err + }) + case *workflowservice.StartBatchOperationRequest_UpdateActivityOptionsOperation: + err = processTask(ctx, limiter, task, + func(workflowID, runID string) error { + updateRequest := &workflowservice.UpdateActivityOptionsRequest{ + Namespace: namespace, + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + UpdateMask: &fieldmaskpb.FieldMask{Paths: operation.UpdateActivityOptionsOperation.UpdateMask.Paths}, + RestoreOriginal: operation.UpdateActivityOptionsOperation.RestoreOriginal, + Identity: operation.UpdateActivityOptionsOperation.Identity, + } + + switch ao := operation.UpdateActivityOptionsOperation.GetActivity().(type) { + case *batchpb.BatchOperationUpdateActivityOptions_Type: + updateRequest.Activity = &workflowservice.UpdateActivityOptionsRequest_Type{Type: ao.Type} + case *batchpb.BatchOperationUpdateActivityOptions_MatchAll: + updateRequest.Activity = &workflowservice.UpdateActivityOptionsRequest_MatchAll{MatchAll: true} + } + + updateRequest.ActivityOptions = operation.UpdateActivityOptionsOperation.GetActivityOptions() + _, err = frontendClient.UpdateActivityOptions(ctx, updateRequest) + return err + }) + default: + err = errors.New(fmt.Sprintf("unknown batch type: %v", batchOperation.BatchType)) + } + if err != nil { + metrics.BatcherProcessorFailures.With(metricsHandler).Record(1) + logger.Error("Failed to process batch operation task", tag.Error(err)) + nonRetryable := slices.Contains(batchOperation.NonRetryableErrors, err.Error()) + if nonRetryable || task.attempts > int(batchOperation.AttemptsOnRetryableError) { + respCh <- taskResponse{err: err, pageNumber: task.pageNumber} + } else { + // put back to the channel if less than attemptsOnError + task.attempts++ + taskCh <- task + } + } else { + metrics.BatcherProcessorSuccess.With(metricsHandler).Record(1) + respCh <- taskResponse{err: nil, pageNumber: task.pageNumber} } } } diff --git a/service/worker/batcher/workflow.go b/service/worker/batcher/workflow.go index 942304edc29..5051321bc3b 100644 --- a/service/worker/batcher/workflow.go +++ b/service/worker/batcher/workflow.go @@ -222,10 +222,21 @@ type ( } taskDetail struct { + // the workflow execution to process execution *commonpb.WorkflowExecution - attempts int + // the number of attempts to process the workflow execution + attempts int // passing along the current heartbeat details to make heartbeat within a task so that it won't timeout hbd HeartBeatDetails + // the page number this task belongs to (for tracking page completion) + pageNumber int + } + + taskResponse struct { + // the error result from processing the task (nil for success) + err error + // the page number the completed task belonged to + pageNumber int } ) From f34e239f0301b7ae1d8b98ba567a5efa71fae941 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Mon, 4 Aug 2025 14:54:54 -0600 Subject: [PATCH 02/13] fixing syntax errors --- service/worker/batcher/activities.go | 302 +-------------------------- 1 file changed, 5 insertions(+), 297 deletions(-) diff --git a/service/worker/batcher/activities.go b/service/worker/batcher/activities.go index 56ac82850e5..1db4f53fd68 100644 --- a/service/worker/batcher/activities.go +++ b/service/worker/batcher/activities.go @@ -348,77 +348,6 @@ func (a *activities) BatchActivity(ctx context.Context, batchParams BatchParams) return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, sdkClient, metricsHandler, logger, hbd) } -// BatchActivityWithProtobuf is an activity for processing batch operations using protobuf as the input type. -// nolint:revive,cognitive-complexity -func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams *batchspb.BatchOperationInput) (HeartBeatDetails, error) { - logger := a.getActivityLogger(ctx) - hbd := HeartBeatDetails{} - metricsHandler := a.MetricsHandler.WithTags(metrics.OperationTag(metrics.BatcherScope), metrics.NamespaceIDTag(batchParams.NamespaceId)) - - if err := a.checkNamespaceID(batchParams.NamespaceId); err != nil { - metrics.BatcherOperationFailures.With(metricsHandler).Record(1) - logger.Error("Failed to run batch operation due to namespace mismatch", tag.Error(err)) - return hbd, err - } - - sdkClient := a.ClientFactory.NewClient(sdkclient.Options{ - Namespace: a.namespace.String(), - DataConverter: sdk.PreferProtoDataConverter, - }) - startOver := true - if activity.HasHeartbeatDetails(ctx) { - if err := activity.GetHeartbeatDetails(ctx, &hbd); err == nil { - startOver = false - } else { - logger.Error("Failed to recover from last heartbeat, start over from beginning", tag.Error(err)) - } - } - - adjustedQuery := a.adjustQueryBatchTypeEnum(batchParams.Request.VisibilityQuery, batchParams.BatchType) - - if startOver { - estimateCount := int64(len(batchParams.Request.Executions)) - if len(adjustedQuery) > 0 { - resp, err := sdkClient.CountWorkflow(ctx, &workflowservice.CountWorkflowExecutionsRequest{ - Query: adjustedQuery, - }) - if err != nil { - metrics.BatcherOperationFailures.With(metricsHandler).Record(1) - logger.Error("Failed to get estimate workflow count", tag.Error(err)) - return HeartBeatDetails{}, err - } - estimateCount = resp.GetCount() - } - hbd.TotalEstimate = estimateCount - } - - // Prepare configuration for shared processing function - config := batchProcessorConfig{ - namespace: batchParams.Request.Namespace, - adjustedQuery: adjustedQuery, - rps: a.getOperationRPS(batchParams.Rps), - concurrency: a.getOperationConcurrency(int(batchParams.Concurrency)), - initialPageToken: hbd.PageToken, - initialExecutions: batchParams.Request.Executions, - } - - // Create a wrapper for the protobuf specific worker processor - workerProcessor := func( - ctx context.Context, - taskCh chan taskDetail, - respCh chan taskResponse, - rateLimiter *rate.Limiter, - sdkClient sdkclient.Client, - frontendClient workflowservice.WorkflowServiceClient, - metricsHandler metrics.Handler, - logger log.Logger, - ) { - startTaskProcessorProtobuf(ctx, batchParams, batchParams.Request.Namespace, taskCh, respCh, rateLimiter, sdkClient, frontendClient, metricsHandler, logger) - } - - return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, sdkClient, metricsHandler, logger, hbd) -} - // BatchActivityWithProtobuf is an activity for processing batch operations using protobuf as the input type. // nolint:revive,cognitive-complexity func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams *batchspb.BatchOperationInput) (HeartBeatDetails, error) { @@ -467,7 +396,7 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams burstLimit := rps // should never be zero because everything would be rejected rateLimiter := rate.NewLimiter(rateLimit, burstLimit) taskCh := make(chan taskDetail, pageSize) - respCh := make(chan error, pageSize) + respCh := make(chan taskResponse, pageSize) for i := 0; i < a.getOperationConcurrency(int(batchParams.Concurrency)); i++ { go startTaskProcessorProtobuf(ctx, batchParams, a.namespace.String(), taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger) } @@ -511,8 +440,8 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams Loop: for { select { - case err := <-respCh: - if err == nil { + case task := <-respCh: + if task.err == nil { succCount++ } else { errCount++ @@ -845,227 +774,6 @@ func startTaskProcessorProtobuf( } var err error - switch operation := batchOperation.Request.Operation.(type) { - case *workflowservice.StartBatchOperationRequest_TerminationOperation: - err = processTask(ctx, limiter, task, - func(workflowID, runID string) error { - return sdkClient.TerminateWorkflow(ctx, workflowID, runID, batchOperation.Request.Reason) - }) - case *workflowservice.StartBatchOperationRequest_CancellationOperation: - err = processTask(ctx, limiter, task, - func(workflowID, runID string) error { - return sdkClient.CancelWorkflow(ctx, workflowID, runID) - }) - case *workflowservice.StartBatchOperationRequest_SignalOperation: - err = processTask(ctx, limiter, task, - func(workflowID, runID string) error { - _, err := frontendClient.SignalWorkflowExecution(ctx, &workflowservice.SignalWorkflowExecutionRequest{ - Namespace: namespace, - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - SignalName: operation.SignalOperation.GetSignal(), - Input: operation.SignalOperation.GetInput(), - Identity: operation.SignalOperation.GetIdentity(), - }) - return err - }) - case *workflowservice.StartBatchOperationRequest_DeletionOperation: - err = processTask(ctx, limiter, task, - func(workflowID, runID string) error { - _, err := frontendClient.DeleteWorkflowExecution(ctx, &workflowservice.DeleteWorkflowExecutionRequest{ - Namespace: namespace, - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - }) - return err - }) - case *workflowservice.StartBatchOperationRequest_ResetOperation: - err = processTask(ctx, limiter, task, - func(workflowID, runID string) error { - workflowExecution := &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - } - var eventId int64 - var err error - //nolint:staticcheck // SA1019: worker versioning v0.31 - var resetReapplyType enumspb.ResetReapplyType - var resetReapplyExcludeTypes []enumspb.ResetReapplyExcludeType - if operation.ResetOperation.Options != nil { - // Using ResetOptions - // Note: getResetEventIDByOptions may modify workflowExecution.RunId, if reset should be to a prior run - //nolint:staticcheck // SA1019: worker versioning v0.31 - eventId, err = getResetEventIDByOptions(ctx, operation.ResetOperation.Options, namespace, workflowExecution, frontendClient, logger) - //nolint:staticcheck // SA1019: worker versioning v0.31 - resetReapplyType = operation.ResetOperation.Options.ResetReapplyType - //nolint:staticcheck // SA1019: worker versioning v0.31 - resetReapplyExcludeTypes = operation.ResetOperation.Options.ResetReapplyExcludeTypes - } else { - // Old fields - //nolint:staticcheck // SA1019: worker versioning v0.31 - eventId, err = getResetEventIDByType(ctx, operation.ResetOperation.ResetType, batchOperation.Request.Namespace, workflowExecution, frontendClient, logger) - //nolint:staticcheck // SA1019: worker versioning v0.31 - resetReapplyType = operation.ResetOperation.ResetReapplyType - } - if err != nil { - return err - } - _, err = frontendClient.ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{ - Namespace: namespace, - WorkflowExecution: workflowExecution, - Reason: batchOperation.Request.Reason, - RequestId: uuid.New(), - WorkflowTaskFinishEventId: eventId, - ResetReapplyType: resetReapplyType, - ResetReapplyExcludeTypes: resetReapplyExcludeTypes, - PostResetOperations: operation.ResetOperation.PostResetOperations, - Identity: operation.ResetOperation.Identity, - }) - return err - }) - case *workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation: - err = processTask(ctx, limiter, task, - func(workflowID, runID string) error { - unpauseRequest := &workflowservice.UnpauseActivityRequest{ - Namespace: namespace, - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - Identity: operation.UnpauseActivitiesOperation.Identity, - ResetAttempts: !operation.UnpauseActivitiesOperation.ResetAttempts, - ResetHeartbeat: operation.UnpauseActivitiesOperation.ResetHeartbeat, - Jitter: operation.UnpauseActivitiesOperation.Jitter, - } - - switch ao := operation.UnpauseActivitiesOperation.GetActivity().(type) { - case *batchpb.BatchOperationUnpauseActivities_Type: - unpauseRequest.Activity = &workflowservice.UnpauseActivityRequest_Type{ - Type: ao.Type, - } - case *batchpb.BatchOperationUnpauseActivities_MatchAll: - unpauseRequest.Activity = &workflowservice.UnpauseActivityRequest_UnpauseAll{UnpauseAll: true} - } - - _, err = frontendClient.UnpauseActivity(ctx, unpauseRequest) - return err - }) - - case *workflowservice.StartBatchOperationRequest_UpdateWorkflowOptionsOperation: - err = processTask(ctx, limiter, task, - func(workflowID, runID string) error { - var err error - _, err = frontendClient.UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ - Namespace: namespace, - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - WorkflowExecutionOptions: operation.UpdateWorkflowOptionsOperation.WorkflowExecutionOptions, - UpdateMask: &fieldmaskpb.FieldMask{Paths: operation.UpdateWorkflowOptionsOperation.UpdateMask.Paths}, - }) - return err - }) - case *workflowservice.StartBatchOperationRequest_ResetActivitiesOperation: - err = processTask(ctx, limiter, task, - func(workflowID, runID string) error { - resetRequest := &workflowservice.ResetActivityRequest{ - Namespace: namespace, - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - Identity: operation.ResetActivitiesOperation.Identity, - ResetHeartbeat: operation.ResetActivitiesOperation.ResetHeartbeat, - Jitter: operation.ResetActivitiesOperation.Jitter, - KeepPaused: operation.ResetActivitiesOperation.KeepPaused, - RestoreOriginalOptions: operation.ResetActivitiesOperation.RestoreOriginalOptions, - } - - switch ao := operation.ResetActivitiesOperation.GetActivity().(type) { - case *batchpb.BatchOperationResetActivities_Type: - resetRequest.Activity = &workflowservice.ResetActivityRequest_Type{Type: ao.Type} - case *batchpb.BatchOperationResetActivities_MatchAll: - resetRequest.Activity = &workflowservice.ResetActivityRequest_MatchAll{MatchAll: true} - } - - _, err = frontendClient.ResetActivity(ctx, resetRequest) - return err - }) - case *workflowservice.StartBatchOperationRequest_UpdateActivityOptionsOperation: - err = processTask(ctx, limiter, task, - func(workflowID, runID string) error { - updateRequest := &workflowservice.UpdateActivityOptionsRequest{ - Namespace: namespace, - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - UpdateMask: &fieldmaskpb.FieldMask{Paths: operation.UpdateActivityOptionsOperation.UpdateMask.Paths}, - RestoreOriginal: operation.UpdateActivityOptionsOperation.RestoreOriginal, - Identity: operation.UpdateActivityOptionsOperation.Identity, - } - - switch ao := operation.UpdateActivityOptionsOperation.GetActivity().(type) { - case *batchpb.BatchOperationUpdateActivityOptions_Type: - updateRequest.Activity = &workflowservice.UpdateActivityOptionsRequest_Type{Type: ao.Type} - case *batchpb.BatchOperationUpdateActivityOptions_MatchAll: - updateRequest.Activity = &workflowservice.UpdateActivityOptionsRequest_MatchAll{MatchAll: true} - } - - updateRequest.ActivityOptions = operation.UpdateActivityOptionsOperation.GetActivityOptions() - _, err = frontendClient.UpdateActivityOptions(ctx, updateRequest) - return err - }) - default: - err = errors.New(fmt.Sprintf("unknown batch type: %v", batchOperation.BatchType)) - } - if err != nil { - metrics.BatcherProcessorFailures.With(metricsHandler).Record(1) - logger.Error("Failed to process batch operation task", tag.Error(err)) - nonRetryable := slices.Contains(batchOperation.NonRetryableErrors, err.Error()) - if nonRetryable || task.attempts > int(batchOperation.AttemptsOnRetryableError) { - respCh <- taskResponse{err: err, pageNumber: task.pageNumber} - } else { - // put back to the channel if less than attemptsOnError - task.attempts++ - taskCh <- task - } - } else { - metrics.BatcherProcessorSuccess.With(metricsHandler).Record(1) - respCh <- taskResponse{err: nil, pageNumber: task.pageNumber} - } - } - } -} - -// nolint:revive,cognitive-complexity -func startTaskProcessorProtobuf( - ctx context.Context, - batchOperation *batchspb.BatchOperationInput, - namespace string, - taskCh chan taskDetail, - respCh chan error, - limiter *rate.Limiter, - sdkClient sdkclient.Client, - frontendClient workflowservice.WorkflowServiceClient, - metricsHandler metrics.Handler, - logger log.Logger, -) { - for { - select { - case <-ctx.Done(): - return - case task := <-taskCh: - if isDone(ctx) { - return - } - var err error - switch operation := batchOperation.Request.Operation.(type) { case *workflowservice.StartBatchOperationRequest_TerminationOperation: err = processTask(ctx, limiter, task, @@ -1256,7 +964,7 @@ func startTaskProcessorProtobuf( logger.Error("Failed to process batch operation task", tag.Error(err)) nonRetryable := slices.Contains(batchOperation.NonRetryableErrors, err.Error()) if nonRetryable || task.attempts > int(batchOperation.AttemptsOnRetryableError) { - respCh <- err + respCh <- taskResponse{err: err, pageNumber: task.pageNumber} } else { // put back to the channel if less than attemptsOnError task.attempts++ @@ -1264,7 +972,7 @@ func startTaskProcessorProtobuf( } } else { metrics.BatcherProcessorSuccess.With(metricsHandler).Record(1) - respCh <- nil + respCh <- taskResponse{err: nil, pageNumber: task.pageNumber} } } } From 0e75fedede9fc1a7183451f0d2f7233c1934fd89 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Mon, 4 Aug 2025 15:33:23 -0600 Subject: [PATCH 03/13] adding correct paging to the protobuf impl --- service/worker/batcher/activities.go | 100 +++++++-------------------- 1 file changed, 24 insertions(+), 76 deletions(-) diff --git a/service/worker/batcher/activities.go b/service/worker/batcher/activities.go index 1db4f53fd68..b168e76a6d7 100644 --- a/service/worker/batcher/activities.go +++ b/service/worker/batcher/activities.go @@ -163,8 +163,7 @@ func (a *activities) processWorkflowsWithProactiveFetching( tasksSentInIteration := 0 // First, send remaining tasks from current page - hasCapacity := activeWorkers < config.concurrency - for executionIndex < len(executions) && hasCapacity { + for executionIndex < len(executions) && activeWorkers < config.concurrency { taskCh <- taskDetail{ execution: executions[executionIndex], attempts: 1, @@ -180,7 +179,7 @@ func (a *activities) processWorkflowsWithProactiveFetching( // If current page is exhausted and we have capacity, fetch next page pageExhausted := executionIndex >= len(executions) - if pageExhausted && hasMorePages && hasCapacity { + if pageExhausted && hasMorePages && activeWorkers < config.concurrency { resp, err := sdkClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ PageSize: int32(pageSize), NextPageToken: currentPageToken, @@ -391,83 +390,32 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams } hbd.TotalEstimate = estimateCount } - rps := a.rps(a.namespace.String()) - rateLimit := rate.Limit(rps) - burstLimit := rps // should never be zero because everything would be rejected - rateLimiter := rate.NewLimiter(rateLimit, burstLimit) - taskCh := make(chan taskDetail, pageSize) - respCh := make(chan taskResponse, pageSize) - for i := 0; i < a.getOperationConcurrency(int(batchParams.Concurrency)); i++ { - go startTaskProcessorProtobuf(ctx, batchParams, a.namespace.String(), taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger) - } - for { - executions := batchParams.Request.Executions - pageToken := hbd.PageToken - if len(adjustedQuery) > 0 { - resp, err := sdkClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ - PageSize: int32(pageSize), - NextPageToken: pageToken, - Query: adjustedQuery, - }) - if err != nil { - metrics.BatcherOperationFailures.With(metricsHandler).Record(1) - logger.Error("Failed to list workflow executions", tag.Error(err)) - return HeartBeatDetails{}, err - } - pageToken = resp.NextPageToken - for _, wf := range resp.Executions { - executions = append(executions, wf.Execution) - } - } - - batchCount := len(executions) - if batchCount <= 0 { - break - } - // send all tasks - for _, wf := range executions { - taskCh <- taskDetail{ - execution: wf, - attempts: 1, - hbd: hbd, - } - } - - succCount := 0 - errCount := 0 - // wait for counters indicate this batch is done - Loop: - for { - select { - case task := <-respCh: - if task.err == nil { - succCount++ - } else { - errCount++ - } - if succCount+errCount == batchCount { - break Loop - } - case <-ctx.Done(): - metrics.BatcherOperationFailures.With(metricsHandler).Record(1) - logger.Error("Failed to complete batch operation", tag.Error(ctx.Err())) - return HeartBeatDetails{}, ctx.Err() - } - } - - hbd.CurrentPage++ - hbd.PageToken = pageToken - hbd.SuccessCount += succCount - hbd.ErrorCount += errCount - activity.RecordHeartbeat(ctx, hbd) + // Prepare configuration for shared processing function + config := batchProcessorConfig{ + namespace: batchParams.Request.Namespace, + adjustedQuery: adjustedQuery, + rps: float64(a.rps(batchParams.Request.Namespace)), + concurrency: a.getOperationConcurrency(int(batchParams.Concurrency)), + initialPageToken: hbd.PageToken, + initialExecutions: batchParams.Request.Executions, + } - if len(hbd.PageToken) == 0 { - break - } + // Create a wrapper for the protobuf specific worker processor + workerProcessor := func( + ctx context.Context, + taskCh chan taskDetail, + respCh chan taskResponse, + rateLimiter *rate.Limiter, + sdkClient sdkclient.Client, + frontendClient workflowservice.WorkflowServiceClient, + metricsHandler metrics.Handler, + logger log.Logger, + ) { + startTaskProcessorProtobuf(ctx, batchParams, batchParams.Request.Namespace, taskCh, respCh, rateLimiter, sdkClient, frontendClient, metricsHandler, logger) } - return hbd, nil + return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, sdkClient, metricsHandler, logger, hbd) } func (a *activities) getActivityLogger(ctx context.Context) log.Logger { From dabab71e38c1afbffa1996cc35e4a67fdb9de5fa Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Mon, 4 Aug 2025 16:12:56 -0600 Subject: [PATCH 04/13] davids comments on old pr --- service/frontend/workflow_handler.go | 4 +- service/worker/batcher/activities.go | 95 +++++++++------------------- 2 files changed, 33 insertions(+), 66 deletions(-) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 95d9aeb732d..3d82cd1a25a 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -4815,9 +4815,11 @@ func (wh *WorkflowHandler) ListBatchOperations( Namespace: request.GetNamespace(), PageSize: request.PageSize, NextPageToken: request.GetNextPageToken(), - Query: fmt.Sprintf("%s = '%s' and %s = '%s'", + Query: fmt.Sprintf("(%s = '%s' or %s = '%s') and %s = '%s'", searchattribute.WorkflowType, batcher.BatchWFTypeProtobufName, + searchattribute.WorkflowType, + batcher.BatchWFTypeName, searchattribute.TemporalNamespaceDivision, batcher.NamespaceDivision, ), diff --git a/service/worker/batcher/activities.go b/service/worker/batcher/activities.go index b168e76a6d7..ac7146b5d56 100644 --- a/service/worker/batcher/activities.go +++ b/service/worker/batcher/activities.go @@ -81,7 +81,7 @@ func (a *activities) processWorkflowsWithProactiveFetching( respCh := make(chan taskResponse, pageSize) // Start worker processors - for i := 0; i < config.concurrency; i++ { + for range config.concurrency { go startWorkerProcessor(ctx, taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger) } @@ -507,13 +507,10 @@ func startTaskProcessor( err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { _, err := frontendClient.SignalWorkflowExecution(ctx, &workflowservice.SignalWorkflowExecutionRequest{ - Namespace: batchParams.Namespace, - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: execution.WorkflowId, - RunId: execution.RunId, - }, - SignalName: batchParams.SignalParams.SignalName, - Input: batchParams.SignalParams.Input, + Namespace: batchParams.Namespace, + WorkflowExecution: execution, + SignalName: batchParams.SignalParams.SignalName, + Input: batchParams.SignalParams.Input, }) return err }) @@ -521,21 +518,14 @@ func startTaskProcessor( err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { _, err := frontendClient.DeleteWorkflowExecution(ctx, &workflowservice.DeleteWorkflowExecutionRequest{ - Namespace: batchParams.Namespace, - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: execution.WorkflowId, - RunId: execution.RunId, - }, + Namespace: batchParams.Namespace, + WorkflowExecution: execution, }) return err }) case BatchTypeReset: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { - workflowExecution := &commonpb.WorkflowExecution{ - WorkflowId: execution.WorkflowId, - RunId: execution.RunId, - } var eventId int64 var err error var resetReapplyType enumspb.ResetReapplyType @@ -543,12 +533,12 @@ func startTaskProcessor( if batchParams.ResetParams.resetOptions != nil { // Using ResetOptions // Note: getResetEventIDByOptions may modify workflowExecution.RunId, if reset should be to a prior run - eventId, err = getResetEventIDByOptions(ctx, batchParams.ResetParams.resetOptions, batchParams.Namespace, workflowExecution, frontendClient, logger) + eventId, err = getResetEventIDByOptions(ctx, batchParams.ResetParams.resetOptions, batchParams.Namespace, execution, frontendClient, logger) resetReapplyType = batchParams.ResetParams.resetOptions.ResetReapplyType resetReapplyExcludeTypes = batchParams.ResetParams.resetOptions.ResetReapplyExcludeTypes } else { // Old fields - eventId, err = getResetEventIDByType(ctx, batchParams.ResetParams.ResetType, batchParams.Namespace, workflowExecution, frontendClient, logger) + eventId, err = getResetEventIDByType(ctx, batchParams.ResetParams.ResetType, batchParams.Namespace, execution, frontendClient, logger) resetReapplyType = batchParams.ResetParams.ResetReapplyType } if err != nil { @@ -556,7 +546,7 @@ func startTaskProcessor( } _, err = frontendClient.ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{ Namespace: batchParams.Namespace, - WorkflowExecution: workflowExecution, + WorkflowExecution: execution, Reason: batchParams.Reason, RequestId: uuid.New(), WorkflowTaskFinishEventId: eventId, @@ -570,11 +560,8 @@ func startTaskProcessor( err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { unpauseRequest := &workflowservice.UnpauseActivityRequest{ - Namespace: batchParams.Namespace, - Execution: &commonpb.WorkflowExecution{ - WorkflowId: execution.WorkflowId, - RunId: execution.RunId, - }, + Namespace: batchParams.Namespace, + Execution: execution, Identity: batchParams.UnpauseActivitiesParams.Identity, Activity: &workflowservice.UnpauseActivityRequest_Type{Type: batchParams.UnpauseActivitiesParams.ActivityType}, ResetAttempts: !batchParams.UnpauseActivitiesParams.ResetAttempts, @@ -596,11 +583,8 @@ func startTaskProcessor( func(execution *commonpb.WorkflowExecution) error { var err error _, err = frontendClient.UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ - Namespace: batchParams.Namespace, - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: execution.WorkflowId, - RunId: execution.RunId, - }, + Namespace: batchParams.Namespace, + WorkflowExecution: execution, WorkflowExecutionOptions: batchParams.UpdateOptionsParams.WorkflowExecutionOptions, UpdateMask: &fieldmaskpb.FieldMask{Paths: batchParams.UpdateOptionsParams.UpdateMask.Paths}, }) @@ -611,11 +595,8 @@ func startTaskProcessor( err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { resetRequest := &workflowservice.ResetActivityRequest{ - Namespace: batchParams.Namespace, - Execution: &commonpb.WorkflowExecution{ - WorkflowId: execution.WorkflowId, - RunId: execution.RunId, - }, + Namespace: batchParams.Namespace, + Execution: execution, Identity: batchParams.ResetActivitiesParams.Identity, Activity: &workflowservice.ResetActivityRequest_Type{Type: batchParams.ResetActivitiesParams.ActivityType}, ResetHeartbeat: batchParams.ResetActivitiesParams.ResetHeartbeat, @@ -637,11 +618,8 @@ func startTaskProcessor( err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { updateRequest := &workflowservice.UpdateActivityOptionsRequest{ - Namespace: batchParams.Namespace, - Execution: &commonpb.WorkflowExecution{ - WorkflowId: execution.WorkflowId, - RunId: execution.RunId, - }, + Namespace: batchParams.Namespace, + Execution: execution, Activity: &workflowservice.UpdateActivityOptionsRequest_Type{Type: batchParams.UpdateActivitiesOptionsParams.ActivityType}, UpdateMask: &fieldmaskpb.FieldMask{Paths: batchParams.UpdateActivitiesOptionsParams.UpdateMask.Paths}, RestoreOriginal: batchParams.UpdateActivitiesOptionsParams.RestoreOriginal, @@ -737,14 +715,11 @@ func startTaskProcessorProtobuf( err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { _, err := frontendClient.SignalWorkflowExecution(ctx, &workflowservice.SignalWorkflowExecutionRequest{ - Namespace: namespace, - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: execution.WorkflowId, - RunId: execution.RunId, - }, - SignalName: operation.SignalOperation.GetSignal(), - Input: operation.SignalOperation.GetInput(), - Identity: operation.SignalOperation.GetIdentity(), + Namespace: namespace, + WorkflowExecution: execution, + SignalName: operation.SignalOperation.GetSignal(), + Input: operation.SignalOperation.GetInput(), + Identity: operation.SignalOperation.GetIdentity(), }) return err }) @@ -752,21 +727,14 @@ func startTaskProcessorProtobuf( err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { _, err := frontendClient.DeleteWorkflowExecution(ctx, &workflowservice.DeleteWorkflowExecutionRequest{ - Namespace: namespace, - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: execution.WorkflowId, - RunId: execution.RunId, - }, + Namespace: namespace, + WorkflowExecution: execution, }) return err }) case *workflowservice.StartBatchOperationRequest_ResetOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { - workflowExecution := &commonpb.WorkflowExecution{ - WorkflowId: execution.WorkflowId, - RunId: execution.RunId, - } var eventId int64 var err error //nolint:staticcheck // SA1019: worker versioning v0.31 @@ -776,7 +744,7 @@ func startTaskProcessorProtobuf( // Using ResetOptions // Note: getResetEventIDByOptions may modify workflowExecution.RunId, if reset should be to a prior run //nolint:staticcheck // SA1019: worker versioning v0.31 - eventId, err = getResetEventIDByOptions(ctx, operation.ResetOperation.Options, namespace, workflowExecution, frontendClient, logger) + eventId, err = getResetEventIDByOptions(ctx, operation.ResetOperation.Options, namespace, execution, frontendClient, logger) //nolint:staticcheck // SA1019: worker versioning v0.31 resetReapplyType = operation.ResetOperation.Options.ResetReapplyType //nolint:staticcheck // SA1019: worker versioning v0.31 @@ -784,7 +752,7 @@ func startTaskProcessorProtobuf( } else { // Old fields //nolint:staticcheck // SA1019: worker versioning v0.31 - eventId, err = getResetEventIDByType(ctx, operation.ResetOperation.ResetType, batchOperation.Request.Namespace, workflowExecution, frontendClient, logger) + eventId, err = getResetEventIDByType(ctx, operation.ResetOperation.ResetType, batchOperation.Request.Namespace, execution, frontendClient, logger) //nolint:staticcheck // SA1019: worker versioning v0.31 resetReapplyType = operation.ResetOperation.ResetReapplyType } @@ -793,7 +761,7 @@ func startTaskProcessorProtobuf( } _, err = frontendClient.ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{ Namespace: namespace, - WorkflowExecution: workflowExecution, + WorkflowExecution: execution, Reason: batchOperation.Request.Reason, RequestId: uuid.New(), WorkflowTaskFinishEventId: eventId, @@ -808,11 +776,8 @@ func startTaskProcessorProtobuf( err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { unpauseRequest := &workflowservice.UnpauseActivityRequest{ - Namespace: namespace, - Execution: &commonpb.WorkflowExecution{ - WorkflowId: execution.WorkflowId, - RunId: execution.RunId, - }, + Namespace: namespace, + Execution: execution, Identity: operation.UnpauseActivitiesOperation.Identity, ResetAttempts: operation.UnpauseActivitiesOperation.ResetAttempts, ResetHeartbeat: operation.UnpauseActivitiesOperation.ResetHeartbeat, From c1910cb3c557c8b4b94a4c07a6925649d235e1c1 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Mon, 4 Aug 2025 17:03:50 -0600 Subject: [PATCH 05/13] remove workflow type filter --- service/frontend/workflow_handler.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 3d82cd1a25a..8871f1b96cd 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -4815,11 +4815,7 @@ func (wh *WorkflowHandler) ListBatchOperations( Namespace: request.GetNamespace(), PageSize: request.PageSize, NextPageToken: request.GetNextPageToken(), - Query: fmt.Sprintf("(%s = '%s' or %s = '%s') and %s = '%s'", - searchattribute.WorkflowType, - batcher.BatchWFTypeProtobufName, - searchattribute.WorkflowType, - batcher.BatchWFTypeName, + Query: fmt.Sprintf("%s = '%s'", searchattribute.TemporalNamespaceDivision, batcher.NamespaceDivision, ), From 1e9fbf5e3c5eb9e622fcfc4b2fdf7d394b05dc34 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Tue, 5 Aug 2025 10:24:48 -0600 Subject: [PATCH 06/13] use an event loop and put heartbeat details into its own goroutine --- service/worker/batcher/activities.go | 307 ++++++++++++++++----------- 1 file changed, 185 insertions(+), 122 deletions(-) diff --git a/service/worker/batcher/activities.go b/service/worker/batcher/activities.go index ac7146b5d56..475ad83e06e 100644 --- a/service/worker/batcher/activities.go +++ b/service/worker/batcher/activities.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "slices" + "sync" "time" "github.com/pborman/uuid" @@ -63,6 +64,91 @@ type batchWorkerProcessor func( logger log.Logger, ) +// Task represents a task to be processed (alias for taskDetail for consistency with pseudocode) +type Task = taskDetail + +// Page represents a page of workflow executions to be processed +type Page struct { + executions []*commonpb.WorkflowExecution + submittedCount int + successCount int + errorCount int + nextPageToken []byte + pageNumber int + prev, next *Page +} + +// hasNext returns true if there are more pages to fetch +func (p *Page) hasNext() bool { + return len(p.nextPageToken) > 0 +} + +// allSubmitted returns true if all executions in this page have been submitted +func (p *Page) allSubmitted() bool { + return p.submittedCount == len(p.executions) +} + +// nextTask returns the next task to be submitted from this page +func (p *Page) nextTask(hbd HeartBeatDetails) Task { + if p.submittedCount >= len(p.executions) { + return Task{} // No more tasks in this page + } + + task := Task{ + execution: p.executions[p.submittedCount], + attempts: 1, + hbd: hbd, + pageNumber: p.pageNumber, + } + return task +} + +// done returns true if this page and all previous pages are complete +func (p *Page) done() bool { + if p.prev != nil && !p.prev.done() { + return false + } + return p.successCount+p.errorCount == len(p.executions) +} + +// fetchPage fetches a new page of workflow executions +func fetchPage( + ctx context.Context, + sdkClient sdkclient.Client, + config batchProcessorConfig, + pageToken []byte, + pageNumber int, +) (*Page, error) { + if len(config.adjustedQuery) == 0 { + // No query provided, return empty page + return &Page{ + executions: []*commonpb.WorkflowExecution{}, + nextPageToken: []byte{}, + pageNumber: pageNumber, + }, nil + } + + resp, err := sdkClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + PageSize: int32(pageSize), + NextPageToken: pageToken, + Query: config.adjustedQuery, + }) + if err != nil { + return nil, err + } + + executions := make([]*commonpb.WorkflowExecution, 0, len(resp.Executions)) + for _, wf := range resp.Executions { + executions = append(executions, wf.Execution) + } + + return &Page{ + executions: executions, + nextPageToken: resp.NextPageToken, + pageNumber: pageNumber, + }, nil +} + // processWorkflowsWithProactiveFetching handles the core logic for both batch activity functions // nolint:revive,cognitive-complexity func (a *activities) processWorkflowsWithProactiveFetching( @@ -77,147 +163,119 @@ func (a *activities) processWorkflowsWithProactiveFetching( rateLimit := rate.Limit(config.rps) burstLimit := int(math.Ceil(config.rps)) // should never be zero because everything would be rejected rateLimiter := rate.NewLimiter(rateLimit, burstLimit) - taskCh := make(chan taskDetail, pageSize) - respCh := make(chan taskResponse, pageSize) + + concurrency := int(math.Max(1, float64(config.concurrency))) + + taskCh := make(chan taskDetail, concurrency) + respCh := make(chan taskResponse, concurrency) // Start worker processors for range config.concurrency { go startWorkerProcessor(ctx, taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger) } - // Track worker state for proactive page fetching - activeWorkers := 0 - totalTasksSent := 0 - totalTasksCompleted := 0 - currentPageToken := config.initialPageToken - hasMorePages := true - - // Track pending tasks per page for proper heartbeating - currentPageNumber := hbd.CurrentPage - pendingTasksPerPage := make(map[int]int) - - // Initial page processing - executions := config.initialExecutions - if len(config.adjustedQuery) > 0 { - resp, err := sdkClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ - PageSize: int32(pageSize), - NextPageToken: currentPageToken, - Query: config.adjustedQuery, - }) + // Initialize the first page from initial executions or fetch from query + var page *Page + if len(config.initialExecutions) > 0 { + // Use initial executions + page = &Page{ + executions: config.initialExecutions, + nextPageToken: config.initialPageToken, + pageNumber: hbd.CurrentPage, + } + } else { + // Fetch page of executions if needed + var err error + page, err = fetchPage(ctx, sdkClient, config, config.initialPageToken, hbd.CurrentPage) if err != nil { metrics.BatcherOperationFailures.With(metricsHandler).Record(1) - logger.Error("Failed to list workflow executions", tag.Error(err)) + logger.Error("Failed to fetch initial page", tag.Error(err)) return HeartBeatDetails{}, err } - currentPageToken = resp.NextPageToken - hasMorePages = len(currentPageToken) > 0 - for _, wf := range resp.Executions { - executions = append(executions, wf.Execution) - } - } else { - hasMorePages = false - } - - // Send initial tasks - for _, wf := range executions { - if activeWorkers >= config.concurrency { - break - } - taskCh <- taskDetail{ - execution: wf, - attempts: 1, - hbd: hbd, - pageNumber: currentPageNumber, - } - activeWorkers++ - totalTasksSent++ - pendingTasksPerPage[currentPageNumber]++ } - // Track remaining executions from current page - executionIndex := activeWorkers + // Thread-safe access to heartbeat details for concurrent updates + var hbdMutex sync.RWMutex - // Main processing loop with proactive page fetching - for totalTasksCompleted < totalTasksSent || hasMorePages { - select { - case taskResult := <-respCh: - activeWorkers-- - totalTasksCompleted++ - - if taskResult.err == nil { - hbd.SuccessCount++ - } else { - hbd.ErrorCount++ - } - - // Track page completion - pageNum := taskResult.pageNumber - pendingTasksPerPage[pageNum]-- - if pendingTasksPerPage[pageNum] == 0 { - // Page is fully completed, record heartbeat - delete(pendingTasksPerPage, pageNum) - activity.RecordHeartbeat(ctx, hbd) + // New event loop using Page-based pager + for { + // Check if we need to fetch next page + if page.hasNext() && page.allSubmitted() { + nextPage, err := fetchPage(ctx, sdkClient, config, page.nextPageToken, page.pageNumber+1) + if err != nil { + metrics.BatcherOperationFailures.With(metricsHandler).Record(1) + logger.Error("Failed to fetch next page", tag.Error(err)) + return HeartBeatDetails{}, err } + // Link pages + page.next = nextPage + nextPage.prev = page + page = nextPage + + // Update heartbeat details for new page (thread-safe) + hbdMutex.Lock() + hbd.CurrentPage = page.pageNumber + hbd.PageToken = page.nextPageToken + hbdMutex.Unlock() + } - // Try to send more tasks from current page or fetch next page - tasksSentInIteration := 0 - - // First, send remaining tasks from current page - for executionIndex < len(executions) && activeWorkers < config.concurrency { - taskCh <- taskDetail{ - execution: executions[executionIndex], - attempts: 1, - hbd: hbd, - pageNumber: currentPageNumber, - } - activeWorkers++ - totalTasksSent++ - executionIndex++ - tasksSentInIteration++ - pendingTasksPerPage[currentPageNumber]++ + select { + case taskCh <- func() Task { + hbdMutex.RLock() + task := page.nextTask(hbd) + hbdMutex.RUnlock() + return task + }(): + // Successfully submitted a task + page.submittedCount++ + + case result := <-respCh: + // Handle task completion result + resultPage := page + + // Find the page that this result belongs to by walking backwards + for resultPage != nil && resultPage.pageNumber != result.pageNumber { + resultPage = resultPage.prev } - // If current page is exhausted and we have capacity, fetch next page - pageExhausted := executionIndex >= len(executions) - if pageExhausted && hasMorePages && activeWorkers < config.concurrency { - resp, err := sdkClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ - PageSize: int32(pageSize), - NextPageToken: currentPageToken, - Query: config.adjustedQuery, - }) - if err != nil { - metrics.BatcherOperationFailures.With(metricsHandler).Record(1) - logger.Error("Failed to list workflow executions", tag.Error(err)) - return HeartBeatDetails{}, err - } - - // Update page state - hbd.CurrentPage++ - hbd.PageToken = resp.NextPageToken - currentPageToken = resp.NextPageToken - hasMorePages = len(currentPageToken) > 0 - currentPageNumber = hbd.CurrentPage - - // Reset executions for new page - executions = nil - for _, wf := range resp.Executions { - executions = append(executions, wf.Execution) + if resultPage != nil { + // Update counts (thread-safe) + hbdMutex.Lock() + if result.err == nil { + resultPage.successCount++ + hbd.SuccessCount++ + } else { + resultPage.errorCount++ + hbd.ErrorCount++ } - executionIndex = 0 - - // Send tasks from new page - for executionIndex < len(executions) && activeWorkers < config.concurrency { - taskCh <- taskDetail{ - execution: executions[executionIndex], - attempts: 1, - hbd: hbd, - pageNumber: currentPageNumber, + hbdMutex.Unlock() + + // Update heartbeat details if this page and all previous pages are complete + if resultPage.successCount+resultPage.errorCount == len(resultPage.executions) { + allPrevComplete := true + for curr := resultPage.prev; curr != nil; curr = curr.prev { + if curr.successCount+curr.errorCount < len(curr.executions) { + allPrevComplete = false + break + } + } + if allPrevComplete { + // Send immediate heartbeat on page completion (original behavior) + hbdMutex.RLock() + activity.RecordHeartbeat(ctx, hbd) + hbdMutex.RUnlock() + + // Delete all previous page pointers for completed pages to avoid scanning repeatedly + for curr := resultPage.prev; curr != nil; { + prev := curr.prev + curr.prev = nil + if prev != nil { + prev.next = nil + } + curr = prev + } + resultPage.prev = nil } - activeWorkers++ - totalTasksSent++ - executionIndex++ - tasksSentInIteration++ - pendingTasksPerPage[currentPageNumber]++ } } @@ -226,6 +284,11 @@ func (a *activities) processWorkflowsWithProactiveFetching( logger.Error("Failed to complete batch operation", tag.Error(ctx.Err())) return HeartBeatDetails{}, ctx.Err() } + + // Check if we're done + if page.done() && !page.hasNext() { + break + } } return hbd, nil From b69b82f44317e0cd5af9c5b36fc94b363797c937 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Tue, 5 Aug 2025 12:36:28 -0600 Subject: [PATCH 07/13] workflow execution passed correctly --- service/worker/batcher/activities.go | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/service/worker/batcher/activities.go b/service/worker/batcher/activities.go index 475ad83e06e..e5991e437ac 100644 --- a/service/worker/batcher/activities.go +++ b/service/worker/batcher/activities.go @@ -867,11 +867,8 @@ func startTaskProcessorProtobuf( func(execution *commonpb.WorkflowExecution) error { var err error _, err = frontendClient.UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ - Namespace: namespace, - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: execution.WorkflowId, - RunId: execution.RunId, - }, + Namespace: namespace, + WorkflowExecution: execution, WorkflowExecutionOptions: operation.UpdateWorkflowOptionsOperation.WorkflowExecutionOptions, UpdateMask: &fieldmaskpb.FieldMask{Paths: operation.UpdateWorkflowOptionsOperation.UpdateMask.Paths}, }) @@ -881,11 +878,8 @@ func startTaskProcessorProtobuf( err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { resetRequest := &workflowservice.ResetActivityRequest{ - Namespace: namespace, - Execution: &commonpb.WorkflowExecution{ - WorkflowId: execution.WorkflowId, - RunId: execution.RunId, - }, + Namespace: namespace, + Execution: execution, Identity: operation.ResetActivitiesOperation.Identity, ResetHeartbeat: operation.ResetActivitiesOperation.ResetHeartbeat, Jitter: operation.ResetActivitiesOperation.Jitter, @@ -909,11 +903,8 @@ func startTaskProcessorProtobuf( err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { updateRequest := &workflowservice.UpdateActivityOptionsRequest{ - Namespace: namespace, - Execution: &commonpb.WorkflowExecution{ - WorkflowId: execution.WorkflowId, - RunId: execution.RunId, - }, + Namespace: namespace, + Execution: execution, UpdateMask: &fieldmaskpb.FieldMask{Paths: operation.UpdateActivityOptionsOperation.UpdateMask.Paths}, RestoreOriginal: operation.UpdateActivityOptionsOperation.RestoreOriginal, Identity: operation.UpdateActivityOptionsOperation.Identity, From f92fba46c32def1ba6b68f6b2546f87fbc0860f4 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Tue, 5 Aug 2025 13:06:51 -0600 Subject: [PATCH 08/13] remove heartbeatdetails from task, unexport page, reference page in task type --- service/worker/batcher/activities.go | 142 +++++++++++++-------------- service/worker/batcher/workflow.go | 10 +- 2 files changed, 73 insertions(+), 79 deletions(-) diff --git a/service/worker/batcher/activities.go b/service/worker/batcher/activities.go index e5991e437ac..0dee5a806c9 100644 --- a/service/worker/batcher/activities.go +++ b/service/worker/batcher/activities.go @@ -62,49 +62,46 @@ type batchWorkerProcessor func( frontendClient workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler, logger log.Logger, + getHeartbeatDetails func() HeartBeatDetails, ) -// Task represents a task to be processed (alias for taskDetail for consistency with pseudocode) -type Task = taskDetail - -// Page represents a page of workflow executions to be processed -type Page struct { +// page represents a page of workflow executions to be processed +type page struct { executions []*commonpb.WorkflowExecution submittedCount int successCount int errorCount int nextPageToken []byte pageNumber int - prev, next *Page + prev, next *page } // hasNext returns true if there are more pages to fetch -func (p *Page) hasNext() bool { +func (p *page) hasNext() bool { return len(p.nextPageToken) > 0 } // allSubmitted returns true if all executions in this page have been submitted -func (p *Page) allSubmitted() bool { +func (p *page) allSubmitted() bool { return p.submittedCount == len(p.executions) } // nextTask returns the next task to be submitted from this page -func (p *Page) nextTask(hbd HeartBeatDetails) Task { +func (p *page) nextTask() taskDetail { if p.submittedCount >= len(p.executions) { - return Task{} // No more tasks in this page + return taskDetail{} // No more tasks in this page } - task := Task{ - execution: p.executions[p.submittedCount], - attempts: 1, - hbd: hbd, - pageNumber: p.pageNumber, + task := taskDetail{ + execution: p.executions[p.submittedCount], + attempts: 1, + page: p, } return task } // done returns true if this page and all previous pages are complete -func (p *Page) done() bool { +func (p *page) done() bool { if p.prev != nil && !p.prev.done() { return false } @@ -118,10 +115,10 @@ func fetchPage( config batchProcessorConfig, pageToken []byte, pageNumber int, -) (*Page, error) { +) (*page, error) { if len(config.adjustedQuery) == 0 { // No query provided, return empty page - return &Page{ + return &page{ executions: []*commonpb.WorkflowExecution{}, nextPageToken: []byte{}, pageNumber: pageNumber, @@ -142,7 +139,7 @@ func fetchPage( executions = append(executions, wf.Execution) } - return &Page{ + return &page{ executions: executions, nextPageToken: resp.NextPageToken, pageNumber: pageNumber, @@ -169,16 +166,23 @@ func (a *activities) processWorkflowsWithProactiveFetching( taskCh := make(chan taskDetail, concurrency) respCh := make(chan taskResponse, concurrency) + // Thread-safe access to heartbeat details for concurrent updates + var hbdMutex sync.RWMutex + // Start worker processors for range config.concurrency { - go startWorkerProcessor(ctx, taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger) + go startWorkerProcessor(ctx, taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger, func() HeartBeatDetails { + hbdMutex.RLock() + defer hbdMutex.RUnlock() + return hbd + }) } - // Initialize the first page from initial executions or fetch from query - var page *Page + // Initialize the first p from initial executions or fetch from query + var p *page if len(config.initialExecutions) > 0 { // Use initial executions - page = &Page{ + p = &page{ executions: config.initialExecutions, nextPageToken: config.initialPageToken, pageNumber: hbd.CurrentPage, @@ -186,7 +190,7 @@ func (a *activities) processWorkflowsWithProactiveFetching( } else { // Fetch page of executions if needed var err error - page, err = fetchPage(ctx, sdkClient, config, config.initialPageToken, hbd.CurrentPage) + p, err = fetchPage(ctx, sdkClient, config, config.initialPageToken, hbd.CurrentPage) if err != nil { metrics.BatcherOperationFailures.With(metricsHandler).Record(1) logger.Error("Failed to fetch initial page", tag.Error(err)) @@ -194,49 +198,36 @@ func (a *activities) processWorkflowsWithProactiveFetching( } } - // Thread-safe access to heartbeat details for concurrent updates - var hbdMutex sync.RWMutex - // New event loop using Page-based pager for { // Check if we need to fetch next page - if page.hasNext() && page.allSubmitted() { - nextPage, err := fetchPage(ctx, sdkClient, config, page.nextPageToken, page.pageNumber+1) + if p.hasNext() && p.allSubmitted() { + nextPage, err := fetchPage(ctx, sdkClient, config, p.nextPageToken, p.pageNumber+1) if err != nil { metrics.BatcherOperationFailures.With(metricsHandler).Record(1) logger.Error("Failed to fetch next page", tag.Error(err)) return HeartBeatDetails{}, err } // Link pages - page.next = nextPage - nextPage.prev = page - page = nextPage + p.next = nextPage + nextPage.prev = p + p = nextPage // Update heartbeat details for new page (thread-safe) hbdMutex.Lock() - hbd.CurrentPage = page.pageNumber - hbd.PageToken = page.nextPageToken + hbd.CurrentPage = p.pageNumber + hbd.PageToken = p.nextPageToken hbdMutex.Unlock() } select { - case taskCh <- func() Task { - hbdMutex.RLock() - task := page.nextTask(hbd) - hbdMutex.RUnlock() - return task - }(): + case taskCh <- p.nextTask(): // Successfully submitted a task - page.submittedCount++ + p.submittedCount++ case result := <-respCh: // Handle task completion result - resultPage := page - - // Find the page that this result belongs to by walking backwards - for resultPage != nil && resultPage.pageNumber != result.pageNumber { - resultPage = resultPage.prev - } + resultPage := result.page if resultPage != nil { // Update counts (thread-safe) @@ -286,7 +277,7 @@ func (a *activities) processWorkflowsWithProactiveFetching( } // Check if we're done - if page.done() && !page.hasNext() { + if p.done() && !p.hasNext() { break } } @@ -403,8 +394,9 @@ func (a *activities) BatchActivity(ctx context.Context, batchParams BatchParams) frontendClient workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler, logger log.Logger, + getHeartbeatDetails func() HeartBeatDetails, ) { - startTaskProcessor(ctx, batchParams, taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger) + startTaskProcessor(ctx, batchParams, taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger, getHeartbeatDetails) } return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, sdkClient, metricsHandler, logger, hbd) @@ -474,8 +466,9 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams frontendClient workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler, logger log.Logger, + getHeartbeatDetails func() HeartBeatDetails, ) { - startTaskProcessorProtobuf(ctx, batchParams, batchParams.Request.Namespace, taskCh, respCh, rateLimiter, sdkClient, frontendClient, metricsHandler, logger) + startTaskProcessorProtobuf(ctx, batchParams, batchParams.Request.Namespace, taskCh, respCh, rateLimiter, sdkClient, frontendClient, metricsHandler, logger, getHeartbeatDetails) } return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, sdkClient, metricsHandler, logger, hbd) @@ -544,6 +537,7 @@ func startTaskProcessor( frontendClient workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler, logger log.Logger, + getHeartbeatDetails func() HeartBeatDetails, ) { for { select { @@ -560,12 +554,12 @@ func startTaskProcessor( err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { return sdkClient.TerminateWorkflow(ctx, execution.WorkflowId, execution.RunId, batchParams.Reason) - }) + }, getHeartbeatDetails()) case BatchTypeCancel: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { return sdkClient.CancelWorkflow(ctx, execution.WorkflowId, execution.RunId) - }) + }, getHeartbeatDetails()) case BatchTypeSignal: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -576,7 +570,7 @@ func startTaskProcessor( Input: batchParams.SignalParams.Input, }) return err - }) + }, getHeartbeatDetails()) case BatchTypeDelete: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -585,7 +579,7 @@ func startTaskProcessor( WorkflowExecution: execution, }) return err - }) + }, getHeartbeatDetails()) case BatchTypeReset: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -618,7 +612,7 @@ func startTaskProcessor( PostResetOperations: batchParams.ResetParams.postResetOperations, }) return err - }) + }, getHeartbeatDetails()) case BatchTypeUnpauseActivities: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -639,7 +633,7 @@ func startTaskProcessor( } _, err = frontendClient.UnpauseActivity(ctx, unpauseRequest) return err - }) + }, getHeartbeatDetails()) case BatchTypeUpdateOptions: err = processTask(ctx, limiter, task, @@ -652,7 +646,7 @@ func startTaskProcessor( UpdateMask: &fieldmaskpb.FieldMask{Paths: batchParams.UpdateOptionsParams.UpdateMask.Paths}, }) return err - }) + }, getHeartbeatDetails()) case BatchTypeResetActivities: err = processTask(ctx, limiter, task, @@ -676,7 +670,7 @@ func startTaskProcessor( _, err = frontendClient.ResetActivity(ctx, resetRequest) return err - }) + }, getHeartbeatDetails()) case BatchTypeUpdateActivitiesOptions: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -716,7 +710,7 @@ func startTaskProcessor( _, err = frontendClient.UpdateActivityOptions(ctx, updateRequest) return err - }) + }, getHeartbeatDetails()) default: err = errors.New("unknown batch type: " + batchParams.BatchType) } @@ -726,7 +720,7 @@ func startTaskProcessor( _, ok := batchParams._nonRetryableErrors[err.Error()] if ok || task.attempts > batchParams.AttemptsOnRetryableError { - respCh <- taskResponse{err: err, pageNumber: task.pageNumber} + respCh <- taskResponse{err: err, page: task.page} } else { // put back to the channel if less than attemptsOnError task.attempts++ @@ -734,7 +728,7 @@ func startTaskProcessor( } } else { metrics.BatcherProcessorSuccess.With(metricsHandler).Record(1) - respCh <- taskResponse{err: nil, pageNumber: task.pageNumber} + respCh <- taskResponse{err: nil, page: task.page} } } } @@ -752,6 +746,7 @@ func startTaskProcessorProtobuf( frontendClient workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler, logger log.Logger, + getHeartbeatDetails func() HeartBeatDetails, ) { for { select { @@ -768,12 +763,12 @@ func startTaskProcessorProtobuf( err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { return sdkClient.TerminateWorkflow(ctx, execution.WorkflowId, execution.RunId, batchOperation.Request.Reason) - }) + }, getHeartbeatDetails()) case *workflowservice.StartBatchOperationRequest_CancellationOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { return sdkClient.CancelWorkflow(ctx, execution.WorkflowId, execution.RunId) - }) + }, getHeartbeatDetails()) case *workflowservice.StartBatchOperationRequest_SignalOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -785,7 +780,7 @@ func startTaskProcessorProtobuf( Identity: operation.SignalOperation.GetIdentity(), }) return err - }) + }, getHeartbeatDetails()) case *workflowservice.StartBatchOperationRequest_DeletionOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -794,7 +789,7 @@ func startTaskProcessorProtobuf( WorkflowExecution: execution, }) return err - }) + }, getHeartbeatDetails()) case *workflowservice.StartBatchOperationRequest_ResetOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -834,7 +829,7 @@ func startTaskProcessorProtobuf( Identity: operation.ResetOperation.Identity, }) return err - }) + }, getHeartbeatDetails()) case *workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -860,7 +855,7 @@ func startTaskProcessorProtobuf( _, err = frontendClient.UnpauseActivity(ctx, unpauseRequest) return err - }) + }, getHeartbeatDetails()) case *workflowservice.StartBatchOperationRequest_UpdateWorkflowOptionsOperation: err = processTask(ctx, limiter, task, @@ -873,7 +868,7 @@ func startTaskProcessorProtobuf( UpdateMask: &fieldmaskpb.FieldMask{Paths: operation.UpdateWorkflowOptionsOperation.UpdateMask.Paths}, }) return err - }) + }, getHeartbeatDetails()) case *workflowservice.StartBatchOperationRequest_ResetActivitiesOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -898,7 +893,7 @@ func startTaskProcessorProtobuf( _, err = frontendClient.ResetActivity(ctx, resetRequest) return err - }) + }, getHeartbeatDetails()) case *workflowservice.StartBatchOperationRequest_UpdateActivityOptionsOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -922,7 +917,7 @@ func startTaskProcessorProtobuf( updateRequest.ActivityOptions = operation.UpdateActivityOptionsOperation.GetActivityOptions() _, err = frontendClient.UpdateActivityOptions(ctx, updateRequest) return err - }) + }, getHeartbeatDetails()) default: err = errors.New(fmt.Sprintf("unknown batch type: %v", batchOperation.BatchType)) } @@ -931,7 +926,7 @@ func startTaskProcessorProtobuf( logger.Error("Failed to process batch operation task", tag.Error(err)) nonRetryable := slices.Contains(batchOperation.NonRetryableErrors, err.Error()) if nonRetryable || task.attempts > int(batchOperation.AttemptsOnRetryableError) { - respCh <- taskResponse{err: err, pageNumber: task.pageNumber} + respCh <- taskResponse{err: err, page: task.page} } else { // put back to the channel if less than attemptsOnError task.attempts++ @@ -939,7 +934,7 @@ func startTaskProcessorProtobuf( } } else { metrics.BatcherProcessorSuccess.With(metricsHandler).Record(1) - respCh <- taskResponse{err: nil, pageNumber: task.pageNumber} + respCh <- taskResponse{err: nil, page: task.page} } } } @@ -950,13 +945,14 @@ func processTask( limiter *rate.Limiter, task taskDetail, procFn func(*commonpb.WorkflowExecution) error, + hbd HeartBeatDetails, ) error { err := limiter.Wait(ctx) if err != nil { return err } - activity.RecordHeartbeat(ctx, task.hbd) + activity.RecordHeartbeat(ctx, hbd) err = procFn(task.execution) if err != nil { diff --git a/service/worker/batcher/workflow.go b/service/worker/batcher/workflow.go index 5051321bc3b..57924fa031c 100644 --- a/service/worker/batcher/workflow.go +++ b/service/worker/batcher/workflow.go @@ -226,17 +226,15 @@ type ( execution *commonpb.WorkflowExecution // the number of attempts to process the workflow execution attempts int - // passing along the current heartbeat details to make heartbeat within a task so that it won't timeout - hbd HeartBeatDetails - // the page number this task belongs to (for tracking page completion) - pageNumber int + // reference to the page this task belongs to (for tracking page completion) + page *page } taskResponse struct { // the error result from processing the task (nil for success) err error - // the page number the completed task belonged to - pageNumber int + // reference to the page the completed task belonged to + page *page } ) From 03d7295fd188efa8f94cc1186bae4ecc822770e0 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Tue, 5 Aug 2025 14:12:46 -0600 Subject: [PATCH 09/13] rename --- service/worker/batcher/activities.go | 20 ++++++++++---------- service/worker/batcher/workflow.go | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/service/worker/batcher/activities.go b/service/worker/batcher/activities.go index 0dee5a806c9..f376db5062b 100644 --- a/service/worker/batcher/activities.go +++ b/service/worker/batcher/activities.go @@ -55,7 +55,7 @@ type batchProcessorConfig struct { // batchWorkerProcessor defines the interface for different worker processor types type batchWorkerProcessor func( ctx context.Context, - taskCh chan taskDetail, + taskCh chan task, respCh chan taskResponse, rateLimiter *rate.Limiter, sdkClient sdkclient.Client, @@ -87,12 +87,12 @@ func (p *page) allSubmitted() bool { } // nextTask returns the next task to be submitted from this page -func (p *page) nextTask() taskDetail { +func (p *page) nextTask() task { if p.submittedCount >= len(p.executions) { - return taskDetail{} // No more tasks in this page + return task{} // No more tasks in this page } - task := taskDetail{ + task := task{ execution: p.executions[p.submittedCount], attempts: 1, page: p, @@ -163,7 +163,7 @@ func (a *activities) processWorkflowsWithProactiveFetching( concurrency := int(math.Max(1, float64(config.concurrency))) - taskCh := make(chan taskDetail, concurrency) + taskCh := make(chan task, concurrency) respCh := make(chan taskResponse, concurrency) // Thread-safe access to heartbeat details for concurrent updates @@ -387,7 +387,7 @@ func (a *activities) BatchActivity(ctx context.Context, batchParams BatchParams) // Create a wrapper for the batch params specific worker processor workerProcessor := func( ctx context.Context, - taskCh chan taskDetail, + taskCh chan task, respCh chan taskResponse, rateLimiter *rate.Limiter, sdkClient sdkclient.Client, @@ -459,7 +459,7 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams // Create a wrapper for the protobuf specific worker processor workerProcessor := func( ctx context.Context, - taskCh chan taskDetail, + taskCh chan task, respCh chan taskResponse, rateLimiter *rate.Limiter, sdkClient sdkclient.Client, @@ -530,7 +530,7 @@ func (a *activities) getOperationConcurrency(concurrency int) int { func startTaskProcessor( ctx context.Context, batchParams BatchParams, - taskCh chan taskDetail, + taskCh chan task, respCh chan taskResponse, limiter *rate.Limiter, sdkClient sdkclient.Client, @@ -739,7 +739,7 @@ func startTaskProcessorProtobuf( ctx context.Context, batchOperation *batchspb.BatchOperationInput, namespace string, - taskCh chan taskDetail, + taskCh chan task, respCh chan taskResponse, limiter *rate.Limiter, sdkClient sdkclient.Client, @@ -943,7 +943,7 @@ func startTaskProcessorProtobuf( func processTask( ctx context.Context, limiter *rate.Limiter, - task taskDetail, + task task, procFn func(*commonpb.WorkflowExecution) error, hbd HeartBeatDetails, ) error { diff --git a/service/worker/batcher/workflow.go b/service/worker/batcher/workflow.go index 57924fa031c..e320d99e645 100644 --- a/service/worker/batcher/workflow.go +++ b/service/worker/batcher/workflow.go @@ -221,7 +221,7 @@ type ( ErrorCount int } - taskDetail struct { + task struct { // the workflow execution to process execution *commonpb.WorkflowExecution // the number of attempts to process the workflow execution From 1211f1e1eaf0804e86efc8cca90ef58a7da16ce5 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Tue, 5 Aug 2025 14:17:37 -0600 Subject: [PATCH 10/13] remove comment --- service/worker/batcher/activities.go | 1 - 1 file changed, 1 deletion(-) diff --git a/service/worker/batcher/activities.go b/service/worker/batcher/activities.go index f376db5062b..8f685a3e64c 100644 --- a/service/worker/batcher/activities.go +++ b/service/worker/batcher/activities.go @@ -166,7 +166,6 @@ func (a *activities) processWorkflowsWithProactiveFetching( taskCh := make(chan task, concurrency) respCh := make(chan taskResponse, concurrency) - // Thread-safe access to heartbeat details for concurrent updates var hbdMutex sync.RWMutex // Start worker processors From 6d04029e4a8cbca0273975df3129d27e389f1ab0 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Tue, 12 Aug 2025 10:48:47 -0600 Subject: [PATCH 11/13] addressing roeys comments --- service/worker/batcher/activities.go | 132 ++++++++++----------------- 1 file changed, 50 insertions(+), 82 deletions(-) diff --git a/service/worker/batcher/activities.go b/service/worker/batcher/activities.go index 8f685a3e64c..2f47d1ab898 100644 --- a/service/worker/batcher/activities.go +++ b/service/worker/batcher/activities.go @@ -6,7 +6,6 @@ import ( "fmt" "math" "slices" - "sync" "time" "github.com/pborman/uuid" @@ -62,7 +61,6 @@ type batchWorkerProcessor func( frontendClient workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler, logger log.Logger, - getHeartbeatDetails func() HeartBeatDetails, ) // page represents a page of workflow executions to be processed @@ -166,15 +164,13 @@ func (a *activities) processWorkflowsWithProactiveFetching( taskCh := make(chan task, concurrency) respCh := make(chan taskResponse, concurrency) - var hbdMutex sync.RWMutex + // Ticker for frequent heartbeats to avoid timeout during slow processing, 1/4 of the default heartbeat timeout (10s) + heartbeatTicker := time.NewTicker(defaultActivityHeartBeatTimeout / 4) + defer heartbeatTicker.Stop() // Start worker processors for range config.concurrency { - go startWorkerProcessor(ctx, taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger, func() HeartBeatDetails { - hbdMutex.RLock() - defer hbdMutex.RUnlock() - return hbd - }) + go startWorkerProcessor(ctx, taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger) } // Initialize the first p from initial executions or fetch from query @@ -192,83 +188,61 @@ func (a *activities) processWorkflowsWithProactiveFetching( p, err = fetchPage(ctx, sdkClient, config, config.initialPageToken, hbd.CurrentPage) if err != nil { metrics.BatcherOperationFailures.With(metricsHandler).Record(1) - logger.Error("Failed to fetch initial page", tag.Error(err)) - return HeartBeatDetails{}, err + return HeartBeatDetails{}, fmt.Errorf("failed to fetch next page: %w", err) } } - // New event loop using Page-based pager for { // Check if we need to fetch next page if p.hasNext() && p.allSubmitted() { nextPage, err := fetchPage(ctx, sdkClient, config, p.nextPageToken, p.pageNumber+1) if err != nil { metrics.BatcherOperationFailures.With(metricsHandler).Record(1) - logger.Error("Failed to fetch next page", tag.Error(err)) - return HeartBeatDetails{}, err + return HeartBeatDetails{}, fmt.Errorf("failed to fetch next page: %w", err) } - // Link pages p.next = nextPage nextPage.prev = p p = nextPage - // Update heartbeat details for new page (thread-safe) - hbdMutex.Lock() hbd.CurrentPage = p.pageNumber hbd.PageToken = p.nextPageToken - hbdMutex.Unlock() } select { case taskCh <- p.nextTask(): - // Successfully submitted a task p.submittedCount++ case result := <-respCh: - // Handle task completion result resultPage := result.page + if result.err == nil { + resultPage.successCount++ + } else { + resultPage.errorCount++ + } - if resultPage != nil { - // Update counts (thread-safe) - hbdMutex.Lock() - if result.err == nil { - resultPage.successCount++ - hbd.SuccessCount++ - } else { - resultPage.errorCount++ - hbd.ErrorCount++ - } - hbdMutex.Unlock() - - // Update heartbeat details if this page and all previous pages are complete - if resultPage.successCount+resultPage.errorCount == len(resultPage.executions) { - allPrevComplete := true - for curr := resultPage.prev; curr != nil; curr = curr.prev { - if curr.successCount+curr.errorCount < len(curr.executions) { - allPrevComplete = false - break - } - } - if allPrevComplete { - // Send immediate heartbeat on page completion (original behavior) - hbdMutex.RLock() - activity.RecordHeartbeat(ctx, hbd) - hbdMutex.RUnlock() - - // Delete all previous page pointers for completed pages to avoid scanning repeatedly - for curr := resultPage.prev; curr != nil; { - prev := curr.prev - curr.prev = nil - if prev != nil { - prev.next = nil - } - curr = prev - } - resultPage.prev = nil + // Update heartbeat details if this page and all previous pages are complete + if resultPage.done() { + allPrevComplete := true + for curr := resultPage.prev; curr != nil; curr = curr.prev { + if !curr.done() { + allPrevComplete = false + break } } + if allPrevComplete { + hbd.SuccessCount += resultPage.successCount + hbd.ErrorCount += resultPage.errorCount + + activity.RecordHeartbeat(ctx, hbd) + + resultPage.prev = nil + } } + case <-heartbeatTicker.C: + // Send periodic heartbeat to prevent timeout during slow processing + activity.RecordHeartbeat(ctx, hbd) + case <-ctx.Done(): metrics.BatcherOperationFailures.With(metricsHandler).Record(1) logger.Error("Failed to complete batch operation", tag.Error(ctx.Err())) @@ -393,9 +367,8 @@ func (a *activities) BatchActivity(ctx context.Context, batchParams BatchParams) frontendClient workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler, logger log.Logger, - getHeartbeatDetails func() HeartBeatDetails, ) { - startTaskProcessor(ctx, batchParams, taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger, getHeartbeatDetails) + startTaskProcessor(ctx, batchParams, taskCh, respCh, rateLimiter, sdkClient, a.FrontendClient, metricsHandler, logger) } return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, sdkClient, metricsHandler, logger, hbd) @@ -465,9 +438,8 @@ func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams frontendClient workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler, logger log.Logger, - getHeartbeatDetails func() HeartBeatDetails, ) { - startTaskProcessorProtobuf(ctx, batchParams, batchParams.Request.Namespace, taskCh, respCh, rateLimiter, sdkClient, frontendClient, metricsHandler, logger, getHeartbeatDetails) + startTaskProcessorProtobuf(ctx, batchParams, batchParams.Request.Namespace, taskCh, respCh, rateLimiter, sdkClient, frontendClient, metricsHandler, logger) } return a.processWorkflowsWithProactiveFetching(ctx, config, workerProcessor, sdkClient, metricsHandler, logger, hbd) @@ -536,7 +508,6 @@ func startTaskProcessor( frontendClient workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler, logger log.Logger, - getHeartbeatDetails func() HeartBeatDetails, ) { for { select { @@ -553,12 +524,12 @@ func startTaskProcessor( err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { return sdkClient.TerminateWorkflow(ctx, execution.WorkflowId, execution.RunId, batchParams.Reason) - }, getHeartbeatDetails()) + }) case BatchTypeCancel: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { return sdkClient.CancelWorkflow(ctx, execution.WorkflowId, execution.RunId) - }, getHeartbeatDetails()) + }) case BatchTypeSignal: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -569,7 +540,7 @@ func startTaskProcessor( Input: batchParams.SignalParams.Input, }) return err - }, getHeartbeatDetails()) + }) case BatchTypeDelete: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -578,7 +549,7 @@ func startTaskProcessor( WorkflowExecution: execution, }) return err - }, getHeartbeatDetails()) + }) case BatchTypeReset: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -611,7 +582,7 @@ func startTaskProcessor( PostResetOperations: batchParams.ResetParams.postResetOperations, }) return err - }, getHeartbeatDetails()) + }) case BatchTypeUnpauseActivities: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -632,7 +603,7 @@ func startTaskProcessor( } _, err = frontendClient.UnpauseActivity(ctx, unpauseRequest) return err - }, getHeartbeatDetails()) + }) case BatchTypeUpdateOptions: err = processTask(ctx, limiter, task, @@ -645,7 +616,7 @@ func startTaskProcessor( UpdateMask: &fieldmaskpb.FieldMask{Paths: batchParams.UpdateOptionsParams.UpdateMask.Paths}, }) return err - }, getHeartbeatDetails()) + }) case BatchTypeResetActivities: err = processTask(ctx, limiter, task, @@ -669,7 +640,7 @@ func startTaskProcessor( _, err = frontendClient.ResetActivity(ctx, resetRequest) return err - }, getHeartbeatDetails()) + }) case BatchTypeUpdateActivitiesOptions: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -709,7 +680,7 @@ func startTaskProcessor( _, err = frontendClient.UpdateActivityOptions(ctx, updateRequest) return err - }, getHeartbeatDetails()) + }) default: err = errors.New("unknown batch type: " + batchParams.BatchType) } @@ -745,7 +716,6 @@ func startTaskProcessorProtobuf( frontendClient workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler, logger log.Logger, - getHeartbeatDetails func() HeartBeatDetails, ) { for { select { @@ -762,12 +732,12 @@ func startTaskProcessorProtobuf( err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { return sdkClient.TerminateWorkflow(ctx, execution.WorkflowId, execution.RunId, batchOperation.Request.Reason) - }, getHeartbeatDetails()) + }) case *workflowservice.StartBatchOperationRequest_CancellationOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { return sdkClient.CancelWorkflow(ctx, execution.WorkflowId, execution.RunId) - }, getHeartbeatDetails()) + }) case *workflowservice.StartBatchOperationRequest_SignalOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -779,7 +749,7 @@ func startTaskProcessorProtobuf( Identity: operation.SignalOperation.GetIdentity(), }) return err - }, getHeartbeatDetails()) + }) case *workflowservice.StartBatchOperationRequest_DeletionOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -788,7 +758,7 @@ func startTaskProcessorProtobuf( WorkflowExecution: execution, }) return err - }, getHeartbeatDetails()) + }) case *workflowservice.StartBatchOperationRequest_ResetOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -828,7 +798,7 @@ func startTaskProcessorProtobuf( Identity: operation.ResetOperation.Identity, }) return err - }, getHeartbeatDetails()) + }) case *workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -854,7 +824,7 @@ func startTaskProcessorProtobuf( _, err = frontendClient.UnpauseActivity(ctx, unpauseRequest) return err - }, getHeartbeatDetails()) + }) case *workflowservice.StartBatchOperationRequest_UpdateWorkflowOptionsOperation: err = processTask(ctx, limiter, task, @@ -867,7 +837,7 @@ func startTaskProcessorProtobuf( UpdateMask: &fieldmaskpb.FieldMask{Paths: operation.UpdateWorkflowOptionsOperation.UpdateMask.Paths}, }) return err - }, getHeartbeatDetails()) + }) case *workflowservice.StartBatchOperationRequest_ResetActivitiesOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -892,7 +862,7 @@ func startTaskProcessorProtobuf( _, err = frontendClient.ResetActivity(ctx, resetRequest) return err - }, getHeartbeatDetails()) + }) case *workflowservice.StartBatchOperationRequest_UpdateActivityOptionsOperation: err = processTask(ctx, limiter, task, func(execution *commonpb.WorkflowExecution) error { @@ -916,7 +886,7 @@ func startTaskProcessorProtobuf( updateRequest.ActivityOptions = operation.UpdateActivityOptionsOperation.GetActivityOptions() _, err = frontendClient.UpdateActivityOptions(ctx, updateRequest) return err - }, getHeartbeatDetails()) + }) default: err = errors.New(fmt.Sprintf("unknown batch type: %v", batchOperation.BatchType)) } @@ -944,14 +914,12 @@ func processTask( limiter *rate.Limiter, task task, procFn func(*commonpb.WorkflowExecution) error, - hbd HeartBeatDetails, ) error { err := limiter.Wait(ctx) if err != nil { return err } - activity.RecordHeartbeat(ctx, hbd) err = procFn(task.execution) if err != nil { From f6856ab15120723264867abda72516173db47c2c Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Tue, 12 Aug 2025 16:48:20 -0600 Subject: [PATCH 12/13] removing duplication with done method --- service/worker/batcher/activities.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/service/worker/batcher/activities.go b/service/worker/batcher/activities.go index 2f47d1ab898..99c62a0da02 100644 --- a/service/worker/batcher/activities.go +++ b/service/worker/batcher/activities.go @@ -222,21 +222,12 @@ func (a *activities) processWorkflowsWithProactiveFetching( // Update heartbeat details if this page and all previous pages are complete if resultPage.done() { - allPrevComplete := true - for curr := resultPage.prev; curr != nil; curr = curr.prev { - if !curr.done() { - allPrevComplete = false - break - } - } - if allPrevComplete { - hbd.SuccessCount += resultPage.successCount - hbd.ErrorCount += resultPage.errorCount + hbd.SuccessCount += resultPage.successCount + hbd.ErrorCount += resultPage.errorCount - activity.RecordHeartbeat(ctx, hbd) + activity.RecordHeartbeat(ctx, hbd) - resultPage.prev = nil - } + resultPage.prev = nil } case <-heartbeatTicker.C: From f379f5f8458cb038567b6fcaad7ed7f50840d168 Mon Sep 17 00:00:00 2001 From: Sean Kane Date: Wed, 13 Aug 2025 15:40:12 -0600 Subject: [PATCH 13/13] Update service/worker/batcher/activities.go Co-authored-by: Roey Berman --- service/worker/batcher/activities.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/service/worker/batcher/activities.go b/service/worker/batcher/activities.go index 99c62a0da02..683dca73c89 100644 --- a/service/worker/batcher/activities.go +++ b/service/worker/batcher/activities.go @@ -221,13 +221,11 @@ func (a *activities) processWorkflowsWithProactiveFetching( } // Update heartbeat details if this page and all previous pages are complete - if resultPage.done() { - hbd.SuccessCount += resultPage.successCount - hbd.ErrorCount += resultPage.errorCount - - activity.RecordHeartbeat(ctx, hbd) - - resultPage.prev = nil + // Find all pages from the current one on that are done, record their stats, and unlink them. + for page := resultPage; page != nil && page.done(); page = page.next { + hbd.SuccessCount += page.successCount + hbd.ErrorCount += page.errorCount + page.prev = nil } case <-heartbeatTicker.C: