Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions chasm/lib/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,16 @@ func (a *Activity) HandleFailed(ctx chasm.MutableContext, req *historyservice.Re
return &historyservice.RespondActivityTaskFailedResponse{}, nil
}

// HandleCanceled updates the activity on activity canceled.
func (a *Activity) HandleCanceled(ctx chasm.MutableContext, request *historyservice.RespondActivityTaskCanceledRequest) (
*historyservice.RespondActivityTaskCanceledResponse, error) {
if err := TransitionCanceled.Apply(a, ctx, request); err != nil {
return nil, err
}

return &historyservice.RespondActivityTaskCanceledResponse{}, nil
}

func (a *Activity) handleTerminated(ctx chasm.MutableContext, req *activitypb.TerminateActivityExecutionRequest) (
*activitypb.TerminateActivityExecutionResponse, error) {
if err := TransitionTerminated.Apply(a, ctx, req); err != nil {
Expand All @@ -284,6 +294,28 @@ func (a *Activity) getLastHeartbeat(ctx chasm.MutableContext) (*activitypb.Activ
return heartbeat, nil
}

func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, req *activitypb.CancelActivityExecutionRequest) (
*activitypb.CancelActivityExecutionResponse, error) {
newReqID := req.GetFrontendRequest().GetRequestId()
existingReqID := a.GetCancelState().GetRequestId()

// If already in cancel requested state, fail if request ID is different, else no-op
if a.ActivityState.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED {
if existingReqID != newReqID {
return nil, serviceerror.NewFailedPrecondition(
fmt.Sprintf("cancellation already requested with request ID %s", existingReqID))
}

return &activitypb.CancelActivityExecutionResponse{}, nil
}

if err := TransitionCancelRequested.Apply(a, ctx, req); err != nil {
return nil, err
}

return &activitypb.CancelActivityExecutionResponse{}, nil
}

func (a *Activity) shouldRetryOnFailure(ctx chasm.Context, failure *failurepb.Failure) (bool, time.Duration, error) {
var isRetryable bool

Expand Down Expand Up @@ -370,6 +402,10 @@ func (a *Activity) recordFailedAttempt(
}

func (a *Activity) shouldRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration, error) {
if !TransitionRescheduled.Possible(a) {
return false, 0, nil
}

attempt, err := a.Attempt.Get(ctx)
if err != nil {
return false, 0, err
Expand Down Expand Up @@ -484,6 +520,7 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.Acti
ActivityId: key.BusinessID,
ActivityType: a.GetActivityType(),
Attempt: attempt.GetCount(),
CanceledReason: a.CancelState.GetReason(),
Header: requestData.GetHeader(),
HeartbeatDetails: heartbeat.GetDetails(),
LastAttemptCompleteTime: attempt.GetCompleteTime(),
Expand Down
27 changes: 27 additions & 0 deletions chasm/lib/activity/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,33 @@ func (h *frontendHandler) TerminateActivityExecution(
return &workflowservice.TerminateActivityExecutionResponse{}, nil
}

func (h *frontendHandler) RequestCancelActivityExecution(
ctx context.Context,
req *workflowservice.RequestCancelActivityExecutionRequest,
) (*workflowservice.RequestCancelActivityExecutionResponse, error) {
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
if err != nil {
return nil, err
}

// Since validation potentially mutates the request, we clone it first so that any retries use the original request.
req = common.CloneProto(req)
err = validateAndNormalizeRequestID(&req.RequestId, dynamicconfig.MaxIDLengthLimit.Get(h.dc)())
if err != nil {
return nil, err
}

_, err = h.client.CancelActivityExecution(ctx, &activitypb.CancelActivityExecutionRequest{
NamespaceId: namespaceID.String(),
FrontendRequest: req,
})
if err != nil {
return nil, err
}

return &workflowservice.RequestCancelActivityExecutionResponse{}, nil
}

func (h *frontendHandler) validateAndPopulateStartRequest(
req *workflowservice.StartActivityExecutionRequest,
namespaceID namespace.ID,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

144 changes: 120 additions & 24 deletions chasm/lib/activity/gen/activitypb/v1/request_response.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading