Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ require (
go.opentelemetry.io/otel/sdk v1.34.0
go.opentelemetry.io/otel/sdk/metric v1.34.0
go.opentelemetry.io/otel/trace v1.34.0
go.temporal.io/api v1.50.1-0.20250715164317-6157f960f13b
go.temporal.io/api v1.50.1-0.20250717214050-28ae23500a33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please merge the API change before the server PR.

go.temporal.io/sdk v1.34.0
go.uber.org/automaxprocs v1.6.0
go.uber.org/fx v1.23.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.temporal.io/api v1.50.1-0.20250715164317-6157f960f13b h1:LYi/B1Pewj36fKKMWES4k1UmK0m/bYZwCgR/yFidwdc=
go.temporal.io/api v1.50.1-0.20250715164317-6157f960f13b/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/api v1.50.1-0.20250717214050-28ae23500a33 h1:k1ZxwUjeMyWmFH2X8373RQymt6zvjnvWyV6AqiSAnrY=
go.temporal.io/api v1.50.1-0.20250717214050-28ae23500a33/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/sdk v1.34.0 h1:VLg/h6ny7GvLFVoQPqz2NcC93V9yXboQwblkRvZ1cZE=
go.temporal.io/sdk v1.34.0/go.mod h1:iE4U5vFrH3asOhqpBBphpj9zNtw8btp8+MSaf5A0D3w=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
93 changes: 79 additions & 14 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4505,6 +4505,8 @@ func (wh *WorkflowHandler) StartBatchOperation(
var resetParams batcher.ResetParams
var updateOptionsParams batcher.UpdateOptionsParams
var unpauseActivitiesParams batcher.UnpauseActivitiesParams
var resetActivitiesParams batcher.ResetActivitiesParams
var updateOptionsActivitiesParams batcher.UpdateOptionsActivitiesParams
switch op := request.Operation.(type) {
case *workflowservice.StartBatchOperationRequest_TerminationOperation:
identity = op.TerminationOperation.GetIdentity()
Expand Down Expand Up @@ -4583,7 +4585,7 @@ func (wh *WorkflowHandler) StartBatchOperation(
visibilityQuery = fmt.Sprintf("(%s) AND (%s)", visibilityQuery, unpauseCause)
unpauseActivitiesParams.ActivityType = a.Type
case *batchpb.BatchOperationUnpauseActivities_MatchAll:
if a.MatchAll == false {
if !a.MatchAll {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
wildCardUnpause := fmt.Sprintf("%s STARTS_WITH 'property:activityType='", searchattribute.TemporalPauseInfo)
Expand All @@ -4594,24 +4596,81 @@ func (wh *WorkflowHandler) StartBatchOperation(
unpauseActivitiesParams.ResetAttempts = op.UnpauseActivitiesOperation.ResetAttempts
unpauseActivitiesParams.ResetHeartbeat = op.UnpauseActivitiesOperation.ResetHeartbeat
unpauseActivitiesParams.Jitter = op.UnpauseActivitiesOperation.Jitter.AsDuration()

case *workflowservice.StartBatchOperationRequest_ResetActivitiesOperation:
operationType = batcher.BatchTypeResetActivities
if op.ResetActivitiesOperation == nil {
return nil, serviceerror.NewInvalidArgument("reset activities operation is not set")
}
if op.ResetActivitiesOperation.GetActivity() == nil {
return nil, serviceerror.NewInvalidArgument("activity filter must be set")
}

switch a := op.ResetActivitiesOperation.GetActivity().(type) {
case *batchpb.BatchOperationResetActivities_Type:
if len(a.Type) == 0 {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
resetActivitiesParams.ActivityType = a.Type
case *batchpb.BatchOperationResetActivities_MatchAll:
if !a.MatchAll {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
resetActivitiesParams.MatchAll = true
}

resetActivitiesParams.ResetAttempts = op.ResetActivitiesOperation.ResetAttempts
resetActivitiesParams.ResetHeartbeat = op.ResetActivitiesOperation.ResetHeartbeat
resetActivitiesParams.Jitter = op.ResetActivitiesOperation.Jitter.AsDuration()
resetActivitiesParams.KeepPaused = op.ResetActivitiesOperation.KeepPaused
resetActivitiesParams.RestoreOriginalOptions = op.ResetActivitiesOperation.RestoreOriginalOptions

case *workflowservice.StartBatchOperationRequest_UpdateOptionsActivitiesOperation:
operationType = batcher.BatchTypeUpdateOptionsActivities
if op.UpdateOptionsActivitiesOperation == nil {
return nil, serviceerror.NewInvalidArgument("update activity options operation is not set")
}
if op.UpdateOptionsActivitiesOperation.GetActivity() == nil {
return nil, serviceerror.NewInvalidArgument("activity filter must be set")
}

switch a := op.UpdateOptionsActivitiesOperation.GetActivity().(type) {
case *batchpb.BatchOperationUpdateActivityOptions_Type:
if len(a.Type) == 0 {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
updateOptionsActivitiesParams.ActivityType = a.Type
case *batchpb.BatchOperationUpdateActivityOptions_MatchAll:
if !a.MatchAll {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
updateOptionsActivitiesParams.MatchAll = true
}
updateOptionsActivitiesParams.ActivityOptions = op.UpdateOptionsActivitiesOperation.GetActivityOptions()
updateOptionsActivitiesParams.UpdateMask = op.UpdateOptionsActivitiesOperation.GetUpdateMask()
updateOptionsActivitiesParams.RestoreOriginal = op.UpdateOptionsActivitiesOperation.GetRestoreOriginal()
updateOptionsActivitiesParams.Identity = op.UpdateOptionsActivitiesOperation.GetIdentity()

default:
return nil, serviceerror.NewInvalidArgumentf("The operation type %T is not supported", op)
}

input := &batcher.BatchParams{
Namespace: request.GetNamespace(),
Query: visibilityQuery,
Executions: request.GetExecutions(),
Reason: request.GetReason(),
BatchType: operationType,
RPS: float64(request.GetMaxOperationsPerSecond()),
TerminateParams: batcher.TerminateParams{},
CancelParams: batcher.CancelParams{},
SignalParams: signalParams,
DeleteParams: batcher.DeleteParams{},
ResetParams: resetParams,
UpdateOptionsParams: updateOptionsParams,
UnpauseActivitiesParams: unpauseActivitiesParams,
Namespace: request.GetNamespace(),
Query: visibilityQuery,
Executions: request.GetExecutions(),
Reason: request.GetReason(),
BatchType: operationType,
RPS: float64(request.GetMaxOperationsPerSecond()),
TerminateParams: batcher.TerminateParams{},
CancelParams: batcher.CancelParams{},
SignalParams: signalParams,
DeleteParams: batcher.DeleteParams{},
ResetParams: resetParams,
UpdateOptionsParams: updateOptionsParams,
UnpauseActivitiesParams: unpauseActivitiesParams,
ResetActivitiesParams: resetActivitiesParams,
UpdateOptionsActivitiesParams: updateOptionsActivitiesParams,
}
inputPayload, err := sdk.PreferProtoDataConverter.ToPayloads(input)
if err != nil {
Expand Down Expand Up @@ -4777,6 +4836,12 @@ func (wh *WorkflowHandler) DescribeBatchOperation(
operationType = enumspb.BATCH_OPERATION_TYPE_RESET
case batcher.BatchTypeUpdateOptions:
operationType = enumspb.BATCH_OPERATION_TYPE_UPDATE_EXECUTION_OPTIONS
case batcher.BatchTypeUpdateOptionsActivities:
operationType = enumspb.BATCH_OPERATION_TYPE_UPDATE_ACTIVITY_OPTIONS
case batcher.BatchTypeResetActivities:
operationType = enumspb.BATCH_OPERATION_TYPE_RESET_ACTIVITY
case batcher.BatchTypeUnpauseActivities:
operationType = enumspb.BATCH_OPERATION_TYPE_UNPAUSE_ACTIVITY
default:
operationType = enumspb.BATCH_OPERATION_TYPE_UNSPECIFIED
wh.throttledLogger.Warn("Unknown batch operation type", tag.NewStringTag("batch-operation-type", operationTypeString))
Expand Down
54 changes: 54 additions & 0 deletions service/worker/batcher/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ func startTaskProcessor(
WorkflowId: workflowID,
RunId: runID,
},
// QUESTION: do we want to plumb through the identity from cli/ui/sdk ? @yuri/@chetan
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, we should be consistent across all batch jobs with our use of identity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added for these two, the rest of the options do not have an identity in the proto definition. I will add that in a later PR

Identity: "batch unpause",
Activity: &workflowservice.UnpauseActivityRequest_Type{Type: batchParams.UnpauseActivitiesParams.ActivityType},
ResetAttempts: !batchParams.UnpauseActivitiesParams.ResetAttempts,
Expand Down Expand Up @@ -370,6 +371,59 @@ func startTaskProcessor(
})
return err
})

case BatchTypeResetActivities:
err = processTask(ctx, limiter, task,
func(workflowID, runID string) error {
resetRequest := &workflowservice.ResetActivityRequest{
Namespace: batchParams.Namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
},
Identity: "batch reset",
Activity: &workflowservice.ResetActivityRequest_Type{Type: batchParams.ResetActivitiesParams.ActivityType},
ResetHeartbeat: batchParams.ResetActivitiesParams.ResetHeartbeat,
Jitter: durationpb.New(batchParams.ResetActivitiesParams.Jitter),
KeepPaused: batchParams.ResetActivitiesParams.KeepPaused,
RestoreOriginalOptions: batchParams.ResetActivitiesParams.RestoreOriginalOptions,
}

if batchParams.ResetActivitiesParams.MatchAll {
resetRequest.Activity = &workflowservice.ResetActivityRequest_ResetAll{ResetAll: true}
} else {
resetRequest.Activity = &workflowservice.ResetActivityRequest_Type{Type: batchParams.ResetActivitiesParams.ActivityType}
}

_, err = frontendClient.ResetActivity(ctx, resetRequest)
return err
})
case BatchTypeUpdateOptionsActivities:
err = processTask(ctx, limiter, task,
func(workflowID, runID string) error {
updateRequest := &workflowservice.UpdateActivityOptionsRequest{
Namespace: batchParams.Namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
},
Activity: &workflowservice.UpdateActivityOptionsRequest_Type{Type: batchParams.UpdateOptionsActivitiesParams.ActivityType},
UpdateMask: batchParams.UpdateOptionsActivitiesParams.UpdateMask,
RestoreOriginal: batchParams.UpdateOptionsActivitiesParams.RestoreOriginal,
Identity: batchParams.UpdateOptionsActivitiesParams.Identity,
ActivityOptions: batchParams.UpdateOptionsActivitiesParams.ActivityOptions,
}

if batchParams.UpdateOptionsActivitiesParams.MatchAll {
updateRequest.Activity = &workflowservice.UpdateActivityOptionsRequest_UpdateAll{UpdateAll: true}
} else {
updateRequest.Activity = &workflowservice.UpdateActivityOptionsRequest_Type{Type: batchParams.UpdateOptionsActivitiesParams.ActivityType}
}

_, err = frontendClient.UpdateActivityOptions(ctx, updateRequest)
return err
})
// QUESTION: why do we not have a default case and return an error? @yuri/@chetan
}
if err != nil {
metrics.BatcherProcessorFailures.With(metricsHandler).Record(1)
Expand Down
40 changes: 40 additions & 0 deletions service/worker/batcher/workflow.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package batcher

import (
"errors"
"fmt"
"time"

activitypb "go.temporal.io/api/activity/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
workflowpb "go.temporal.io/api/workflow/v1"
Expand Down Expand Up @@ -42,6 +44,10 @@ const (
BatchTypeUpdateOptions = "update_options"
// BatchTypePauseActivities is batch type for unpausing activities
BatchTypeUnpauseActivities = "unpause_activities"
// BatchTypeUpdateOptionsActivities is batch type for updating the options of activities
BatchTypeUpdateOptionsActivities = "update_options_activities"
// BatchTypeResetActivities is batch type for resetting activities
BatchTypeResetActivities = "reset_activities"
)

var (
Expand Down Expand Up @@ -103,6 +109,26 @@ type (
Jitter time.Duration
}

UpdateOptionsActivitiesParams struct {
Identity string
ActivityType string
MatchAll bool
ActivityOptions *activitypb.ActivityOptions
UpdateMask *fieldmaskpb.FieldMask
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine but you should know that using a nested proto struct in workflow input doesn't use proto JSON serialization, instead it uses plain JSON serialization which may yield unexpected results.

If we were writing this from scratch, the right thing to do here is to define a proto struct for the workflow input and reuse the request structs for each of these actions.
You may want to serialize these proto messages yourself and bypass the SDK's serialization.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chatted w/ Yuri on this. We already use the proto struct for FieldMask without custom serialization in other batch commands so we should be fine here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would test that every field in these message are properly serialized.
Generally, what we've done here is bad practice and should be avoided in our codebase. It's not just FieldMask, it's also ActivityOptions. Even if we know that all fields are properly serialized today (which I'm not convinced of), there's no guarantee that future fields will.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi: There are known issues with the default protobuf json converter when the message contains oneoff fields - https://protobuf.dev/programming-guides/json/. It's recommended to use protojson to explicitly serialize/deserialize in such cases.
In this case you're fine since UpdateActivitiesOptionsParams doesn't contain any protobufs with oneoff fields. But that may change in the future if someone adds additional fields to this input.

RestoreOriginal bool
}

ResetActivitiesParams struct {
Identity string
ActivityType string
MatchAll bool
ResetAttempts bool
ResetHeartbeat bool
KeepPaused bool
Jitter time.Duration
RestoreOriginalOptions bool
}

// BatchParams is the parameters for batch operation workflow
BatchParams struct {
// Target namespace to execute batch operation
Expand Down Expand Up @@ -131,6 +157,10 @@ type (
UpdateOptionsParams UpdateOptionsParams
// UnpauseActivitiesParams is params only for BatchTypeUnpauseActivities
UnpauseActivitiesParams UnpauseActivitiesParams
// UpdateOptionsActivitiesParams is params only for BatchTypeUpdateOptionsActivities
UpdateOptionsActivitiesParams UpdateOptionsActivitiesParams
// ResetActivitiesParams is params only for BatchTypeResetActivities
ResetActivitiesParams ResetActivitiesParams

// RPS sets the requests-per-second limit for the batch.
// The default (and max) is defined by `worker.BatcherRPS` in the dynamic config.
Expand Down Expand Up @@ -257,6 +287,16 @@ func validateParams(params BatchParams) error {
return fmt.Errorf("must provide ActivityType or MatchAll")
}
return nil
case BatchTypeResetActivities:
if params.ResetActivitiesParams.ActivityType == "" && !params.ResetActivitiesParams.MatchAll {
return errors.New("must provide ActivityType or MatchAll")
}
return nil
case BatchTypeUpdateOptionsActivities:
if params.UpdateOptionsActivitiesParams.ActivityType == "" && !params.UpdateOptionsActivitiesParams.MatchAll {
return errors.New("must provide ActivityType or MatchAll")
}
return nil
default:
return fmt.Errorf("not supported batch type: %v", params.BatchType)
}
Expand Down
Loading
Loading