Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ require (
go.opentelemetry.io/otel/sdk v1.34.0
go.opentelemetry.io/otel/sdk/metric v1.34.0
go.opentelemetry.io/otel/trace v1.34.0
go.temporal.io/api v1.50.1-0.20250715164317-6157f960f13b
go.temporal.io/api v1.50.1-0.20250717230205-2e8ba1309322
go.temporal.io/sdk v1.34.0
go.uber.org/automaxprocs v1.6.0
go.uber.org/fx v1.23.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.temporal.io/api v1.50.1-0.20250715164317-6157f960f13b h1:LYi/B1Pewj36fKKMWES4k1UmK0m/bYZwCgR/yFidwdc=
go.temporal.io/api v1.50.1-0.20250715164317-6157f960f13b/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/api v1.50.1-0.20250717230205-2e8ba1309322 h1:8Fqq4jBBFTul+nTFr2DXulYLNjMO1YH2i5+pJnM9LI0=
go.temporal.io/api v1.50.1-0.20250717230205-2e8ba1309322/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/sdk v1.34.0 h1:VLg/h6ny7GvLFVoQPqz2NcC93V9yXboQwblkRvZ1cZE=
go.temporal.io/sdk v1.34.0/go.mod h1:iE4U5vFrH3asOhqpBBphpj9zNtw8btp8+MSaf5A0D3w=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
95 changes: 81 additions & 14 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4505,6 +4505,8 @@ func (wh *WorkflowHandler) StartBatchOperation(
var resetParams batcher.ResetParams
var updateOptionsParams batcher.UpdateOptionsParams
var unpauseActivitiesParams batcher.UnpauseActivitiesParams
var resetActivitiesParams batcher.ResetActivitiesParams
var updateActivitiesOptionsParams batcher.UpdateActivitiesOptionsParams
switch op := request.Operation.(type) {
case *workflowservice.StartBatchOperationRequest_TerminationOperation:
identity = op.TerminationOperation.GetIdentity()
Expand Down Expand Up @@ -4583,7 +4585,7 @@ func (wh *WorkflowHandler) StartBatchOperation(
visibilityQuery = fmt.Sprintf("(%s) AND (%s)", visibilityQuery, unpauseCause)
unpauseActivitiesParams.ActivityType = a.Type
case *batchpb.BatchOperationUnpauseActivities_MatchAll:
if a.MatchAll == false {
if !a.MatchAll {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
wildCardUnpause := fmt.Sprintf("%s STARTS_WITH 'property:activityType='", searchattribute.TemporalPauseInfo)
Expand All @@ -4594,24 +4596,83 @@ func (wh *WorkflowHandler) StartBatchOperation(
unpauseActivitiesParams.ResetAttempts = op.UnpauseActivitiesOperation.ResetAttempts
unpauseActivitiesParams.ResetHeartbeat = op.UnpauseActivitiesOperation.ResetHeartbeat
unpauseActivitiesParams.Jitter = op.UnpauseActivitiesOperation.Jitter.AsDuration()
case *workflowservice.StartBatchOperationRequest_ResetActivitiesOperation:
operationType = batcher.BatchTypeResetActivities
if op.ResetActivitiesOperation == nil {
return nil, serviceerror.NewInvalidArgument("reset activities operation is not set")
}
if op.ResetActivitiesOperation.GetActivity() == nil {
return nil, serviceerror.NewInvalidArgument("activity filter must be set")
}

switch a := op.ResetActivitiesOperation.GetActivity().(type) {
case *batchpb.BatchOperationResetActivities_Type:
if len(a.Type) == 0 {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
resetActivitiesParams.ActivityType = a.Type
case *batchpb.BatchOperationResetActivities_MatchAll:
if !a.MatchAll {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
resetActivitiesParams.MatchAll = true
}

resetActivitiesParams.ResetAttempts = op.ResetActivitiesOperation.ResetAttempts
resetActivitiesParams.ResetHeartbeat = op.ResetActivitiesOperation.ResetHeartbeat
resetActivitiesParams.Jitter = op.ResetActivitiesOperation.Jitter.AsDuration()
resetActivitiesParams.KeepPaused = op.ResetActivitiesOperation.KeepPaused
resetActivitiesParams.RestoreOriginalOptions = op.ResetActivitiesOperation.RestoreOriginalOptions
resetActivitiesParams.Identity = op.ResetActivitiesOperation.GetIdentity()
case *workflowservice.StartBatchOperationRequest_UpdateActivityOptionsOperation:
operationType = batcher.BatchTypeUpdateActivitiesOptions
if op.UpdateActivityOptionsOperation == nil {
return nil, serviceerror.NewInvalidArgument("update activity options operation is not set")
}
if op.UpdateActivityOptionsOperation.GetActivityOptions() != nil && op.UpdateActivityOptionsOperation.GetRestoreOriginal() {
return nil, serviceerror.NewInvalidArgument("cannot set both activity options and restore original")
}
if op.UpdateActivityOptionsOperation.GetActivityOptions() == nil && !op.UpdateActivityOptionsOperation.GetRestoreOriginal() {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or restore original should be set to true")
}

switch a := op.UpdateActivityOptionsOperation.GetActivity().(type) {
case *batchpb.BatchOperationUpdateActivityOptions_Type:
if len(a.Type) == 0 {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
updateActivitiesOptionsParams.ActivityType = a.Type
case *batchpb.BatchOperationUpdateActivityOptions_MatchAll:
if !a.MatchAll {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
updateActivitiesOptionsParams.MatchAll = true
}

updateActivitiesOptionsParams.ActivityOptions = op.UpdateActivityOptionsOperation.GetActivityOptions()
updateActivitiesOptionsParams.UpdateMask = op.UpdateActivityOptionsOperation.GetUpdateMask()
updateActivitiesOptionsParams.RestoreOriginal = op.UpdateActivityOptionsOperation.GetRestoreOriginal()
updateActivitiesOptionsParams.Identity = op.UpdateActivityOptionsOperation.GetIdentity()
default:
return nil, serviceerror.NewInvalidArgumentf("The operation type %T is not supported", op)
}

input := &batcher.BatchParams{
Namespace: request.GetNamespace(),
Query: visibilityQuery,
Executions: request.GetExecutions(),
Reason: request.GetReason(),
BatchType: operationType,
RPS: float64(request.GetMaxOperationsPerSecond()),
TerminateParams: batcher.TerminateParams{},
CancelParams: batcher.CancelParams{},
SignalParams: signalParams,
DeleteParams: batcher.DeleteParams{},
ResetParams: resetParams,
UpdateOptionsParams: updateOptionsParams,
UnpauseActivitiesParams: unpauseActivitiesParams,
Namespace: request.GetNamespace(),
Query: visibilityQuery,
Executions: request.GetExecutions(),
Reason: request.GetReason(),
BatchType: operationType,
RPS: float64(request.GetMaxOperationsPerSecond()),
TerminateParams: batcher.TerminateParams{},
CancelParams: batcher.CancelParams{},
SignalParams: signalParams,
DeleteParams: batcher.DeleteParams{},
ResetParams: resetParams,
UpdateOptionsParams: updateOptionsParams,
UnpauseActivitiesParams: unpauseActivitiesParams,
ResetActivitiesParams: resetActivitiesParams,
UpdateActivitiesOptionsParams: updateActivitiesOptionsParams,
}
inputPayload, err := sdk.PreferProtoDataConverter.ToPayloads(input)
if err != nil {
Expand Down Expand Up @@ -4777,6 +4838,12 @@ func (wh *WorkflowHandler) DescribeBatchOperation(
operationType = enumspb.BATCH_OPERATION_TYPE_RESET
case batcher.BatchTypeUpdateOptions:
operationType = enumspb.BATCH_OPERATION_TYPE_UPDATE_EXECUTION_OPTIONS
case batcher.BatchTypeUpdateActivitiesOptions:
operationType = enumspb.BATCH_OPERATION_TYPE_UPDATE_ACTIVITY_OPTIONS
case batcher.BatchTypeResetActivities:
operationType = enumspb.BATCH_OPERATION_TYPE_RESET_ACTIVITY
case batcher.BatchTypeUnpauseActivities:
operationType = enumspb.BATCH_OPERATION_TYPE_UNPAUSE_ACTIVITY
default:
operationType = enumspb.BATCH_OPERATION_TYPE_UNSPECIFIED
wh.throttledLogger.Warn("Unknown batch operation type", tag.NewStringTag("batch-operation-type", operationTypeString))
Expand Down
53 changes: 53 additions & 0 deletions service/worker/batcher/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,59 @@ func startTaskProcessor(
})
return err
})

case BatchTypeResetActivities:
err = processTask(ctx, limiter, task,
func(workflowID, runID string) error {
resetRequest := &workflowservice.ResetActivityRequest{
Namespace: batchParams.Namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
},
Identity: batchParams.ResetActivitiesParams.Identity,
Activity: &workflowservice.ResetActivityRequest_Type{Type: batchParams.ResetActivitiesParams.ActivityType},
ResetHeartbeat: batchParams.ResetActivitiesParams.ResetHeartbeat,
Jitter: durationpb.New(batchParams.ResetActivitiesParams.Jitter),
KeepPaused: batchParams.ResetActivitiesParams.KeepPaused,
RestoreOriginalOptions: batchParams.ResetActivitiesParams.RestoreOriginalOptions,
}

if batchParams.ResetActivitiesParams.MatchAll {
resetRequest.Activity = &workflowservice.ResetActivityRequest_MatchAll{MatchAll: true}
} else {
resetRequest.Activity = &workflowservice.ResetActivityRequest_Type{Type: batchParams.ResetActivitiesParams.ActivityType}
}

_, err = frontendClient.ResetActivity(ctx, resetRequest)
return err
})
case BatchTypeUpdateActivitiesOptions:
err = processTask(ctx, limiter, task,
func(workflowID, runID string) error {
updateRequest := &workflowservice.UpdateActivityOptionsRequest{
Namespace: batchParams.Namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
},
Activity: &workflowservice.UpdateActivityOptionsRequest_Type{Type: batchParams.UpdateActivitiesOptionsParams.ActivityType},
UpdateMask: batchParams.UpdateActivitiesOptionsParams.UpdateMask,
RestoreOriginal: batchParams.UpdateActivitiesOptionsParams.RestoreOriginal,
Identity: batchParams.UpdateActivitiesOptionsParams.Identity,
ActivityOptions: batchParams.UpdateActivitiesOptionsParams.ActivityOptions,
}

if batchParams.UpdateActivitiesOptionsParams.MatchAll {
updateRequest.Activity = &workflowservice.UpdateActivityOptionsRequest_MatchAll{MatchAll: true}
} else {
updateRequest.Activity = &workflowservice.UpdateActivityOptionsRequest_Type{Type: batchParams.UpdateActivitiesOptionsParams.ActivityType}
}

_, err = frontendClient.UpdateActivityOptions(ctx, updateRequest)
return err
})
// QUESTION seankane (2025-07-18): why do we not have a default case and return an error? @yuri/@chetan
}
if err != nil {
metrics.BatcherProcessorFailures.With(metricsHandler).Record(1)
Expand Down
40 changes: 40 additions & 0 deletions service/worker/batcher/workflow.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package batcher

import (
"errors"
"fmt"
"time"

activitypb "go.temporal.io/api/activity/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
workflowpb "go.temporal.io/api/workflow/v1"
Expand Down Expand Up @@ -42,6 +44,10 @@ const (
BatchTypeUpdateOptions = "update_options"
// BatchTypePauseActivities is batch type for unpausing activities
BatchTypeUnpauseActivities = "unpause_activities"
// BatchTypeUpdateActivitiesOptions is batch type for updating the options of activities
BatchTypeUpdateActivitiesOptions = "update_activity_options"
// BatchTypeResetActivities is batch type for resetting activities
BatchTypeResetActivities = "reset_activities"
)

var (
Expand Down Expand Up @@ -103,6 +109,26 @@ type (
Jitter time.Duration
}

UpdateActivitiesOptionsParams struct {
Identity string
ActivityType string
MatchAll bool
ActivityOptions *activitypb.ActivityOptions
UpdateMask *fieldmaskpb.FieldMask
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine but you should know that using a nested proto struct in workflow input doesn't use proto JSON serialization, instead it uses plain JSON serialization which may yield unexpected results.

If we were writing this from scratch, the right thing to do here is to define a proto struct for the workflow input and reuse the request structs for each of these actions.
You may want to serialize these proto messages yourself and bypass the SDK's serialization.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chatted w/ Yuri on this. We already use the proto struct for FieldMask without custom serialization in other batch commands so we should be fine here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would test that every field in these message are properly serialized.
Generally, what we've done here is bad practice and should be avoided in our codebase. It's not just FieldMask, it's also ActivityOptions. Even if we know that all fields are properly serialized today (which I'm not convinced of), there's no guarantee that future fields will.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi: There are known issues with the default protobuf json converter when the message contains oneoff fields - https://protobuf.dev/programming-guides/json/. It's recommended to use protojson to explicitly serialize/deserialize in such cases.
In this case you're fine since UpdateActivitiesOptionsParams doesn't contain any protobufs with oneoff fields. But that may change in the future if someone adds additional fields to this input.

RestoreOriginal bool
}

ResetActivitiesParams struct {
Identity string
ActivityType string
MatchAll bool
ResetAttempts bool
ResetHeartbeat bool
KeepPaused bool
Jitter time.Duration
RestoreOriginalOptions bool
}

// BatchParams is the parameters for batch operation workflow
BatchParams struct {
// Target namespace to execute batch operation
Expand Down Expand Up @@ -131,6 +157,10 @@ type (
UpdateOptionsParams UpdateOptionsParams
// UnpauseActivitiesParams is params only for BatchTypeUnpauseActivities
UnpauseActivitiesParams UnpauseActivitiesParams
// UpdateActivitiesOptionsParams is params only for BatchTypeUpdateActivitiesOptions
UpdateActivitiesOptionsParams UpdateActivitiesOptionsParams
// ResetActivitiesParams is params only for BatchTypeResetActivities
ResetActivitiesParams ResetActivitiesParams

// RPS sets the requests-per-second limit for the batch.
// The default (and max) is defined by `worker.BatcherRPS` in the dynamic config.
Expand Down Expand Up @@ -257,6 +287,16 @@ func validateParams(params BatchParams) error {
return fmt.Errorf("must provide ActivityType or MatchAll")
}
return nil
case BatchTypeResetActivities:
if params.ResetActivitiesParams.ActivityType == "" && !params.ResetActivitiesParams.MatchAll {
return errors.New("must provide ActivityType or MatchAll")
}
return nil
case BatchTypeUpdateActivitiesOptions:
if params.UpdateActivitiesOptionsParams.ActivityType == "" && !params.UpdateActivitiesOptionsParams.MatchAll {
return errors.New("must provide ActivityType or MatchAll")
}
return nil
default:
return fmt.Errorf("not supported batch type: %v", params.BatchType)
}
Expand Down
Loading
Loading