Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
91ec21f
improvement: bypass middle-man-struct for batch operations
spkane31 Jul 22, 2025
9448f39
migrated to using a protobuf for batch operations, need to update tes…
spkane31 Jul 22, 2025
52a30ca
removing batchparams from last tests
spkane31 Jul 22, 2025
0aae0c8
merge conflicts
spkane31 Jul 22, 2025
44faa03
linter checks
spkane31 Jul 22, 2025
c0d3a88
another merge conflict :(
spkane31 Jul 22, 2025
9bb3411
updating protos
spkane31 Jul 22, 2025
c2ce9ed
hopefully fixing api issues
spkane31 Jul 22, 2025
0698df7
fixed bug setting operation
spkane31 Jul 22, 2025
f45ac22
linting skip and reduce function complexity of BatchActivity
spkane31 Jul 22, 2025
5fd36d1
nolint
spkane31 Jul 22, 2025
d700916
go.sum issue
spkane31 Jul 22, 2025
ce31cd3
fixing two unit tests
spkane31 Jul 22, 2025
7c5b351
patching identity through correctly
spkane31 Jul 22, 2025
c1f3bc2
use payloads.Encode instead of sdk.ToPayloads
spkane31 Jul 23, 2025
5f4486e
slices are set to nil by payloads.Encode, unlike with sdk.ToPayloads
spkane31 Jul 23, 2025
9dee275
fixing serialization for protobuf
spkane31 Jul 23, 2025
80a5f9c
self review
spkane31 Jul 23, 2025
459ad84
need backwards compatability
spkane31 Jul 23, 2025
5b29d78
workflow test duplication
spkane31 Jul 23, 2025
9d4dfff
duplicating workflow registration
spkane31 Jul 23, 2025
0298808
small diff fixes
spkane31 Jul 23, 2025
1288ab4
got a batchworkflowexecutionrequest type but cannot find the method
spkane31 Jul 23, 2025
10ac7a8
undoing changes to clients, adding internal proto definition for batc…
spkane31 Jul 24, 2025
a9c9287
removing use of api changes
spkane31 Jul 24, 2025
1faefe9
linting errors
spkane31 Jul 24, 2025
cb1f201
Merge branch 'main' of github.com:temporalio/temporal into spk/proto-…
spkane31 Jul 24, 2025
95f9812
adding more fields to proto definition
spkane31 Jul 24, 2025
ce0efb4
lint and test fix
spkane31 Jul 24, 2025
beae3c8
fixing batch update options test
spkane31 Jul 24, 2025
2087b89
linting again!
spkane31 Jul 24, 2025
a673564
fixing more tests, removing request response
spkane31 Jul 24, 2025
f50c99c
fixing the batchtype param not being set
spkane31 Jul 24, 2025
fc68bea
removing duplicated tests, simplifying the operation setting
spkane31 Jul 24, 2025
e58c6f4
revert payload.Encodestring change
spkane31 Jul 24, 2025
9a5ea22
revert
spkane31 Jul 24, 2025
0eff6bd
Merge branch 'main' of github.com:temporalio/temporal into spk/proto-…
spkane31 Jul 24, 2025
0bea4dd
reset entirely
spkane31 Jul 25, 2025
b481d5e
fixing these dang tests!
spkane31 Jul 25, 2025
7ea8b07
small change
spkane31 Jul 25, 2025
778b3e6
fixing last test hopefully
spkane31 Jul 25, 2025
0e8272b
Merge branch 'main' of github.com:temporalio/temporal into spk/proto-…
spkane31 Jul 25, 2025
e87fafd
embed the workflowservice into batch operation, share validation logic
spkane31 Jul 25, 2025
1e49046
even simpler, fixing tests
spkane31 Jul 25, 2025
aacbdff
fixing proto panic and setting all required fields in mocks
spkane31 Jul 25, 2025
567852b
linter again
spkane31 Jul 25, 2025
69c9a1c
linter
spkane31 Jul 28, 2025
9068d36
Merge branch 'main' into spk/proto-serialization
spkane31 Jul 28, 2025
9e6a1e2
Merge branch 'main' into spk/proto-serialization
spkane31 Jul 28, 2025
606ac05
Merge branch 'main' into spk/proto-serialization
spkane31 Jul 30, 2025
123d6ba
addressing roeys comments, removing BatchOperationInput, use namespac…
spkane31 Jul 30, 2025
e7d525d
comments
spkane31 Jul 30, 2025
aebc4f9
Merge branch 'spk/proto-serialization' of github.com:temporalio/tempo…
spkane31 Jul 30, 2025
5333cb7
fixing namespace issues, renaming batchoperation -> batchoperationinput
spkane31 Jul 30, 2025
b01bfc2
fixing unit tests
spkane31 Jul 30, 2025
3ce004e
removing duplicated fields from proto definition, sanitizing visibili…
spkane31 Aug 1, 2025
1feb76c
escape entire search value
spkane31 Aug 1, 2025
9c7dfaf
Merge branch 'main' into spk/proto-serialization
spkane31 Aug 1, 2025
508f259
removing comments
spkane31 Aug 1, 2025
03b2812
Merge branch 'main' of github.com:temporalio/temporal into spk/proto-…
spkane31 Aug 1, 2025
44ea522
Merge branch 'spk/proto-serialization' of github.com:temporalio/tempo…
spkane31 Aug 1, 2025
7e3a3f2
fixing test
spkane31 Aug 4, 2025
fa9ad0a
Merge branch 'main' of github.com:temporalio/temporal into spk/proto-…
spkane31 Aug 4, 2025
5fdb323
keep all fields in the proto
spkane31 Aug 4, 2025
f45b1ae
refactorings, bug fixes, removing rps
spkane31 Aug 4, 2025
15a4428
Merge branch 'main' of github.com:temporalio/temporal into spk/proto-…
spkane31 Aug 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4505,7 +4505,7 @@
var resetParams batcher.ResetParams
var updateOptionsParams batcher.UpdateOptionsParams
var unpauseActivitiesParams batcher.UnpauseActivitiesParams
var resetActivitiesParams batcher.ResetActivitiesParams
var resetActivitiesParams *workflowservice.StartBatchOperationRequest_ResetActivitiesOperation
var updateActivitiesOptionsParams batcher.UpdateActivitiesOptionsParams
switch op := request.Operation.(type) {
case *workflowservice.StartBatchOperationRequest_TerminationOperation:
Expand Down Expand Up @@ -4601,7 +4601,7 @@
unpauseActivitiesParams.Identity = op.UnpauseActivitiesOperation.GetIdentity()
case *workflowservice.StartBatchOperationRequest_ResetActivitiesOperation:
operationType = batcher.BatchTypeResetActivities
if op.ResetActivitiesOperation == nil {
if op.ResetActivitiesOperation == nil {

Check failure on line 4604 in service/frontend/workflow_handler.go

View workflow job for this annotation

GitHub Actions / golangci

File is not properly formatted (goimports)
return nil, serviceerror.NewInvalidArgument("reset activities operation is not set")
}
if op.ResetActivitiesOperation.GetActivity() == nil {
Expand All @@ -4613,20 +4613,20 @@
if len(a.Type) == 0 {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
resetActivitiesParams.ActivityType = a.Type
resetActivitiesParams.ResetActivitiesOperation.Activity = a
case *batchpb.BatchOperationResetActivities_MatchAll:
if !a.MatchAll {
return nil, serviceerror.NewInvalidArgument("Either activity type must be set, or match all should be set to true")
}
resetActivitiesParams.MatchAll = true
resetActivitiesParams.ResetActivitiesOperation.Activity = a
}

resetActivitiesParams.ResetAttempts = op.ResetActivitiesOperation.ResetAttempts
resetActivitiesParams.ResetHeartbeat = op.ResetActivitiesOperation.ResetHeartbeat
resetActivitiesParams.Jitter = op.ResetActivitiesOperation.Jitter.AsDuration()
resetActivitiesParams.KeepPaused = op.ResetActivitiesOperation.KeepPaused
resetActivitiesParams.RestoreOriginalOptions = op.ResetActivitiesOperation.RestoreOriginalOptions
resetActivitiesParams.Identity = op.ResetActivitiesOperation.GetIdentity()
resetActivitiesParams.ResetActivitiesOperation.ResetAttempts = op.ResetActivitiesOperation.ResetAttempts
resetActivitiesParams.ResetActivitiesOperation.ResetHeartbeat = op.ResetActivitiesOperation.ResetHeartbeat
resetActivitiesParams.ResetActivitiesOperation.Jitter = op.ResetActivitiesOperation.Jitter
resetActivitiesParams.ResetActivitiesOperation.KeepPaused = op.ResetActivitiesOperation.KeepPaused
resetActivitiesParams.ResetActivitiesOperation.RestoreOriginalOptions = op.ResetActivitiesOperation.RestoreOriginalOptions
resetActivitiesParams.ResetActivitiesOperation.Identity = op.ResetActivitiesOperation.GetIdentity()
case *workflowservice.StartBatchOperationRequest_UpdateActivityOptionsOperation:
operationType = batcher.BatchTypeUpdateActivitiesOptions
if op.UpdateActivityOptionsOperation == nil {
Expand Down
22 changes: 13 additions & 9 deletions service/worker/batcher/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/pborman/uuid"
activitypb "go.temporal.io/api/activity/v1"
batchpb "go.temporal.io/api/batch/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -382,18 +383,21 @@ func startTaskProcessor(
WorkflowId: workflowID,
RunId: runID,
},
Identity: batchParams.ResetActivitiesParams.Identity,
Activity: &workflowservice.ResetActivityRequest_Type{Type: batchParams.ResetActivitiesParams.ActivityType},
ResetHeartbeat: batchParams.ResetActivitiesParams.ResetHeartbeat,
Jitter: durationpb.New(batchParams.ResetActivitiesParams.Jitter),
KeepPaused: batchParams.ResetActivitiesParams.KeepPaused,
RestoreOriginalOptions: batchParams.ResetActivitiesParams.RestoreOriginalOptions,
Identity: batchParams.ResetActivitiesParams.ResetActivitiesOperation.Identity,
Activity: &workflowservice.ResetActivityRequest_Type{Type: batchParams.ResetActivitiesParams.ResetActivitiesOperation.GetType()},
ResetHeartbeat: batchParams.ResetActivitiesParams.ResetActivitiesOperation.ResetHeartbeat,
Jitter: batchParams.ResetActivitiesParams.ResetActivitiesOperation.Jitter,
KeepPaused: batchParams.ResetActivitiesParams.ResetActivitiesOperation.KeepPaused,
RestoreOriginalOptions: batchParams.ResetActivitiesParams.ResetActivitiesOperation.RestoreOriginalOptions,
}

if batchParams.ResetActivitiesParams.MatchAll {
switch a := batchParams.ResetActivitiesParams.ResetActivitiesOperation.GetActivity().(type) {
case *batchpb.BatchOperationResetActivities_Type:
resetRequest.Activity = &workflowservice.ResetActivityRequest_Type{Type: a.Type}
case *batchpb.BatchOperationResetActivities_MatchAll:
resetRequest.Activity = &workflowservice.ResetActivityRequest_MatchAll{MatchAll: true}
} else {
resetRequest.Activity = &workflowservice.ResetActivityRequest_Type{Type: batchParams.ResetActivitiesParams.ActivityType}
default:
return fmt.Errorf("invalid activity type: %T", a)
}

_, err = frontendClient.ResetActivity(ctx, resetRequest)
Expand Down
30 changes: 16 additions & 14 deletions service/worker/batcher/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
"fmt"
"time"

batchpb "go.temporal.io/api/batch/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
workflowpb "go.temporal.io/api/workflow/v1"
workflowservicepb "go.temporal.io/api/workflowservice/v1"

Check failure on line 12 in service/worker/batcher/workflow.go

View workflow job for this annotation

GitHub Actions / golangci

import "go.temporal.io/api/workflowservice/v1" imported as "workflowservicepb" but must be "" according to config (importas)
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"go.temporal.io/server/common/searchattribute"
Expand Down Expand Up @@ -139,17 +141,6 @@
NonRetryableErrorTypes []string
}

ResetActivitiesParams struct {
Identity string
ActivityType string
MatchAll bool
ResetAttempts bool
ResetHeartbeat bool
KeepPaused bool
Jitter time.Duration
RestoreOriginalOptions bool
}

FieldMask struct {
Paths []string
}
Expand Down Expand Up @@ -185,7 +176,7 @@
// UpdateActivitiesOptionsParams is params only for BatchTypeUpdateActivitiesOptions
UpdateActivitiesOptionsParams UpdateActivitiesOptionsParams
// ResetActivitiesParams is params only for BatchTypeResetActivities
ResetActivitiesParams ResetActivitiesParams
ResetActivitiesParams *workflowservicepb.StartBatchOperationRequest_ResetActivitiesOperation

// RPS sets the requests-per-second limit for the batch.
// The default (and max) is defined by `worker.BatcherRPS` in the dynamic config.
Expand Down Expand Up @@ -313,8 +304,19 @@
}
return nil
case BatchTypeResetActivities:
if params.ResetActivitiesParams.ActivityType == "" && !params.ResetActivitiesParams.MatchAll {
return errors.New("must provide ActivityType or MatchAll")
if params.ResetActivitiesParams.ResetActivitiesOperation.GetActivity() == nil {
return fmt.Errorf("must provide ActivityType or MatchAll")

Check failure on line 308 in service/worker/batcher/workflow.go

View workflow job for this annotation

GitHub Actions / golangci

use-errors-new: replace fmt.Errorf by errors.New (revive)
}

switch a := params.ResetActivitiesParams.ResetActivitiesOperation.GetActivity().(type) {
case *batchpb.BatchOperationResetActivities_Type:
if len(a.Type) == 0 {
return fmt.Errorf("must provide ActivityType")

Check failure on line 314 in service/worker/batcher/workflow.go

View workflow job for this annotation

GitHub Actions / golangci

use-errors-new: replace fmt.Errorf by errors.New (revive)
}
case *batchpb.BatchOperationResetActivities_MatchAll:
if !a.MatchAll {
return fmt.Errorf("must provide MatchAll")

Check failure on line 318 in service/worker/batcher/workflow.go

View workflow job for this annotation

GitHub Actions / golangci

use-errors-new: replace fmt.Errorf by errors.New (revive)
}
}
return nil
case BatchTypeUpdateActivitiesOptions:
Expand Down
Loading