Add standalone activity completion and failure support #8653
base: poll-component
Pull Request Overview
This pull request adds standalone activity completion and failure handling functionality to the codebase. It refactors existing timeout failure handling and consolidates retry logic into a common failure module.
- Added handling for standalone activity completion and failure via `RespondActivityTaskCompleted` and `RespondActivityTaskFailed` APIs
- Refactored retry checking logic from `service/history/workflow/retry.go` into a shared `common/failure` package
- Extended task tokens to include component references for standalone activities
Reviewed Changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| tests/standalone_activity_test.go | Added comprehensive tests for standalone activity completion and failure scenarios |
| service/matching/matching_engine.go | Fixed buildId naming convention and removed unnecessary blank line |
| service/history/workflow/retry_test.go | Updated tests to use the new common failure.IsRetryable function |
| service/history/workflow/retry.go | Removed local isRetryable function in favor of common/failure package |
| service/history/workflow/mutable_state_impl.go | Updated to use failure.IsRetryable from common package |
| service/history/history_engine_test.go | Improved test assertions using Len and Empty helpers |
| service/history/handler.go | Updated task token validation to support both workflow and standalone activities |
| service/history/chasm_engine.go | Fixed reference to use requestRef.NamespaceID instead of nested EntityKey |
| service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go | Added nil component ref parameter to task token creation |
| service/history/api/respondactivitytaskfailed/api.go | Added standalone activity failure handling via component ref |
| service/history/api/respondactivitytaskcompleted/api.go | Added standalone activity completion handling via component ref |
| service/history/api/recordactivitytaskstarted/api.go | Renamed function to follow Go naming convention for ID |
| service/frontend/workflow_handler.go | Added component ref handling for standalone activity completion/failure by ID |
| proto/internal/temporal/server/api/token/v1/message.proto | Added component_ref field to Task message |
| common/tasktoken/token.go | Added componentRef parameter to NewActivityTaskToken |
| common/searchattribute/encode_test.go | Improved test assertions using assert.Empty |
| common/persistence/visibility/store/sql/query_converter_test.go | Simplified string formatting |
| common/persistence/visibility/store/query/interceptors_test.go | Changed assertions to use JSONEq for better comparison |
| common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go | Changed assertion to use JSONEq for JSON comparison |
| common/failure/failure.go | Added IsRetryable function for shared retry logic |
| chasm/lib/activity/statemachine_test.go | Added tests for TransitionCompleted and TransitionFailed |
| chasm/lib/activity/statemachine.go | Refactored transition handling to use failure parameter and added completion/failure logic |
| chasm/lib/activity/activity_tasks.go | Updated to pass failure parameter to TransitionRescheduled |
| chasm/lib/activity/activity.go | Added RecordActivityCompleted and HandleActivityFailed methods with retry logic |
| api/token/v1/message.pb.go | Generated code for component_ref field |
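The summary mentions a shared `failure.IsRetryable` function but does not show it. A minimal sketch of what such a check might look like follows. The `Failure` and `FailureKind` types, the signature, and the exact rules are assumptions made for illustration, not the actual code in `common/failure`.

```go
package main

import "fmt"

// FailureKind is a hypothetical, simplified stand-in for the failure
// variants carried by the real failure proto.
type FailureKind int

const (
	KindApplication FailureKind = iota
	KindTimeout
	KindCanceled
	KindTerminated
)

// Failure is a hypothetical flattened failure record used for illustration.
type Failure struct {
	Kind         FailureKind
	Type         string // application-defined error type
	NonRetryable bool   // explicitly marked non-retryable by the reporter
}

// IsRetryable reports whether another attempt may be scheduled for the
// given failure. The rules below are assumptions for this sketch:
// cancellation and termination are final, and an application failure is
// final when flagged or when its type is listed in nonRetryableTypes.
func IsRetryable(f *Failure, nonRetryableTypes []string) bool {
	if f == nil {
		return false // assumption: nothing to retry without a failure
	}
	switch f.Kind {
	case KindCanceled, KindTerminated:
		return false
	case KindApplication:
		if f.NonRetryable {
			return false
		}
		for _, t := range nonRetryableTypes {
			if t == f.Type {
				return false
			}
		}
		return true
	default:
		return true
	}
}

func main() {
	policy := []string{"FatalError"}
	fmt.Println(IsRetryable(&Failure{Kind: KindApplication, Type: "Transient"}, policy))
	fmt.Println(IsRetryable(&Failure{Kind: KindApplication, Type: "FatalError"}, policy))
}
```

Keeping the check free of workflow-specific state is what would let both the workflow mutable state and the standalone activity component call the same function.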
Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.
bergundy
left a comment
I didn't go over everything but generally this LGTM.
Pull Request Overview
Copilot reviewed 23 out of 23 changed files in this pull request and generated 4 comments.
    errWorkflowExecutionNotSet = serviceerror.NewInvalidArgument("WorkflowExecution not set on request.")
    errTaskQueueNotSet         = serviceerror.NewInvalidArgument("Task queue not set.")
    errWorkflowIDNotSet        = serviceerror.NewInvalidArgument("WorkflowId is not set on request.")
    errBusinessIDNotSet        = serviceerror.NewInvalidArgument("Business ID is not set on request.")
Copilot
AI
Nov 19, 2025
The error message "Business ID is not set on request." is unclear and potentially confusing. Since this is checking for either ComponentRef or WorkflowId (as seen in validateTaskToken), a more descriptive message would be "WorkflowId or ComponentRef must be set on request." to better indicate what is expected.
Suggested change:
    - errBusinessIDNotSet = serviceerror.NewInvalidArgument("Business ID is not set on request.")
    + errBusinessIDNotSet = serviceerror.NewInvalidArgument("WorkflowId or ComponentRef must be set on request.")
This should be an internal error IMHO; if it happens, it's because the frontend has a bug. But let's not change the semantics here without a broader discussion.
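For readers following this thread, the either-or check being discussed can be sketched roughly as below. The `TaskToken` struct, its field names and encoding, and the body of `validateTaskToken` are assumptions for illustration. The real token is a generated proto message, and the real error is built with `serviceerror.NewInvalidArgument` rather than `errors.New`.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskToken is a hypothetical, trimmed-down view of the task token.
type TaskToken struct {
	WorkflowID   string // set for workflow-owned activities
	ComponentRef []byte // set for standalone activities (assumed encoding)
}

// errBusinessIDNotSet mirrors the message quoted in the review; errors.New
// is a stand-in for the service error constructor used in the real code.
var errBusinessIDNotSet = errors.New("Business ID is not set on request.")

// validateTaskToken accepts a token that identifies either a workflow or a
// standalone activity component, and rejects one that identifies neither.
func validateTaskToken(t *TaskToken) error {
	if t == nil || (t.WorkflowID == "" && len(t.ComponentRef) == 0) {
		return errBusinessIDNotSet
	}
	return nil
}

// isStandalone reports whether the token carries a component reference,
// which in this sketch is what routes a request to the standalone path.
func isStandalone(t *TaskToken) bool {
	return t != nil && len(t.ComponentRef) > 0
}

func main() {
	fmt.Println(validateTaskToken(&TaskToken{}))
	fmt.Println(validateTaskToken(&TaskToken{WorkflowID: "wf-1"}))
	fmt.Println(isStandalone(&TaskToken{ComponentRef: []byte("ref")}))
}
```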
chasm/lib/activity/activity.go
    }),
    Outcome:       chasm.NewDataField(ctx, &activitypb.ActivityOutcome{}),
    Visibility:    chasm.NewComponentField(ctx, visibility),
    LastHeartbeat: chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{}),
IIUC, if you don't create this component by default, it will reduce load on the DB by not creating the cell for this node. @yycptt correct me if I'm wrong.
Ah, nice optimization. I've made a helper method that will get and create it lazily.
cc: @dandavison
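The lazy get-or-create helper mentioned in this reply is not shown in the thread. A minimal sketch of the general pattern, using hypothetical `Activity` and `HeartbeatState` types rather than the real chasm field API, might look like this:

```go
package main

import "fmt"

// HeartbeatState is a hypothetical stand-in for the heartbeat state message.
type HeartbeatState struct {
	Details string
}

// Activity is a hypothetical component. lastHeartbeat stays nil until the
// first heartbeat arrives, so nothing is stored for activities that never
// heartbeat.
type Activity struct {
	lastHeartbeat *HeartbeatState
}

// getOrCreateLastHeartbeat returns the heartbeat state, allocating it on
// first use instead of at construction time.
func (a *Activity) getOrCreateLastHeartbeat() *HeartbeatState {
	if a.lastHeartbeat == nil {
		a.lastHeartbeat = &HeartbeatState{}
	}
	return a.lastHeartbeat
}

func main() {
	a := &Activity{}
	fmt.Println(a.lastHeartbeat == nil) // no state allocated yet
	a.getOrCreateLastHeartbeat().Details = "halfway"
	fmt.Println(a.lastHeartbeat.Details)
}
```

Readers of the field would still need to handle the nil case, since only the write path creates it.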
    startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue)
    runID := startResp.RunId

    pollTaskResp := s.pollActivityTaskAndValidate(ctx, t, activityID, taskQueue, runID)

    _, err := s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{
        Namespace: s.Namespace().String(),
        TaskToken: pollTaskResp.TaskToken,
        Result:    defaultResult,
        Identity:  "new-worker",
    })
    require.NoError(t, err)

    s.validateCompletion(ctx, t, activityID, runID, "new-worker")
I am not a fan of these test helpers that hide a lot of the logic and make it hard to follow exactly what is happening. Personally, I am on team repeat yourself in tests.
I developed this notion independently and finally took a stronger stance after watching https://www.youtube.com/watch?v=8hQG7QlcLBk.
That being said, I don't think everyone shares this stance, and I don't think it would be fair to block the PR because tests aren't written in this specific style.
I see the point and thanks for sharing the vid. I think there's a balance between the two.
"Limit helpers to very reused logic that doesn't fail often or fails all at once" - I like that philosophy.
Let me run it by the team at the next standup, then we can lock it down.
bergundy
left a comment
I'm still seeing changes related to @dandavison's PollComponent implementation but I trust that the two of you will iron things out. I didn't have any more blocking comments.
    type ActivityStore interface {
        // PopulateRecordActivityTaskStartedResponse populates the response for RecordActivityTaskStarted
        PopulateRecordActivityTaskStartedResponse(ctx chasm.Context, key chasm.EntityKey, response *historyservice.RecordActivityTaskStartedResponse) error
Not blocking but you will want Activity in both of these method names because when the workflow component implements these APIs it will be important to qualify.
Don't worry about this though, we are going to want to rewrite all of this eventually anyways.
Noted.
        StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout),
        Status:              activitypb.ACTIVITY_EXECUTION_STATUS_STARTED,
    },
    Attempt: chasm.NewDataField(ctx, attemptState),
This field should be called LastAttempt IMHO.
I feel it is implicitly understood in the way it's used here as there's only a single attemptstate here and attemptstate is not used in any other way elsewhere. So, unless you feel strongly about it I'd prefer to keep it simple as Attempt.
What changed?
Added standalone activity completion and failure handling. Refactored the existing timeout failure handling and the existing retryability check.
Why?
Needed to support full operation of standalone activities.
How did you test it?