-
Notifications
You must be signed in to change notification settings - Fork 1.2k
improvement: bypass middle-man-struct for batch operations #8081
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
dnr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a couple high-level comments
…ral into spk/proto-serialization
proto/internal/temporal/server/api/batch/v1/request_response.proto
Outdated
Show resolved
Hide resolved
proto/internal/temporal/server/api/batch/v1/request_response.proto
Outdated
Show resolved
Hide resolved
| return &workflowservice.StartBatchOperationResponse{}, nil | ||
| } | ||
|
|
||
| func snakeCaseBatchType(batchType enumspb.BatchOperationType) string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To maintain consistency with how we do these now a few of the batch operation types needed special casing
…ral into spk/proto-serialization
bergundy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM. Please look at the remaining comments before merging.
proto/internal/temporal/server/api/batch/v1/request_response.proto
Outdated
Show resolved
Hide resolved
| Query: fmt.Sprintf("%s = '%s' and %s = '%s'", | ||
| searchattribute.WorkflowType, | ||
| batcher.BatchWFTypeName, | ||
| batcher.BatchWFTypeProtobufName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may cause some confusion around the deployment transition. Is it worth listing both types temporarily?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I missed this in the review, in person I requested to keep both. Addressed in #8144. We should merge that change before cutting a new server release. I would consider reverting this one until this fix is in.
| // nolint:revive,cognitive-complexity | ||
| func (a *activities) BatchActivityWithProtobuf(ctx context.Context, batchParams *batchspb.BatchOperationInput) (HeartBeatDetails, error) { | ||
| logger := a.getActivityLogger(ctx) | ||
| hbd := HeartBeatDetails{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it worth considering moving heartbeat details to proto too? if we don't do it now we never can :)
| rateLimiter := rate.NewLimiter(rateLimit, burstLimit) | ||
| taskCh := make(chan taskDetail, pageSize) | ||
| respCh := make(chan error, pageSize) | ||
| for i := 0; i < a.getOperationConcurrency(int(batchParams.Concurrency)); i++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| for i := 0; i < a.getOperationConcurrency(int(batchParams.Concurrency)); i++ { | |
| for range a.getOperationConcurrency(int(batchParams.Concurrency)) { |
| WorkflowExecution: &commonpb.WorkflowExecution{ | ||
| WorkflowId: workflowID, | ||
| RunId: runID, | ||
| WorkflowId: execution.WorkflowId, | ||
| RunId: execution.RunId, | ||
| }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| WorkflowExecution: &commonpb.WorkflowExecution{ | |
| WorkflowId: workflowID, | |
| RunId: runID, | |
| WorkflowId: execution.WorkflowId, | |
| RunId: execution.RunId, | |
| }, | |
| WorkflowExecution: execution, |
| WorkflowExecution: &commonpb.WorkflowExecution{ | ||
| WorkflowId: workflowID, | ||
| RunId: runID, | ||
| WorkflowId: execution.WorkflowId, | ||
| RunId: execution.RunId, | ||
| }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| WorkflowExecution: &commonpb.WorkflowExecution{ | |
| WorkflowId: workflowID, | |
| RunId: runID, | |
| WorkflowId: execution.WorkflowId, | |
| RunId: execution.RunId, | |
| }, | |
| WorkflowExecution: execution, |
and same thing repeated several more times
| WorkflowExecution: &commonpb.WorkflowExecution{ | ||
| WorkflowId: execution.WorkflowId, | ||
| RunId: execution.RunId, | ||
| }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| WorkflowExecution: &commonpb.WorkflowExecution{ | |
| WorkflowId: execution.WorkflowId, | |
| RunId: execution.RunId, | |
| }, | |
| WorkflowExecution: execution, |
and more below
## What changed? Change the backing query to avoid filtering on a specific workflow type since we now have two types for the transition period. ## Why? Fix for a bug introduced in #8081
This reverts the batch operation changes from PR 8081 (#8081) but only for files in the service/frontend directory, restoring the original BatchParams-based implementation.
This reverts the batch operation changes from PR 8081 (#8081) but only for files in the service/frontend directory, restoring the original BatchParams-based implementation.
This reverts the batch operation changes from PR 8081 (#8081) but only for files in the service/frontend directory, restoring the original BatchParams-based implementation. ## What changed? Revert #8081 frontend changes. ## Why? This change is not backwards compatible, in the future we can turn on the protobuf workflow ## How did you test it? - [x] built - [ ] run locally and tested manually - [x] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) ## Potential risks Minimal, we're reverting the breaking change w/ this PR --------- Co-authored-by: Roey Berman <[email protected]>
What changed?
Remove
BatchParamsto safely serialize proto -> json -> proto and add aBatchOperationproto definitionWhy?
Remove custom serialization of proto for json safety.
How did you test it?
Potential risks
Could break batching backwards compatibility but tests should catch this