diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index 05996ea60f1..5aa68a125f4 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -259,6 +259,16 @@ func (a *Activity) HandleFailed(ctx chasm.MutableContext, req *historyservice.Re return &historyservice.RespondActivityTaskFailedResponse{}, nil } +// HandleCanceled updates the activity on activity canceled. +func (a *Activity) HandleCanceled(ctx chasm.MutableContext, request *historyservice.RespondActivityTaskCanceledRequest) ( + *historyservice.RespondActivityTaskCanceledResponse, error) { + if err := TransitionCanceled.Apply(a, ctx, request); err != nil { + return nil, err + } + + return &historyservice.RespondActivityTaskCanceledResponse{}, nil +} + func (a *Activity) handleTerminated(ctx chasm.MutableContext, req *activitypb.TerminateActivityExecutionRequest) ( *activitypb.TerminateActivityExecutionResponse, error) { if err := TransitionTerminated.Apply(a, ctx, req); err != nil { @@ -284,6 +294,28 @@ func (a *Activity) getLastHeartbeat(ctx chasm.MutableContext) (*activitypb.Activ return heartbeat, nil } +func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, req *activitypb.CancelActivityExecutionRequest) ( + *activitypb.CancelActivityExecutionResponse, error) { + newReqID := req.GetFrontendRequest().GetRequestId() + existingReqID := a.GetCancelState().GetRequestId() + + // If already in cancel requested state, fail if request ID is different, else no-op + if a.ActivityState.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { + if existingReqID != newReqID { + return nil, serviceerror.NewFailedPrecondition( + fmt.Sprintf("cancellation already requested with request ID %s", existingReqID)) + } + + return &activitypb.CancelActivityExecutionResponse{}, nil + } + + if err := TransitionCancelRequested.Apply(a, ctx, req); err != nil { + return nil, err + } + + return &activitypb.CancelActivityExecutionResponse{}, nil +} + func (a *Activity) shouldRetryOnFailure(ctx chasm.Context, failure *failurepb.Failure) (bool, time.Duration, error) { var isRetryable bool @@ -370,6 +402,10 @@ func (a *Activity) recordFailedAttempt( } func (a *Activity) shouldRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration, error) { + if !TransitionRescheduled.Possible(a) { + return false, 0, nil + } + attempt, err := a.Attempt.Get(ctx) if err != nil { return false, 0, err @@ -484,6 +520,7 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.Acti ActivityId: key.BusinessID, ActivityType: a.GetActivityType(), Attempt: attempt.GetCount(), + CanceledReason: a.CancelState.GetReason(), Header: requestData.GetHeader(), HeartbeatDetails: heartbeat.GetDetails(), LastAttemptCompleteTime: attempt.GetCompleteTime(), diff --git a/chasm/lib/activity/frontend.go b/chasm/lib/activity/frontend.go index 05cf24cbc5b..9ed9b40b372 100644 --- a/chasm/lib/activity/frontend.go +++ b/chasm/lib/activity/frontend.go @@ -131,6 +131,33 @@ func (h *frontendHandler) TerminateActivityExecution( return &workflowservice.TerminateActivityExecutionResponse{}, nil } +func (h *frontendHandler) RequestCancelActivityExecution( + ctx context.Context, + req *workflowservice.RequestCancelActivityExecutionRequest, +) (*workflowservice.RequestCancelActivityExecutionResponse, error) { + namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace())) + if err != nil { + return nil, err + } + + // Since validation potentially mutates the request, we clone it first so that any retries use the original request. + req = common.CloneProto(req) + err = validateAndNormalizeRequestID(&req.RequestId, dynamicconfig.MaxIDLengthLimit.Get(h.dc)()) + if err != nil { + return nil, err + } + + _, err = h.client.CancelActivityExecution(ctx, &activitypb.CancelActivityExecutionRequest{ + NamespaceId: namespaceID.String(), + FrontendRequest: req, + }) + if err != nil { + return nil, err + } + + return &workflowservice.RequestCancelActivityExecutionResponse{}, nil +} + func (h *frontendHandler) validateAndPopulateStartRequest( req *workflowservice.StartActivityExecutionRequest, namespaceID namespace.ID, diff --git a/chasm/lib/activity/gen/activitypb/v1/request_response.go-helpers.pb.go b/chasm/lib/activity/gen/activitypb/v1/request_response.go-helpers.pb.go index a5bd2e57a32..0661ceaf0ed 100644 --- a/chasm/lib/activity/gen/activitypb/v1/request_response.go-helpers.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/request_response.go-helpers.pb.go @@ -226,3 +226,77 @@ func (this *TerminateActivityExecutionResponse) Equal(that interface{}) bool { return proto.Equal(this, that1) } + +// Marshal an object of type CancelActivityExecutionRequest to the protobuf v3 wire format +func (val *CancelActivityExecutionRequest) Marshal() ([]byte, error) { + return proto.Marshal(val) +} + +// Unmarshal an object of type CancelActivityExecutionRequest from the protobuf v3 wire format +func (val *CancelActivityExecutionRequest) Unmarshal(buf []byte) error { + return proto.Unmarshal(buf, val) +} + +// Size returns the size of the object, in bytes, once serialized +func (val *CancelActivityExecutionRequest) Size() int { + return proto.Size(val) +} + +// Equal returns whether two CancelActivityExecutionRequest values are equivalent by recursively +// comparing the message's fields. +// For more information see the documentation for +// https://pkg.go.dev/google.golang.org/protobuf/proto#Equal +func (this *CancelActivityExecutionRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + var that1 *CancelActivityExecutionRequest + switch t := that.(type) { + case *CancelActivityExecutionRequest: + that1 = t + case CancelActivityExecutionRequest: + that1 = &t + default: + return false + } + + return proto.Equal(this, that1) +} + +// Marshal an object of type CancelActivityExecutionResponse to the protobuf v3 wire format +func (val *CancelActivityExecutionResponse) Marshal() ([]byte, error) { + return proto.Marshal(val) +} + +// Unmarshal an object of type CancelActivityExecutionResponse from the protobuf v3 wire format +func (val *CancelActivityExecutionResponse) Unmarshal(buf []byte) error { + return proto.Unmarshal(buf, val) +} + +// Size returns the size of the object, in bytes, once serialized +func (val *CancelActivityExecutionResponse) Size() int { + return proto.Size(val) +} + +// Equal returns whether two CancelActivityExecutionResponse values are equivalent by recursively +// comparing the message's fields. +// For more information see the documentation for +// https://pkg.go.dev/google.golang.org/protobuf/proto#Equal +func (this *CancelActivityExecutionResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + var that1 *CancelActivityExecutionResponse + switch t := that.(type) { + case *CancelActivityExecutionResponse: + that1 = t + case CancelActivityExecutionResponse: + that1 = &t + default: + return false + } + + return proto.Equal(this, that1) +} diff --git a/chasm/lib/activity/gen/activitypb/v1/request_response.pb.go b/chasm/lib/activity/gen/activitypb/v1/request_response.pb.go index 13783b1144d..8603c4ee551 100644 --- a/chasm/lib/activity/gen/activitypb/v1/request_response.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/request_response.pb.go @@ -303,6 +303,94 @@ func (*TerminateActivityExecutionResponse) Descriptor() ([]byte, []int) { return file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_rawDescGZIP(), []int{5} } +type CancelActivityExecutionRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + NamespaceId string `protobuf:"bytes,1,opt,name=namespace_id,json=namespaceId,proto3" json:"namespace_id,omitempty"` + FrontendRequest *v1.RequestCancelActivityExecutionRequest `protobuf:"bytes,2,opt,name=frontend_request,json=frontendRequest,proto3" json:"frontend_request,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CancelActivityExecutionRequest) Reset() { + *x = CancelActivityExecutionRequest{} + mi := &file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CancelActivityExecutionRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CancelActivityExecutionRequest) ProtoMessage() {} + +func (x *CancelActivityExecutionRequest) ProtoReflect() protoreflect.Message { + mi := &file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CancelActivityExecutionRequest.ProtoReflect.Descriptor instead. +func (*CancelActivityExecutionRequest) Descriptor() ([]byte, []int) { + return file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_rawDescGZIP(), []int{6} +} + +func (x *CancelActivityExecutionRequest) GetNamespaceId() string { + if x != nil { + return x.NamespaceId + } + return "" +} + +func (x *CancelActivityExecutionRequest) GetFrontendRequest() *v1.RequestCancelActivityExecutionRequest { + if x != nil { + return x.FrontendRequest + } + return nil +} + +type CancelActivityExecutionResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CancelActivityExecutionResponse) Reset() { + *x = CancelActivityExecutionResponse{} + mi := &file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CancelActivityExecutionResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CancelActivityExecutionResponse) ProtoMessage() {} + +func (x *CancelActivityExecutionResponse) ProtoReflect() protoreflect.Message { + mi := &file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CancelActivityExecutionResponse.ProtoReflect.Descriptor instead. +func (*CancelActivityExecutionResponse) Descriptor() ([]byte, []int) { + return file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_rawDescGZIP(), []int{7} +} + var File_temporal_server_chasm_lib_activity_proto_v1_request_response_proto protoreflect.FileDescriptor const file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_rawDesc = "" + @@ -321,7 +409,11 @@ const file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_ra "!TerminateActivityExecutionRequest\x12!\n" + "\fnamespace_id\x18\x01 \x01(\tR\vnamespaceId\x12m\n" + "\x10frontend_request\x18\x02 \x01(\v2B.temporal.api.workflowservice.v1.TerminateActivityExecutionRequestR\x0ffrontendRequest\"$\n" + - "\"TerminateActivityExecutionResponseBDZBgo.temporal.io/server/chasm/lib/activity/gen/activitypb;activitypbb\x06proto3" + "\"TerminateActivityExecutionResponse\"\xb6\x01\n" + + "\x1eCancelActivityExecutionRequest\x12!\n" + + "\fnamespace_id\x18\x01 \x01(\tR\vnamespaceId\x12q\n" + + "\x10frontend_request\x18\x02 \x01(\v2F.temporal.api.workflowservice.v1.RequestCancelActivityExecutionRequestR\x0ffrontendRequest\"!\n" + + "\x1fCancelActivityExecutionResponseBDZBgo.temporal.io/server/chasm/lib/activity/gen/activitypb;activitypbb\x06proto3" var ( file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_rawDescOnce sync.Once @@ -335,31 +427,35 @@ func file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_raw return file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_rawDescData } -var file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_goTypes = []any{ - (*StartActivityExecutionRequest)(nil), // 0: temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionRequest - (*StartActivityExecutionResponse)(nil), // 1: temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionResponse - (*PollActivityExecutionRequest)(nil), // 2: temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionRequest - (*PollActivityExecutionResponse)(nil), // 3: temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionResponse - (*TerminateActivityExecutionRequest)(nil), // 4: temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionRequest - (*TerminateActivityExecutionResponse)(nil), // 5: temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionResponse - (*v1.StartActivityExecutionRequest)(nil), // 6: temporal.api.workflowservice.v1.StartActivityExecutionRequest - (*v1.StartActivityExecutionResponse)(nil), // 7: temporal.api.workflowservice.v1.StartActivityExecutionResponse - (*v1.PollActivityExecutionRequest)(nil), // 8: temporal.api.workflowservice.v1.PollActivityExecutionRequest - (*v1.PollActivityExecutionResponse)(nil), // 9: temporal.api.workflowservice.v1.PollActivityExecutionResponse - (*v1.TerminateActivityExecutionRequest)(nil), // 10: temporal.api.workflowservice.v1.TerminateActivityExecutionRequest + (*StartActivityExecutionRequest)(nil), // 0: temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionRequest + (*StartActivityExecutionResponse)(nil), // 1: temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionResponse + (*PollActivityExecutionRequest)(nil), // 2: temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionRequest + (*PollActivityExecutionResponse)(nil), // 3: temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionResponse + (*TerminateActivityExecutionRequest)(nil), // 4: temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionRequest + (*TerminateActivityExecutionResponse)(nil), // 5: temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionResponse + (*CancelActivityExecutionRequest)(nil), // 6: temporal.server.chasm.lib.activity.proto.v1.CancelActivityExecutionRequest + (*CancelActivityExecutionResponse)(nil), // 7: temporal.server.chasm.lib.activity.proto.v1.CancelActivityExecutionResponse + (*v1.StartActivityExecutionRequest)(nil), // 8: temporal.api.workflowservice.v1.StartActivityExecutionRequest + (*v1.StartActivityExecutionResponse)(nil), // 9: temporal.api.workflowservice.v1.StartActivityExecutionResponse + (*v1.PollActivityExecutionRequest)(nil), // 10: temporal.api.workflowservice.v1.PollActivityExecutionRequest + (*v1.PollActivityExecutionResponse)(nil), // 11: temporal.api.workflowservice.v1.PollActivityExecutionResponse + (*v1.TerminateActivityExecutionRequest)(nil), // 12: temporal.api.workflowservice.v1.TerminateActivityExecutionRequest + (*v1.RequestCancelActivityExecutionRequest)(nil), // 13: temporal.api.workflowservice.v1.RequestCancelActivityExecutionRequest } var file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_depIdxs = []int32{ - 6, // 0: temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionRequest.frontend_request:type_name -> temporal.api.workflowservice.v1.StartActivityExecutionRequest - 7, // 1: temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionResponse.frontend_response:type_name -> temporal.api.workflowservice.v1.StartActivityExecutionResponse - 8, // 2: temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionRequest.frontend_request:type_name -> temporal.api.workflowservice.v1.PollActivityExecutionRequest - 9, // 3: temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionResponse.frontend_response:type_name -> temporal.api.workflowservice.v1.PollActivityExecutionResponse - 10, // 4: temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionRequest.frontend_request:type_name -> temporal.api.workflowservice.v1.TerminateActivityExecutionRequest - 5, // [5:5] is the sub-list for method output_type - 5, // [5:5] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 8, // 0: temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionRequest.frontend_request:type_name -> temporal.api.workflowservice.v1.StartActivityExecutionRequest + 9, // 1: temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionResponse.frontend_response:type_name -> temporal.api.workflowservice.v1.StartActivityExecutionResponse + 10, // 2: temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionRequest.frontend_request:type_name -> temporal.api.workflowservice.v1.PollActivityExecutionRequest + 11, // 3: temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionResponse.frontend_response:type_name -> temporal.api.workflowservice.v1.PollActivityExecutionResponse + 12, // 4: temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionRequest.frontend_request:type_name -> temporal.api.workflowservice.v1.TerminateActivityExecutionRequest + 13, // 5: temporal.server.chasm.lib.activity.proto.v1.CancelActivityExecutionRequest.frontend_request:type_name -> temporal.api.workflowservice.v1.RequestCancelActivityExecutionRequest + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_init() } @@ -373,7 +469,7 @@ func file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_ini GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_rawDesc), len(file_temporal_server_chasm_lib_activity_proto_v1_request_response_proto_rawDesc)), NumEnums: 0, - NumMessages: 6, + NumMessages: 8, NumExtensions: 0, NumServices: 0, }, diff --git a/chasm/lib/activity/gen/activitypb/v1/service.pb.go b/chasm/lib/activity/gen/activitypb/v1/service.pb.go index 8cb01592bfc..86eee004b21 100644 --- a/chasm/lib/activity/gen/activitypb/v1/service.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/service.pb.go @@ -26,29 +26,34 @@ var File_temporal_server_chasm_lib_activity_proto_v1_service_proto protoreflect. const file_temporal_server_chasm_lib_activity_proto_v1_service_proto_rawDesc = "" + "\n" + - "9temporal/server/chasm/lib/activity/proto/v1/service.proto\x12+temporal.server.chasm.lib.activity.proto.v1\x1aBtemporal/server/chasm/lib/activity/proto/v1/request_response.proto\x1a.temporal/server/api/routing/v1/extension.proto2\xa2\x05\n" + + "9temporal/server/chasm/lib/activity/proto/v1/service.proto\x12+temporal.server.chasm.lib.activity.proto.v1\x1aBtemporal/server/chasm/lib/activity/proto/v1/request_response.proto\x1a.temporal/server/api/routing/v1/extension.proto2\xfd\x06\n" + "\x0fActivityService\x12\xd5\x01\n" + "\x16StartActivityExecution\x12J.temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionRequest\x1aK.temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionResponse\"\"\x92\xc4\x03\x1e\x1a\x1cfrontend_request.activity_id\x12\xd2\x01\n" + "\x15PollActivityExecution\x12I.temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionRequest\x1aJ.temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionResponse\"\"\x92\xc4\x03\x1e\x1a\x1cfrontend_request.activity_id\x12\xe1\x01\n" + - "\x1aTerminateActivityExecution\x12N.temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionRequest\x1aO.temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionResponse\"\"\x92\xc4\x03\x1e\x1a\x1cfrontend_request.activity_idBDZBgo.temporal.io/server/chasm/lib/activity/gen/activitypb;activitypbb\x06proto3" + "\x1aTerminateActivityExecution\x12N.temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionRequest\x1aO.temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionResponse\"\"\x92\xc4\x03\x1e\x1a\x1cfrontend_request.activity_id\x12\xd8\x01\n" + + "\x17CancelActivityExecution\x12K.temporal.server.chasm.lib.activity.proto.v1.CancelActivityExecutionRequest\x1aL.temporal.server.chasm.lib.activity.proto.v1.CancelActivityExecutionResponse\"\"\x92\xc4\x03\x1e\x1a\x1cfrontend_request.activity_idBDZBgo.temporal.io/server/chasm/lib/activity/gen/activitypb;activitypbb\x06proto3" var file_temporal_server_chasm_lib_activity_proto_v1_service_proto_goTypes = []any{ (*StartActivityExecutionRequest)(nil), // 0: temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionRequest (*PollActivityExecutionRequest)(nil), // 1: temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionRequest (*TerminateActivityExecutionRequest)(nil), // 2: temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionRequest - (*StartActivityExecutionResponse)(nil), // 3: temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionResponse - (*PollActivityExecutionResponse)(nil), // 4: temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionResponse - (*TerminateActivityExecutionResponse)(nil), // 5: temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionResponse + (*CancelActivityExecutionRequest)(nil), // 3: temporal.server.chasm.lib.activity.proto.v1.CancelActivityExecutionRequest + (*StartActivityExecutionResponse)(nil), // 4: temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionResponse + (*PollActivityExecutionResponse)(nil), // 5: temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionResponse + (*TerminateActivityExecutionResponse)(nil), // 6: temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionResponse + (*CancelActivityExecutionResponse)(nil), // 7: temporal.server.chasm.lib.activity.proto.v1.CancelActivityExecutionResponse } var file_temporal_server_chasm_lib_activity_proto_v1_service_proto_depIdxs = []int32{ 0, // 0: temporal.server.chasm.lib.activity.proto.v1.ActivityService.StartActivityExecution:input_type -> temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionRequest 1, // 1: temporal.server.chasm.lib.activity.proto.v1.ActivityService.PollActivityExecution:input_type -> temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionRequest 2, // 2: temporal.server.chasm.lib.activity.proto.v1.ActivityService.TerminateActivityExecution:input_type -> temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionRequest - 3, // 3: temporal.server.chasm.lib.activity.proto.v1.ActivityService.StartActivityExecution:output_type -> temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionResponse - 4, // 4: temporal.server.chasm.lib.activity.proto.v1.ActivityService.PollActivityExecution:output_type -> temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionResponse - 5, // 5: temporal.server.chasm.lib.activity.proto.v1.ActivityService.TerminateActivityExecution:output_type -> temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionResponse - 3, // [3:6] is the sub-list for method output_type - 0, // [0:3] is the sub-list for method input_type + 3, // 3: temporal.server.chasm.lib.activity.proto.v1.ActivityService.CancelActivityExecution:input_type -> temporal.server.chasm.lib.activity.proto.v1.CancelActivityExecutionRequest + 4, // 4: temporal.server.chasm.lib.activity.proto.v1.ActivityService.StartActivityExecution:output_type -> temporal.server.chasm.lib.activity.proto.v1.StartActivityExecutionResponse + 5, // 5: temporal.server.chasm.lib.activity.proto.v1.ActivityService.PollActivityExecution:output_type -> temporal.server.chasm.lib.activity.proto.v1.PollActivityExecutionResponse + 6, // 6: temporal.server.chasm.lib.activity.proto.v1.ActivityService.TerminateActivityExecution:output_type -> temporal.server.chasm.lib.activity.proto.v1.TerminateActivityExecutionResponse + 7, // 7: temporal.server.chasm.lib.activity.proto.v1.ActivityService.CancelActivityExecution:output_type -> temporal.server.chasm.lib.activity.proto.v1.CancelActivityExecutionResponse + 4, // [4:8] is the sub-list for method output_type + 0, // [0:4] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/chasm/lib/activity/gen/activitypb/v1/service_client.pb.go b/chasm/lib/activity/gen/activitypb/v1/service_client.pb.go index 0ffce62f3f1..0280a10f902 100644 --- a/chasm/lib/activity/gen/activitypb/v1/service_client.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/service_client.pb.go @@ -187,3 +187,46 @@ func (c *ActivityServiceLayeredClient) TerminateActivityExecution( } return backoff.ThrottleRetryContextWithReturn(ctx, call, c.retryPolicy, common.IsServiceClientTransientError) } +func (c *ActivityServiceLayeredClient) callCancelActivityExecutionNoRetry( + ctx context.Context, + request *CancelActivityExecutionRequest, + opts ...grpc.CallOption, +) (*CancelActivityExecutionResponse, error) { + var response *CancelActivityExecutionResponse + var err error + startTime := time.Now().UTC() + // the caller is a namespace, hence the tag below. + caller := headers.GetCallerInfo(ctx).CallerName + metricsHandler := c.metricsHandler.WithTags( + metrics.OperationTag("ActivityService.CancelActivityExecution"), + metrics.NamespaceTag(caller), + metrics.ServiceRoleTag(metrics.HistoryRoleTagValue), + ) + metrics.ClientRequests.With(metricsHandler).Record(1) + defer func() { + if err != nil { + metrics.ClientFailures.With(metricsHandler).Record(1, metrics.ServiceErrorTypeTag(err)) + } + metrics.ClientLatency.With(metricsHandler).Record(time.Since(startTime)) + }() + shardID := common.WorkflowIDToHistoryShard(request.GetNamespaceId(), request.GetFrontendRequest().GetActivityId(), c.numShards) + op := func(ctx context.Context, client ActivityServiceClient) error { + var err error + ctx, cancel := context.WithTimeout(ctx, history.DefaultTimeout) + defer cancel() + response, err = client.CancelActivityExecution(ctx, request, opts...) + return err + } + err = c.redirector.Execute(ctx, shardID, op) + return response, err +} +func (c *ActivityServiceLayeredClient) CancelActivityExecution( + ctx context.Context, + request *CancelActivityExecutionRequest, + opts ...grpc.CallOption, +) (*CancelActivityExecutionResponse, error) { + call := func(ctx context.Context) (*CancelActivityExecutionResponse, error) { + return c.callCancelActivityExecutionNoRetry(ctx, request, opts...) + } + return backoff.ThrottleRetryContextWithReturn(ctx, call, c.retryPolicy, common.IsServiceClientTransientError) +} diff --git a/chasm/lib/activity/gen/activitypb/v1/service_grpc.pb.go b/chasm/lib/activity/gen/activitypb/v1/service_grpc.pb.go index cf2fb1ed291..badb22fbb35 100644 --- a/chasm/lib/activity/gen/activitypb/v1/service_grpc.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/service_grpc.pb.go @@ -23,6 +23,7 @@ const ( ActivityService_StartActivityExecution_FullMethodName = "/temporal.server.chasm.lib.activity.proto.v1.ActivityService/StartActivityExecution" ActivityService_PollActivityExecution_FullMethodName = "/temporal.server.chasm.lib.activity.proto.v1.ActivityService/PollActivityExecution" ActivityService_TerminateActivityExecution_FullMethodName = "/temporal.server.chasm.lib.activity.proto.v1.ActivityService/TerminateActivityExecution" + ActivityService_CancelActivityExecution_FullMethodName = "/temporal.server.chasm.lib.activity.proto.v1.ActivityService/CancelActivityExecution" ) // ActivityServiceClient is the client API for ActivityService service. @@ -32,6 +33,7 @@ type ActivityServiceClient interface { StartActivityExecution(ctx context.Context, in *StartActivityExecutionRequest, opts ...grpc.CallOption) (*StartActivityExecutionResponse, error) PollActivityExecution(ctx context.Context, in *PollActivityExecutionRequest, opts ...grpc.CallOption) (*PollActivityExecutionResponse, error) TerminateActivityExecution(ctx context.Context, in *TerminateActivityExecutionRequest, opts ...grpc.CallOption) (*TerminateActivityExecutionResponse, error) + CancelActivityExecution(ctx context.Context, in *CancelActivityExecutionRequest, opts ...grpc.CallOption) (*CancelActivityExecutionResponse, error) } type activityServiceClient struct { @@ -69,6 +71,15 @@ func (c *activityServiceClient) TerminateActivityExecution(ctx context.Context, return out, nil } +func (c *activityServiceClient) CancelActivityExecution(ctx context.Context, in *CancelActivityExecutionRequest, opts ...grpc.CallOption) (*CancelActivityExecutionResponse, error) { + out := new(CancelActivityExecutionResponse) + err := c.cc.Invoke(ctx, ActivityService_CancelActivityExecution_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // ActivityServiceServer is the server API for ActivityService service. // All implementations must embed UnimplementedActivityServiceServer // for forward compatibility @@ -76,6 +87,7 @@ type ActivityServiceServer interface { StartActivityExecution(context.Context, *StartActivityExecutionRequest) (*StartActivityExecutionResponse, error) PollActivityExecution(context.Context, *PollActivityExecutionRequest) (*PollActivityExecutionResponse, error) TerminateActivityExecution(context.Context, *TerminateActivityExecutionRequest) (*TerminateActivityExecutionResponse, error) + CancelActivityExecution(context.Context, *CancelActivityExecutionRequest) (*CancelActivityExecutionResponse, error) mustEmbedUnimplementedActivityServiceServer() } @@ -92,6 +104,9 @@ func (UnimplementedActivityServiceServer) PollActivityExecution(context.Context, func (UnimplementedActivityServiceServer) TerminateActivityExecution(context.Context, *TerminateActivityExecutionRequest) (*TerminateActivityExecutionResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method TerminateActivityExecution not implemented") } +func (UnimplementedActivityServiceServer) CancelActivityExecution(context.Context, *CancelActivityExecutionRequest) (*CancelActivityExecutionResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method CancelActivityExecution not implemented") +} func (UnimplementedActivityServiceServer) mustEmbedUnimplementedActivityServiceServer() {} // UnsafeActivityServiceServer may be embedded to opt out of forward compatibility for this service. @@ -159,6 +174,24 @@ func _ActivityService_TerminateActivityExecution_Handler(srv interface{}, ctx co return interceptor(ctx, in, info, handler) } +func _ActivityService_CancelActivityExecution_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CancelActivityExecutionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ActivityServiceServer).CancelActivityExecution(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ActivityService_CancelActivityExecution_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ActivityServiceServer).CancelActivityExecution(ctx, req.(*CancelActivityExecutionRequest)) + } + return interceptor(ctx, in, info, handler) +} + // ActivityService_ServiceDesc is the grpc.ServiceDesc for ActivityService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -178,6 +211,10 @@ var ActivityService_ServiceDesc = grpc.ServiceDesc{ MethodName: "TerminateActivityExecution", Handler: _ActivityService_TerminateActivityExecution_Handler, }, + { + MethodName: "CancelActivityExecution", + Handler: _ActivityService_CancelActivityExecution_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "temporal/server/chasm/lib/activity/proto/v1/service.proto", diff --git a/chasm/lib/activity/handler.go b/chasm/lib/activity/handler.go index 48a1ec1880b..6134abe204b 100644 --- a/chasm/lib/activity/handler.go +++ b/chasm/lib/activity/handler.go @@ -179,3 +179,30 @@ func (h *handler) TerminateActivityExecution( return response, nil } + +// CancelActivityExecution requests cancellation on a standalone activity execution +func (h *handler) CancelActivityExecution( + ctx context.Context, + req *activitypb.CancelActivityExecutionRequest, +) (response *activitypb.CancelActivityExecutionResponse, err error) { + frontendReq := req.GetFrontendRequest() + + ref := chasm.NewComponentRef[*Activity](chasm.EntityKey{ + NamespaceID: req.GetNamespaceId(), + BusinessID: frontendReq.GetActivityId(), + EntityID: frontendReq.GetRunId(), + }) + + response, _, err = chasm.UpdateComponent( + ctx, + ref, + (*Activity).handleCancellationRequested, + req, + ) + + if err != nil { + return nil, err + } + + return response, nil +} diff --git a/chasm/lib/activity/proto/v1/request_response.proto b/chasm/lib/activity/proto/v1/request_response.proto index 7e88da20ced..cb4ae2b6670 100644 --- a/chasm/lib/activity/proto/v1/request_response.proto +++ b/chasm/lib/activity/proto/v1/request_response.proto @@ -34,3 +34,12 @@ message TerminateActivityExecutionRequest { message TerminateActivityExecutionResponse { } + +message CancelActivityExecutionRequest { + string namespace_id = 1; + + temporal.api.workflowservice.v1.RequestCancelActivityExecutionRequest frontend_request = 2; +} + +message CancelActivityExecutionResponse { +} diff --git a/chasm/lib/activity/proto/v1/service.proto b/chasm/lib/activity/proto/v1/service.proto index ae052044567..bbda352e4a9 100644 --- a/chasm/lib/activity/proto/v1/service.proto +++ b/chasm/lib/activity/proto/v1/service.proto @@ -19,4 +19,8 @@ service ActivityService { rpc TerminateActivityExecution(TerminateActivityExecutionRequest) returns (TerminateActivityExecutionResponse) { option (temporal.server.api.routing.v1.routing).execution_id = "frontend_request.activity_id"; } + + rpc CancelActivityExecution(CancelActivityExecutionRequest) returns (CancelActivityExecutionResponse) { + option (temporal.server.api.routing.v1.routing).execution_id = "frontend_request.activity_id"; + } } diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index e9b40c8435f..2060c81c5f4 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -155,6 +155,7 @@ var TransitionStarted = chasm.NewTransition( var TransitionCompleted = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED, func(a *Activity, ctx chasm.MutableContext, request *historyservice.RespondActivityTaskCompletedRequest) error { @@ -196,6 +197,7 @@ var TransitionCompleted = chasm.NewTransition( var TransitionFailed = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_FAILED, func(a *Activity, ctx chasm.MutableContext, req *historyservice.RespondActivityTaskFailedRequest) error { @@ -236,6 +238,7 @@ var TransitionTerminated = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED, func(a *Activity, ctx chasm.MutableContext, req *activitypb.TerminateActivityExecutionRequest) error { @@ -279,11 +282,75 @@ var TransitionTerminated = chasm.NewTransition( }, ) +// TransitionCancelRequested affects a transition to CancelRequested status +var TransitionCancelRequested = chasm.NewTransition( + []activitypb.ActivityExecutionStatus{ + activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, + activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, // Allow idempotent transition + }, + activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, + func(a *Activity, ctx chasm.MutableContext, req *activitypb.CancelActivityExecutionRequest) error { + frontendReq := req.GetFrontendRequest() + + a.CancelState = &activitypb.ActivityCancelState{ + Identity: frontendReq.GetIdentity(), + RequestId: req.GetFrontendRequest().GetRequestId(), + Reason: frontendReq.GetReason(), + RequestTime: timestamppb.New(ctx.Now(a)), + } + + return nil + }, +) + +// TransitionCanceled affects a transition to Canceled status +var TransitionCanceled = chasm.NewTransition( + []activitypb.ActivityExecutionStatus{ + activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, + }, + activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED, + func(a *Activity, ctx chasm.MutableContext, req *historyservice.RespondActivityTaskCanceledRequest) error { + store, err := a.Store.Get(ctx) + if err != nil { + return err + } + + if store == nil { + store = a + } + + return store.RecordCompleted(ctx, func(ctx chasm.MutableContext) error { + outcome, err := a.Outcome.Get(ctx) + if err != nil { + return err + } + + failure := &failurepb.Failure{ + FailureInfo: &failurepb.Failure_CanceledFailureInfo{ + CanceledFailureInfo: &failurepb.CanceledFailureInfo{ + Details: req.GetCancelRequest().GetDetails(), + }, + }, + } + + outcome.Variant = &activitypb.ActivityOutcome_Failed_{ + Failed: &activitypb.ActivityOutcome_Failed{ + Failure: failure, + }, + } + + return nil + }) + }, +) + // TransitionTimedOut affects a transition to TimedOut status var TransitionTimedOut = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, func(a *Activity, ctx chasm.MutableContext, timeoutType enumspb.TimeoutType) error { diff --git a/chasm/lib/activity/statemachine_test.go b/chasm/lib/activity/statemachine_test.go index 88e20b7ea99..40dd668223c 100644 --- a/chasm/lib/activity/statemachine_test.go +++ b/chasm/lib/activity/statemachine_test.go @@ -455,3 +455,73 @@ func TestTransitionTerminated(t *testing.T) { } protorequire.ProtoEqual(t, expectedFailure, outcome.GetFailed().GetFailure()) } + +func TestTransitionCancelRequested(t *testing.T) { + ctx := &chasm.MockMutableContext{} + ctx.HandleNow = func(chasm.Component) time.Time { return defaultTime } + attemptState := &activitypb.ActivityAttemptState{Count: 1} + + activity := &Activity{ + ActivityState: &activitypb.ActivityState{ + RetryPolicy: defaultRetryPolicy, + ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), + ScheduleToStartTimeout: durationpb.New(defaultScheduleToStartTimeout), + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + Status: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + }, + Attempt: chasm.NewDataField(ctx, attemptState), + } + + err := TransitionCancelRequested.Apply(activity, ctx, &activitypb.CancelActivityExecutionRequest{ + FrontendRequest: &workflowservice.RequestCancelActivityExecutionRequest{ + RequestId: "cancel-request", + Reason: "Test Cancel Requested", + Identity: "worker", + }, + }) + require.NoError(t, err) + require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, activity.Status) + + cancelState := activity.CancelState + + require.Equal(t, "cancel-request", cancelState.GetRequestId()) + require.Equal(t, "worker", cancelState.GetIdentity()) + require.Equal(t, "Test Cancel Requested", cancelState.GetReason()) + require.NotNil(t, cancelState.GetRequestTime()) +} + +func TestTransitionCanceled(t *testing.T) { + ctx := &chasm.MockMutableContext{} + ctx.HandleNow = func(chasm.Component) time.Time { return defaultTime } + attemptState := &activitypb.ActivityAttemptState{Count: 1} + outcome := &activitypb.ActivityOutcome{} + + activity := &Activity{ + ActivityState: &activitypb.ActivityState{ + RetryPolicy: defaultRetryPolicy, + ScheduleToCloseTimeout: durationpb.New(defaultScheduleToCloseTimeout), + ScheduleToStartTimeout: durationpb.New(defaultScheduleToStartTimeout), + StartToCloseTimeout: durationpb.New(defaultStartToCloseTimeout), + Status: activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, + }, + Attempt: chasm.NewDataField(ctx, attemptState), + Outcome: chasm.NewDataField(ctx, outcome), + } + + err := TransitionCanceled.Apply(activity, ctx, &historyservice.RespondActivityTaskCanceledRequest{ + CancelRequest: &workflowservice.RespondActivityTaskCanceledRequest{ + Details: payloads.EncodeString("Details"), + }, + }) + require.NoError(t, err) + require.Equal(t, activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED, activity.Status) + + expectedFailure := &failurepb.Failure{ + FailureInfo: &failurepb.Failure_CanceledFailureInfo{ + CanceledFailureInfo: &failurepb.CanceledFailureInfo{ + Details: payloads.EncodeString("Details"), + }, + }, + } + protorequire.ProtoEqual(t, expectedFailure, outcome.GetFailed().GetFailure()) +} diff --git a/chasm/lib/activity/validator.go b/chasm/lib/activity/validator.go index 18427d9d3a6..275039d953e 100644 --- a/chasm/lib/activity/validator.go +++ b/chasm/lib/activity/validator.go @@ -181,7 +181,7 @@ func ValidateStandaloneActivity( saMapperProvider searchattribute.MapperProvider, saValidator *searchattribute.Validator, ) error { - if err := validateRequestID(requestID, maxIDLengthLimit); err != nil { + if err := validateAndNormalizeRequestID(requestID, maxIDLengthLimit); err != nil { return err } @@ -209,7 +209,7 @@ func ValidateStandaloneActivity( return nil } -func validateRequestID(requestID *string, maxIDLengthLimit int) error { +func validateAndNormalizeRequestID(requestID *string, maxIDLengthLimit int) error { if *requestID == "" { // For easy direct API use, we default the request ID here but expect all SDKs and other auto-retrying clients to set it *requestID = uuid.New().String() diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index d89dc0dcb1e..a0e30731b88 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -1830,9 +1830,6 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledById(ctx context.Context, runID := request.GetRunId() // runID is optional so can be empty activityID := request.GetActivityId() - if workflowID == "" { - return nil, errWorkflowIDNotSet - } if activityID == "" { return nil, errActivityIDNotSet } @@ -1840,6 +1837,22 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledById(ctx context.Context, return nil, errIdentityTooLong } + // If workflowID is empty, it means the activity is a standalone activity and we need to set the component ref + // TODO Need to add a dynamic config to enable standalone configs, and incorporate that into the check below + var componentRef []byte + if workflowID == "" { + ref := chasm.NewComponentRef[*activity.Activity](chasm.EntityKey{ + NamespaceID: namespaceID.String(), + BusinessID: activityID, + EntityID: runID, + }) + + componentRef, err = ref.Serialize(wh.registry) + if err != nil { + return nil, err + } + } + taskToken := tasktoken.NewActivityTaskToken( namespaceID.String(), workflowID, @@ -1851,7 +1864,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledById(ctx context.Context, nil, common.EmptyVersion, common.EmptyVersion, - nil, + componentRef, ) token, err := wh.tokenSerializer.Serialize(taskToken) if err != nil { diff --git a/service/history/api/respondactivitytaskcanceled/api.go b/service/history/api/respondactivitytaskcanceled/api.go index 32e7b2226d4..5eb2efa8c94 100644 --- a/service/history/api/respondactivitytaskcanceled/api.go +++ b/service/history/api/respondactivitytaskcanceled/api.go @@ -6,6 +6,8 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/chasm" + "go.temporal.io/server/chasm/lib/activity" "go.temporal.io/server/common" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/metrics" @@ -35,6 +37,23 @@ func Invoke( if err0 != nil { return nil, consts.ErrDeserializingToken } + + // Handle standalone activity if component ref is present in the token + if componentRef := token.GetComponentRef(); len(componentRef) > 0 { + response, _, err := chasm.UpdateComponent( + ctx, + componentRef, + (*activity.Activity).HandleCanceled, + req, + ) + + if err != nil { + return nil, err + } + + return response, nil + } + if err := api.SetActivityTaskRunID(ctx, token, workflowConsistencyChecker); err != nil { return nil, err } diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index 8ac0b3e1b31..3c714ed74f5 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -17,6 +17,7 @@ import ( "go.temporal.io/api/workflowservice/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/payload" "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/testing/protorequire" "go.temporal.io/server/common/testing/testvars" @@ -375,6 +376,334 @@ func (s *standaloneActivityTestSuite) TestCompletedActivity_CannotTerminate() { require.Error(t, err) } +func (s *standaloneActivityTestSuite) TestActivityCancelled() { + t := s.T() + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + activityID := s.tv.ActivityID() + taskQueue := s.tv.TaskQueue().String() + + startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue) + runID := startResp.RunId + + pollTaskResp := s.pollActivityTaskAndValidate(ctx, t, activityID, taskQueue, runID) + + _, err := s.FrontendClient().RequestCancelActivityExecution(ctx, &workflowservice.RequestCancelActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: s.tv.ActivityID(), + RunId: runID, + Identity: "cancelling-worker", + RequestId: s.tv.RequestID(), + Reason: "Test Cancellation", + }) + require.NoError(t, err) + + // TODO: we should get the cancel request from heart beat once we implement it + + details := &commonpb.Payloads{ + Payloads: []*commonpb.Payload{ + payload.EncodeString("Canceled Details"), + }, + } + + _, err = s.FrontendClient().RespondActivityTaskCanceled(ctx, &workflowservice.RespondActivityTaskCanceledRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollTaskResp.TaskToken, + Details: details, + Identity: "new-worker", + }) + require.NoError(t, err) + + activityResp, err := s.FrontendClient().PollActivityExecution(ctx, &workflowservice.PollActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: runID, + IncludeInfo: true, + IncludeInput: true, + IncludeOutcome: true, + }) + require.NoError(t, err) + + info := activityResp.GetInfo() + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, info.GetStatus()) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED, info.GetRunState()) + require.Equal(t, "Test Cancellation", info.GetCanceledReason()) + protorequire.ProtoEqual(t, details, activityResp.GetFailure().GetCanceledFailureInfo().GetDetails()) +} + +func (s *standaloneActivityTestSuite) TestActivityCancelledByID() { + t := s.T() + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + activityID := s.tv.ActivityID() + taskQueue := s.tv.TaskQueue().String() + + startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue) + runID := startResp.RunId + + s.pollActivityTaskAndValidate(ctx, t, activityID, taskQueue, runID) + + _, err := s.FrontendClient().RequestCancelActivityExecution(ctx, &workflowservice.RequestCancelActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: s.tv.ActivityID(), + RunId: runID, + Identity: "cancelling-worker", + RequestId: s.tv.RequestID(), + Reason: "Test Cancellation", + }) + require.NoError(t, err) + + // TODO: we should get the cancel request from heart beat once we implement it + + details := &commonpb.Payloads{ + Payloads: []*commonpb.Payload{ + payload.EncodeString("Canceled Details"), + }, + } + + _, err = s.FrontendClient().RespondActivityTaskCanceledById(ctx, &workflowservice.RespondActivityTaskCanceledByIdRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: runID, + Details: details, + Identity: "new-worker", + }) + require.NoError(t, err) + + activityResp, err := s.FrontendClient().PollActivityExecution(ctx, &workflowservice.PollActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: runID, + IncludeInfo: true, + IncludeInput: true, + IncludeOutcome: true, + }) + require.NoError(t, err) + + info := activityResp.GetInfo() + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, info.GetStatus()) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED, info.GetRunState()) + require.Equal(t, "Test Cancellation", info.GetCanceledReason()) + protorequire.ProtoEqual(t, details, activityResp.GetFailure().GetCanceledFailureInfo().GetDetails()) +} + +func (s *standaloneActivityTestSuite) TestActivityCancelled_FailsIfNeverRequested() { + t := s.T() + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + activityID := s.tv.ActivityID() + taskQueue := s.tv.TaskQueue().String() + + startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue) + runID := startResp.RunId + + pollTaskResp := s.pollActivityTaskAndValidate(ctx, t, activityID, taskQueue, runID) + + details := &commonpb.Payloads{ + Payloads: []*commonpb.Payload{ + payload.EncodeString("Canceled Details"), + }, + } + + _, err := s.FrontendClient().RespondActivityTaskCanceled(ctx, &workflowservice.RespondActivityTaskCanceledRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollTaskResp.TaskToken, + Details: details, + Identity: "new-worker", + }) + require.Error(t, err) +} + +func (s *standaloneActivityTestSuite) TestActivityCancelled_DuplicateRequestIDSucceeds() { + t := s.T() + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + activityID := s.tv.ActivityID() + taskQueue := s.tv.TaskQueue().String() + + startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue) + runID := startResp.RunId + + pollTaskResp := s.pollActivityTaskAndValidate(ctx, t, activityID, taskQueue, runID) + + for i := 0; i < 2; i++ { + _, err := s.FrontendClient().RequestCancelActivityExecution(ctx, &workflowservice.RequestCancelActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: s.tv.ActivityID(), + RunId: runID, + Identity: "cancelling-worker", + RequestId: "cancel-request-id", + Reason: "Test Cancellation", + }) + require.NoError(t, err) + } + + // TODO: we should get the cancel request from heart beat once we implement it + + details := &commonpb.Payloads{ + Payloads: []*commonpb.Payload{ + payload.EncodeString("Canceled Details"), + }, + } + + _, err := s.FrontendClient().RespondActivityTaskCanceled(ctx, &workflowservice.RespondActivityTaskCanceledRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollTaskResp.TaskToken, + Details: details, + Identity: "new-worker", + }) + require.NoError(t, err) + + activityResp, err := s.FrontendClient().PollActivityExecution(ctx, &workflowservice.PollActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: runID, + IncludeInfo: true, + IncludeInput: true, + IncludeOutcome: true, + }) + require.NoError(t, err) + + info := activityResp.GetInfo() + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, info.GetStatus()) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED, info.GetRunState()) + require.Equal(t, "Test Cancellation", info.GetCanceledReason()) + protorequire.ProtoEqual(t, details, activityResp.GetFailure().GetCanceledFailureInfo().GetDetails()) +} + +func (s *standaloneActivityTestSuite) TestActivityCancelled_DifferentRequestIDFails() { + t := s.T() + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + activityID := s.tv.ActivityID() + taskQueue := s.tv.TaskQueue().String() + + startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue) + runID := startResp.RunId + + s.pollActivityTaskAndValidate(ctx, t, activityID, taskQueue, runID) + + _, err := s.FrontendClient().RequestCancelActivityExecution(ctx, &workflowservice.RequestCancelActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: s.tv.ActivityID(), + RunId: runID, + Identity: "cancelling-worker", + RequestId: "cancel-request-id", + Reason: "Test Cancellation", + }) + require.NoError(t, err) + + _, err = s.FrontendClient().RequestCancelActivityExecution(ctx, &workflowservice.RequestCancelActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: s.tv.ActivityID(), + RunId: runID, + Identity: "cancelling-worker", + RequestId: "different-cancel-request-id", + Reason: "Test Cancellation", + }) + require.Error(t, err) +} + +func (s *standaloneActivityTestSuite) TestActivityFinishes_AfterCancelRequested() { + testCases := []struct { + name string + taskCompletionFn func(context.Context, *testing.T, []byte, string, string) error + expectedStatus enumspb.ActivityExecutionStatus + expectedState enumspb.PendingActivityState + }{ + { + name: "finish with completion", + taskCompletionFn: func(ctx context.Context, t *testing.T, taskToken []byte, activityID string, runID string) error { + _, err := s.FrontendClient().RespondActivityTaskCompleted(ctx, &workflowservice.RespondActivityTaskCompletedRequest{ + Namespace: s.Namespace().String(), + TaskToken: taskToken, + Result: defaultResult, + }) + + return err + }, + expectedStatus: enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED, + expectedState: enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED, + }, + { + name: "finish with failure", + taskCompletionFn: func(ctx context.Context, t *testing.T, taskToken []byte, activityID string, runID string) error { + _, err := s.FrontendClient().RespondActivityTaskFailed(ctx, &workflowservice.RespondActivityTaskFailedRequest{ + Namespace: s.Namespace().String(), + TaskToken: taskToken, + Failure: defaultFailure, + }) + + return err + }, + expectedStatus: enumspb.ACTIVITY_EXECUTION_STATUS_FAILED, + expectedState: enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED, + }, + { + name: "finish with termination", + taskCompletionFn: func(ctx context.Context, t *testing.T, taskToken []byte, activityID string, runID string) error { + _, err := s.FrontendClient().TerminateActivityExecution(ctx, &workflowservice.TerminateActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: runID, + Reason: "Test Termination", + }) + + return err + }, + expectedStatus: enumspb.ACTIVITY_EXECUTION_STATUS_TERMINATED, + expectedState: enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED, + }, + } + + for _, tc := range testCases { + s.Run(tc.name, func() { + t := s.T() + + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + activityID := s.tv.ActivityID() + taskQueue := s.tv.TaskQueue().String() + + startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue) + runID := startResp.RunId + + pollTaskResp := s.pollActivityTaskAndValidate(ctx, t, activityID, taskQueue, runID) + + _, err := s.FrontendClient().RequestCancelActivityExecution(ctx, &workflowservice.RequestCancelActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: s.tv.ActivityID(), + RunId: runID, + Identity: "cancelling-worker", + RequestId: s.tv.RequestID(), + Reason: "Test Cancellation", + }) + require.NoError(t, err) + + err = tc.taskCompletionFn(ctx, t, pollTaskResp.GetTaskToken(), activityID, runID) + require.NoError(t, err) + + activityResp, err := s.FrontendClient().PollActivityExecution(ctx, &workflowservice.PollActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: runID, + IncludeInfo: true, + }) + require.NoError(t, err) + + info := activityResp.GetInfo() + require.Equal(t, tc.expectedStatus, info.GetStatus()) + require.Equal(t, tc.expectedState, info.GetRunState()) + }) + } +} + func (s *standaloneActivityTestSuite) Test_PollActivityExecution_NoWait() { t := s.T() ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)