Skip to content

Commit 69bd781

Browse files
committed
Revert PR 8081 changes in service/frontend directory only
This reverts the batch operation changes from PR 8081 (#8081) but only for files in the service/frontend directory, restoring the original BatchParams-based implementation.
1 parent e0bfbcd commit 69bd781

File tree

2 files changed

+297
-177
lines changed

2 files changed

+297
-177
lines changed

service/frontend/workflow_handler.go

Lines changed: 200 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"time"
1414

1515
"github.com/pborman/uuid"
16-
"github.com/temporalio/sqlparser"
1716
batchpb "go.temporal.io/api/batch/v1"
1817
commonpb "go.temporal.io/api/common/v1"
1918
enumspb "go.temporal.io/api/enums/v1"
@@ -27,7 +26,6 @@ import (
2726
workerpb "go.temporal.io/api/worker/v1"
2827
workflowpb "go.temporal.io/api/workflow/v1"
2928
"go.temporal.io/api/workflowservice/v1"
30-
batchspb "go.temporal.io/server/api/batch/v1"
3129
deploymentspb "go.temporal.io/server/api/deployment/v1"
3230
"go.temporal.io/server/api/historyservice/v1"
3331
"go.temporal.io/server/api/matchingservice/v1"
@@ -59,7 +57,6 @@ import (
5957
"go.temporal.io/server/common/persistence/visibility/manager"
6058
"go.temporal.io/server/common/primitives"
6159
"go.temporal.io/server/common/primitives/timestamp"
62-
"go.temporal.io/server/common/priorities"
6360
"go.temporal.io/server/common/retrypolicy"
6461
"go.temporal.io/server/common/rpc"
6562
"go.temporal.io/server/common/rpc/interceptor"
@@ -488,10 +485,6 @@ func (wh *WorkflowHandler) prepareStartWorkflowRequest(
488485
request.SearchAttributes = sa
489486
}
490487

491-
if err := priorities.Validate(request.Priority); err != nil {
492-
return nil, err
493-
}
494-
495488
if err := wh.validateWorkflowCompletionCallbacks(namespaceName, request.GetCompletionCallbacks()); err != nil {
496489
return nil, err
497490
}
@@ -2070,10 +2063,6 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
20702063
request.SearchAttributes = sa
20712064
}
20722065

2073-
if err := priorities.Validate(request.Priority); err != nil {
2074-
return nil, err
2075-
}
2076-
20772066
if err := wh.validateLinks(namespaceName, request.GetLinks()); err != nil {
20782067
return nil, err
20792068
}
@@ -4473,8 +4462,26 @@ func (wh *WorkflowHandler) StartBatchOperation(
44734462
return nil, errRequestNotSet
44744463
}
44754464

4476-
if err := batcher.ValidateBatchOperation(request); err != nil {
4477-
return nil, err
4465+
if len(request.GetJobId()) == 0 {
4466+
return nil, errBatchJobIDNotSet
4467+
}
4468+
if len(request.Namespace) == 0 {
4469+
return nil, errNamespaceNotSet
4470+
}
4471+
if len(request.VisibilityQuery) == 0 && len(request.Executions) == 0 {
4472+
return nil, errBatchOpsWorkflowFilterNotSet
4473+
}
4474+
if len(request.VisibilityQuery) != 0 && len(request.Executions) != 0 {
4475+
return nil, errBatchOpsWorkflowFiltersNotAllowed
4476+
}
4477+
if len(request.Executions) > wh.config.MaxExecutionCountBatchOperation(request.Namespace) {
4478+
return nil, errBatchOpsMaxWorkflowExecutionCount
4479+
}
4480+
if len(request.Reason) == 0 {
4481+
return nil, errReasonNotSet
4482+
}
4483+
if request.Operation == nil {
4484+
return nil, errBatchOperationNotSet
44784485
}
44794486

44804487
if !wh.config.EnableBatcher(request.Namespace) {
@@ -4506,67 +4513,216 @@ func (wh *WorkflowHandler) StartBatchOperation(
45064513
if err != nil {
45074514
return nil, err
45084515
}
4509-
4510-
input := &batchspb.BatchOperationInput{
4511-
Request: request,
4512-
NamespaceId: namespaceID.String(),
4513-
}
4514-
45154516
var identity string
4517+
var operationType string
4518+
var signalParams batcher.SignalParams
4519+
var resetParams batcher.ResetParams
4520+
var updateOptionsParams batcher.UpdateOptionsParams
4521+
var unpauseActivitiesParams batcher.UnpauseActivitiesParams
4522+
var resetActivitiesParams batcher.ResetActivitiesParams
4523+
var updateActivitiesOptionsParams batcher.UpdateActivitiesOptionsParams
45164524
switch op := request.Operation.(type) {
45174525
case *workflowservice.StartBatchOperationRequest_TerminationOperation:
4518-
input.BatchType = enumspb.BATCH_OPERATION_TYPE_TERMINATE
45194526
identity = op.TerminationOperation.GetIdentity()
4527+
operationType = batcher.BatchTypeTerminate
45204528
case *workflowservice.StartBatchOperationRequest_SignalOperation:
4521-
input.BatchType = enumspb.BATCH_OPERATION_TYPE_SIGNAL
45224529
identity = op.SignalOperation.GetIdentity()
4530+
operationType = batcher.BatchTypeSignal
4531+
signalParams.SignalName = op.SignalOperation.GetSignal()
4532+
signalParams.Input = op.SignalOperation.GetInput()
45234533
case *workflowservice.StartBatchOperationRequest_CancellationOperation:
4524-
input.BatchType = enumspb.BATCH_OPERATION_TYPE_CANCEL
45254534
identity = op.CancellationOperation.GetIdentity()
4535+
operationType = batcher.BatchTypeCancel
45264536
case *workflowservice.StartBatchOperationRequest_DeletionOperation:
4527-
input.BatchType = enumspb.BATCH_OPERATION_TYPE_DELETE
45284537
identity = op.DeletionOperation.GetIdentity()
4538+
operationType = batcher.BatchTypeDelete
45294539
case *workflowservice.StartBatchOperationRequest_ResetOperation:
4530-
input.BatchType = enumspb.BATCH_OPERATION_TYPE_RESET
45314540
identity = op.ResetOperation.GetIdentity()
4541+
operationType = batcher.BatchTypeReset
4542+
if op.ResetOperation.Options != nil {
4543+
if op.ResetOperation.Options.Target == nil {
4544+
return nil, serviceerror.NewInvalidArgument("batch reset missing target")
4545+
}
4546+
encodedResetOptions, err := op.ResetOperation.Options.Marshal()
4547+
if err != nil {
4548+
return nil, err
4549+
}
4550+
resetParams.ResetOptions = encodedResetOptions
4551+
resetParams.PostResetOperations = make([][]byte, len(op.ResetOperation.PostResetOperations))
4552+
for i, postResetOperation := range op.ResetOperation.PostResetOperations {
4553+
encodedPostResetOperations, err := postResetOperation.Marshal()
4554+
if err != nil {
4555+
return nil, err
4556+
}
4557+
resetParams.PostResetOperations[i] = encodedPostResetOperations
4558+
}
4559+
} else {
4560+
// TODO: remove support for old fields later
4561+
resetType := op.ResetOperation.GetResetType()
4562+
if _, ok := enumspb.ResetType_name[int32(resetType)]; !ok || resetType == enumspb.RESET_TYPE_UNSPECIFIED {
4563+
return nil, serviceerror.NewInvalidArgumentf("unknown batch reset type %v", resetType)
4564+
}
4565+
resetParams.ResetType = resetType
4566+
resetParams.ResetReapplyType = op.ResetOperation.GetResetReapplyType()
4567+
}
45324568
case *workflowservice.StartBatchOperationRequest_UpdateWorkflowOptionsOperation:
4533-
input.BatchType = enumspb.BATCH_OPERATION_TYPE_UPDATE_EXECUTION_OPTIONS
45344569
identity = op.UpdateWorkflowOptionsOperation.GetIdentity()
4570+
operationType = batcher.BatchTypeUpdateOptions
4571+
updateOptionsParams.WorkflowExecutionOptions = op.UpdateWorkflowOptionsOperation.GetWorkflowExecutionOptions()
4572+
if updateMask := op.UpdateWorkflowOptionsOperation.GetUpdateMask(); updateMask != nil {
4573+
updateOptionsParams.UpdateMask = &batcher.FieldMask{Paths: updateMask.Paths}
4574+
}
4575+
// TODO(carlydf): remove hacky usage of deprecated fields later, after adding support for oneof in BatchParams encoder
4576+
if o := updateOptionsParams.WorkflowExecutionOptions.VersioningOverride; o.GetOverride() != nil {
4577+
deprecatedOverride := &workflowpb.VersioningOverride{}
4578+
if o.GetAutoUpgrade() {
4579+
deprecatedOverride.Behavior = enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE //nolint:staticcheck // SA1019: worker versioning v0.31
4580+
} else if o.GetPinned().GetBehavior() == workflowpb.VersioningOverride_PINNED_OVERRIDE_BEHAVIOR_PINNED {
4581+
deprecatedOverride.Behavior = enumspb.VERSIONING_BEHAVIOR_PINNED //nolint:staticcheck // SA1019: worker versioning v0.31
4582+
deprecatedOverride.PinnedVersion = worker_versioning.ExternalWorkerDeploymentVersionToStringV31(o.GetPinned().GetVersion()) //nolint:staticcheck // SA1019: worker versioning v0.31
4583+
}
4584+
updateOptionsParams.WorkflowExecutionOptions.VersioningOverride = deprecatedOverride
4585+
}
45354586
case *workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation:
4536-
input.BatchType = enumspb.BATCH_OPERATION_TYPE_UNPAUSE_ACTIVITY
4537-
identity = op.UnpauseActivitiesOperation.GetIdentity()
4587+
operationType = batcher.BatchTypeUnpauseActivities
4588+
if op.UnpauseActivitiesOperation == nil {
4589+
return nil, serviceerror.NewInvalidArgument("unpause activities operation is not set")
4590+
}
4591+
if op.UnpauseActivitiesOperation.GetActivity() == nil {
4592+
return nil, serviceerror.NewInvalidArgument("activity filter must be set")
4593+
}
45384594

45394595
switch a := op.UnpauseActivitiesOperation.GetActivity().(type) {
45404596
case *batchpb.BatchOperationUnpauseActivities_Type:
4541-
searchValue := fmt.Sprintf("property:activityType=%s", a.Type)
4542-
escapedSearchValue := sqlparser.String(sqlparser.NewStrVal([]byte(searchValue)))
4543-
input.Request.VisibilityQuery = fmt.Sprintf("%s = %s", searchattribute.TemporalPauseInfo, escapedSearchValue)
4597+
if len(a.Type) == 0 {
4598+
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
4599+
}
4600+
unpauseCause := fmt.Sprintf("%s = 'property:activityType=%s'", searchattribute.TemporalPauseInfo, a.Type)
4601+
visibilityQuery = fmt.Sprintf("(%s) AND (%s)", visibilityQuery, unpauseCause)
4602+
unpauseActivitiesParams.ActivityType = a.Type
45444603
case *batchpb.BatchOperationUnpauseActivities_MatchAll:
4604+
if !a.MatchAll {
4605+
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
4606+
}
45454607
wildCardUnpause := fmt.Sprintf("%s STARTS_WITH 'property:activityType='", searchattribute.TemporalPauseInfo)
4546-
input.Request.VisibilityQuery = fmt.Sprintf("(%s) AND (%s)", visibilityQuery, wildCardUnpause)
4608+
visibilityQuery = fmt.Sprintf("(%s) AND (%s)", visibilityQuery, wildCardUnpause)
4609+
unpauseActivitiesParams.MatchAll = true
45474610
}
4611+
4612+
unpauseActivitiesParams.ResetAttempts = op.UnpauseActivitiesOperation.ResetAttempts
4613+
unpauseActivitiesParams.ResetHeartbeat = op.UnpauseActivitiesOperation.ResetHeartbeat
4614+
unpauseActivitiesParams.Jitter = op.UnpauseActivitiesOperation.Jitter.AsDuration()
4615+
unpauseActivitiesParams.Identity = op.UnpauseActivitiesOperation.GetIdentity()
45484616
case *workflowservice.StartBatchOperationRequest_ResetActivitiesOperation:
4549-
input.BatchType = enumspb.BATCH_OPERATION_TYPE_RESET_ACTIVITY
4550-
identity = op.ResetActivitiesOperation.GetIdentity()
4617+
operationType = batcher.BatchTypeResetActivities
4618+
if op.ResetActivitiesOperation == nil {
4619+
return nil, serviceerror.NewInvalidArgument("reset activities operation is not set")
4620+
}
4621+
if op.ResetActivitiesOperation.GetActivity() == nil {
4622+
return nil, serviceerror.NewInvalidArgument("activity filter must be set")
4623+
}
4624+
4625+
switch a := op.ResetActivitiesOperation.GetActivity().(type) {
4626+
case *batchpb.BatchOperationResetActivities_Type:
4627+
if len(a.Type) == 0 {
4628+
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
4629+
}
4630+
resetActivitiesParams.ActivityType = a.Type
4631+
case *batchpb.BatchOperationResetActivities_MatchAll:
4632+
if !a.MatchAll {
4633+
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
4634+
}
4635+
resetActivitiesParams.MatchAll = true
4636+
}
4637+
4638+
resetActivitiesParams.ResetAttempts = op.ResetActivitiesOperation.ResetAttempts
4639+
resetActivitiesParams.ResetHeartbeat = op.ResetActivitiesOperation.ResetHeartbeat
4640+
resetActivitiesParams.Jitter = op.ResetActivitiesOperation.Jitter.AsDuration()
4641+
resetActivitiesParams.KeepPaused = op.ResetActivitiesOperation.KeepPaused
4642+
resetActivitiesParams.RestoreOriginalOptions = op.ResetActivitiesOperation.RestoreOriginalOptions
4643+
resetActivitiesParams.Identity = op.ResetActivitiesOperation.GetIdentity()
45514644
case *workflowservice.StartBatchOperationRequest_UpdateActivityOptionsOperation:
4552-
input.BatchType = enumspb.BATCH_OPERATION_TYPE_UPDATE_ACTIVITY_OPTIONS
4553-
identity = op.UpdateActivityOptionsOperation.GetIdentity()
4645+
operationType = batcher.BatchTypeUpdateActivitiesOptions
4646+
if op.UpdateActivityOptionsOperation == nil {
4647+
return nil, serviceerror.NewInvalidArgument("update activity options operation is not set")
4648+
}
4649+
if op.UpdateActivityOptionsOperation.GetActivityOptions() != nil && op.UpdateActivityOptionsOperation.GetRestoreOriginal() {
4650+
return nil, serviceerror.NewInvalidArgument("cannot set both activity options and restore original")
4651+
}
4652+
if op.UpdateActivityOptionsOperation.GetActivityOptions() == nil && !op.UpdateActivityOptionsOperation.GetRestoreOriginal() {
4653+
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or restore original should be set to true")
4654+
}
4655+
4656+
switch a := op.UpdateActivityOptionsOperation.GetActivity().(type) {
4657+
case *batchpb.BatchOperationUpdateActivityOptions_Type:
4658+
if len(a.Type) == 0 {
4659+
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
4660+
}
4661+
updateActivitiesOptionsParams.ActivityType = a.Type
4662+
case *batchpb.BatchOperationUpdateActivityOptions_MatchAll:
4663+
if !a.MatchAll {
4664+
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
4665+
}
4666+
updateActivitiesOptionsParams.MatchAll = true
4667+
}
4668+
4669+
updateActivitiesOptionsParams.RestoreOriginal = op.UpdateActivityOptionsOperation.GetRestoreOriginal()
4670+
updateActivitiesOptionsParams.Identity = op.UpdateActivityOptionsOperation.GetIdentity()
4671+
if updateMask := op.UpdateActivityOptionsOperation.GetUpdateMask(); updateMask != nil {
4672+
updateActivitiesOptionsParams.UpdateMask = &batcher.FieldMask{Paths: updateMask.Paths}
4673+
}
4674+
if ao := op.UpdateActivityOptionsOperation.GetActivityOptions(); ao != nil {
4675+
updateActivitiesOptionsParams.ActivityOptions = &batcher.ActivityOptions{
4676+
ScheduleToStartTimeout: ao.ScheduleToStartTimeout.AsDuration(),
4677+
ScheduleToCloseTime: ao.ScheduleToCloseTimeout.AsDuration(),
4678+
StartToCloseTimeout: ao.StartToCloseTimeout.AsDuration(),
4679+
HeartbeatTimeout: ao.HeartbeatTimeout.AsDuration(),
4680+
}
4681+
if rp := ao.RetryPolicy; rp != nil {
4682+
updateActivitiesOptionsParams.ActivityOptions.RetryPolicy = &batcher.RetryPolicy{
4683+
InitialInterval: rp.InitialInterval.AsDuration(),
4684+
MaximumInterval: rp.MaximumInterval.AsDuration(),
4685+
BackoffCoefficient: rp.BackoffCoefficient,
4686+
NonRetryableErrorTypes: rp.NonRetryableErrorTypes,
4687+
MaximumAttempts: rp.MaximumAttempts,
4688+
}
4689+
}
4690+
if tq := ao.TaskQueue; tq != nil {
4691+
updateActivitiesOptionsParams.ActivityOptions.TaskQueue = &batcher.TaskQueue{
4692+
Name: tq.Name,
4693+
Kind: int32(tq.Kind),
4694+
}
4695+
}
4696+
}
45544697
default:
45554698
return nil, serviceerror.NewInvalidArgumentf("The operation type %T is not supported", op)
45564699
}
45574700

4558-
if err := batcher.ValidateBatchOperation(request); err != nil {
4559-
return nil, err
4701+
input := &batcher.BatchParams{
4702+
Namespace: request.GetNamespace(),
4703+
Query: visibilityQuery,
4704+
Executions: request.GetExecutions(),
4705+
Reason: request.GetReason(),
4706+
BatchType: operationType,
4707+
RPS: float64(request.GetMaxOperationsPerSecond()),
4708+
TerminateParams: batcher.TerminateParams{},
4709+
CancelParams: batcher.CancelParams{},
4710+
SignalParams: signalParams,
4711+
DeleteParams: batcher.DeleteParams{},
4712+
ResetParams: resetParams,
4713+
UpdateOptionsParams: updateOptionsParams,
4714+
UnpauseActivitiesParams: unpauseActivitiesParams,
4715+
ResetActivitiesParams: resetActivitiesParams,
4716+
UpdateActivitiesOptionsParams: updateActivitiesOptionsParams,
45604717
}
4561-
4562-
inputPayload, err := payloads.Encode(input)
4718+
inputPayload, err := sdk.PreferProtoDataConverter.ToPayloads(input)
45634719
if err != nil {
45644720
return nil, err
45654721
}
45664722

45674723
memo := &commonpb.Memo{
45684724
Fields: map[string]*commonpb.Payload{
4569-
batcher.BatchOperationTypeMemo: payload.EncodeString(snakeCaseBatchType(input.BatchType)),
4725+
batcher.BatchOperationTypeMemo: payload.EncodeString(operationType),
45704726
batcher.BatchReasonMemo: payload.EncodeString(request.GetReason()),
45714727
},
45724728
}
@@ -4579,7 +4735,7 @@ func (wh *WorkflowHandler) StartBatchOperation(
45794735
startReq := &workflowservice.StartWorkflowExecutionRequest{
45804736
Namespace: request.Namespace,
45814737
WorkflowId: request.GetJobId(),
4582-
WorkflowType: &commonpb.WorkflowType{Name: batcher.BatchWFTypeProtobufName},
4738+
WorkflowType: &commonpb.WorkflowType{Name: batcher.BatchWFTypeName},
45834739
TaskQueue: &taskqueuepb.TaskQueue{Name: primitives.PerNSWorkerTaskQueue},
45844740
Input: inputPayload,
45854741
Identity: identity,
@@ -4607,23 +4763,6 @@ func (wh *WorkflowHandler) StartBatchOperation(
46074763
return &workflowservice.StartBatchOperationResponse{}, nil
46084764
}
46094765

4610-
func snakeCaseBatchType(batchType enumspb.BatchOperationType) string {
4611-
switch batchType {
4612-
case enumspb.BATCH_OPERATION_TYPE_TERMINATE, enumspb.BATCH_OPERATION_TYPE_CANCEL, enumspb.BATCH_OPERATION_TYPE_SIGNAL, enumspb.BATCH_OPERATION_TYPE_DELETE, enumspb.BATCH_OPERATION_TYPE_RESET:
4613-
return strings.ToLower(batchType.String())
4614-
case enumspb.BATCH_OPERATION_TYPE_UPDATE_EXECUTION_OPTIONS:
4615-
return "update_options"
4616-
case enumspb.BATCH_OPERATION_TYPE_UNPAUSE_ACTIVITY:
4617-
return "unpause_activities"
4618-
case enumspb.BATCH_OPERATION_TYPE_UPDATE_ACTIVITY_OPTIONS:
4619-
return "update_activity_options"
4620-
case enumspb.BATCH_OPERATION_TYPE_RESET_ACTIVITY:
4621-
return "reset_activities"
4622-
default:
4623-
return ""
4624-
}
4625-
}
4626-
46274766
func (wh *WorkflowHandler) StopBatchOperation(
46284767
ctx context.Context,
46294768
request *workflowservice.StopBatchOperationRequest,
@@ -4824,7 +4963,9 @@ func (wh *WorkflowHandler) ListBatchOperations(
48244963
Namespace: request.GetNamespace(),
48254964
PageSize: request.PageSize,
48264965
NextPageToken: request.GetNextPageToken(),
4827-
Query: fmt.Sprintf("%s = '%s'",
4966+
Query: fmt.Sprintf("%s = '%s' and %s = '%s'",
4967+
searchattribute.WorkflowType,
4968+
batcher.BatchWFTypeName,
48284969
searchattribute.TemporalNamespaceDivision,
48294970
batcher.NamespaceDivision,
48304971
),

0 commit comments

Comments
 (0)