-
Notifications
You must be signed in to change notification settings - Fork 1.2k
chasm.PollComponent and PollActivityExecution #8563
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: standalone-activity
Are you sure you want to change the base?
Changes from 22 commits
f8ccafb
46eb18e
11cac65
11c6b59
b24a022
0db9858
c888969
5740320
1d27df4
1107a99
2ee2b75
ea12fcc
5c2c41e
c02c869
e1a52d4
24ef87c
5412f7e
612426f
7958efd
3904355
00ef0ff
da60095
6d5fc28
a604d80
6917450
27aafa7
c623906
e05af5a
6bd6d57
f325df8
099e6da
7b23898
b120e9c
7e67862
055044f
affc857
fa4e0ea
f6c5290
b540eb0
d1045de
f48a2b8
c98c8f9
18159b6
c4a0f1a
310ae97
4ca1d33
ccd63a4
7178c01
a42fb44
c0ed2df
5bd95d7
ecf4139
74e0a69
e904d87
5b8a8be
88e153d
a500aa0
23fe864
04a462d
e22f108
ff48584
a84784c
3485fb9
2f0b863
0fe23be
8a410b6
8e83fb0
d8d7721
65b83a0
8dbd866
f34f222
4146871
ddff1b5
dc248f4
1b46b23
c7f4ab0
270c13b
fbc77f3
e3d74a7
1f169fb
2f10ee2
0dda6cc
d3c709c
1e5ca51
36fb610
e5d900b
07cc8cf
b7e5c98
5cb1637
561b090
8087fa5
e4dd35d
1e43970
1ad72b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,14 +1,17 @@ | ||
| package activity | ||
|
|
||
| import ( | ||
| "errors" | ||
| "fmt" | ||
| "time" | ||
|
|
||
| "go.temporal.io/api/activity/v1" | ||
| commonpb "go.temporal.io/api/common/v1" | ||
| deploymentpb "go.temporal.io/api/deployment/v1" | ||
| enumspb "go.temporal.io/api/enums/v1" | ||
| failurepb "go.temporal.io/api/failure/v1" | ||
| historypb "go.temporal.io/api/history/v1" | ||
| "go.temporal.io/api/serviceerror" | ||
| "go.temporal.io/api/workflowservice/v1" | ||
| "go.temporal.io/server/api/historyservice/v1" | ||
| "go.temporal.io/server/api/matchingservice/v1" | ||
|
|
@@ -37,14 +40,12 @@ type Activity struct { | |
|
|
||
| *activitypb.ActivityState | ||
|
|
||
| // Standalone only | ||
| Visibility chasm.Field[*chasm.Visibility] | ||
| Attempt chasm.Field[*activitypb.ActivityAttemptState] | ||
| LastHeartbeat chasm.Field[*activitypb.ActivityHeartbeatState] | ||
| Outcome chasm.Field[*activitypb.ActivityOutcome] | ||
| // Standalone only | ||
| RequestData chasm.Field[*activitypb.ActivityRequestData] | ||
| Outcome chasm.Field[*activitypb.ActivityOutcome] | ||
|
|
||
| // Pointer to an implementation of the "store". for a workflow activity this would be a parent pointer back to | ||
| // the workflow. For a standalone activity this would be nil. | ||
| // TODO: revisit a standalone activity pointing to itself once we handle storing it more efficiently. | ||
|
|
@@ -314,3 +315,123 @@ func (a *Activity) RecordHeartbeat(ctx chasm.MutableContext, details *commonpb.P | |
| }) | ||
| return nil, nil | ||
| } | ||
|
|
||
| func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.ActivityExecutionInfo, error) { | ||
| if a.ActivityState == nil { | ||
| return nil, errors.New("activity state is nil") | ||
| } | ||
|
|
||
| // TODO(dan): support pause states | ||
| var status enumspb.ActivityExecutionStatus | ||
| var runState enumspb.PendingActivityState | ||
| switch a.GetStatus() { | ||
fretz12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| case activitypb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_SCHEDULED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_STARTED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_FAILED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_FAILED | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_TERMINATED | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED | ||
| default: | ||
| return nil, serviceerror.NewInternalf("unknown activity execution status: %s", a.GetStatus()) | ||
| } | ||
|
|
||
| requestData, err := a.RequestData.Get(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| key := ctx.ExecutionKey() | ||
|
|
||
| info := &activity.ActivityExecutionInfo{ | ||
| ActivityId: key.BusinessID, | ||
| RunId: key.EntityID, | ||
| ActivityType: a.GetActivityType(), | ||
| Status: status, | ||
| RunState: runState, | ||
| ScheduledTime: a.GetScheduledTime(), | ||
| Priority: a.GetPriority(), | ||
| Header: requestData.GetHeader(), | ||
| // TODO(dan): populate remaining fields | ||
| } | ||
|
|
||
| return info, nil | ||
| } | ||
|
|
||
| func (a *Activity) buildPollActivityExecutionResponse( | ||
| ctx chasm.Context, | ||
| req *activitypb.PollActivityExecutionRequest, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can just pass in the frontend request here and save the conversion and confusion between |
||
| ) (*activitypb.PollActivityExecutionResponse, error) { | ||
| request := req.GetFrontendRequest() | ||
|
|
||
| // TODO(dan): pass ref into this function? | ||
|
||
| token, err := ctx.Ref(a) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| var info *activity.ActivityExecutionInfo | ||
| if request.GetIncludeInfo() { | ||
| info, err = a.buildActivityExecutionInfo(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| } | ||
|
|
||
| var input *commonpb.Payloads | ||
| if request.GetIncludeInput() { | ||
| activityRequest, err := a.RequestData.Get(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| input = activityRequest.GetInput() | ||
| } | ||
|
|
||
| response := &workflowservice.PollActivityExecutionResponse{ | ||
| Info: info, | ||
| RunId: ctx.ExecutionKey().EntityID, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm... I see we have the run ID here too but here you used the right one. |
||
| Input: input, | ||
| StateChangeLongPollToken: token, | ||
| } | ||
|
|
||
| if request.GetIncludeOutcome() { | ||
| activityOutcome, err := a.Outcome.Get(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| switch v := activityOutcome.GetVariant().(type) { | ||
| case *activitypb.ActivityOutcome_Failed_: | ||
| response.Outcome = &workflowservice.PollActivityExecutionResponse_Failure{ | ||
| Failure: v.Failed.GetFailure(), | ||
|
||
| } | ||
| case *activitypb.ActivityOutcome_Successful_: | ||
| response.Outcome = &workflowservice.PollActivityExecutionResponse_Result{ | ||
| Result: v.Successful.GetOutput(), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return &activitypb.PollActivityExecutionResponse{ | ||
| FrontendResponse: response, | ||
| }, nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -85,6 +85,24 @@ func (h *frontendHandler) StartActivityExecution(ctx context.Context, req *workf | |
| return resp.GetFrontendResponse(), err | ||
| } | ||
|
|
||
| // PollActivityExecution handles PollActivityExecutionRequest. This method supports querying current | ||
| // activity state, optionally as a long-poll that waits for certain state changes. It is used by | ||
| // clients to poll for activity state and/or result. | ||
| func (h *frontendHandler) PollActivityExecution( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're missing validation here. At minimum, we'll need to validate that the activity ID is not empty and a check that run ID is provided if long poll token is provided.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there flag combinations that are invalid maybe? |
||
| ctx context.Context, | ||
| req *workflowservice.PollActivityExecutionRequest, | ||
| ) (*workflowservice.PollActivityExecutionResponse, error) { | ||
| namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace())) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| resp, err := h.client.PollActivityExecution(ctx, &activitypb.PollActivityExecutionRequest{ | ||
| NamespaceId: namespaceID.String(), | ||
| FrontendRequest: req, | ||
| }) | ||
| return resp.GetFrontendResponse(), err | ||
| } | ||
|
|
||
| func (h *frontendHandler) validateAndPopulateStartRequest( | ||
| req *workflowservice.StartActivityExecutionRequest, | ||
| namespaceID namespace.ID, | ||
|
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed
operationFnfromPollComponentsince we don't need poll-with-mutation yet.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This totally works for me. We can discuss how to introduce this when the requirement comes up.