Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions host/cancelworkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ package host
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"strconv"
"time"

Expand Down Expand Up @@ -534,3 +536,172 @@ CheckHistoryLoopForCancelSent:

s.True(cancellationSentFailed)
}

func (s *integrationSuite) TestImmediateChildCancellation_WorkflowTaskFailed() {
id := "integration-immediate-child-cancellation-worflow-task-failed-test"
wt := "integration-immediate-child-cancellation-worflow-task-failed-test-type"
tl := "integration-immediate-child-cancellation-worflow-task-failed-test-taskqueue"
childWorkflowID := "integration-immediate-child-cancellation-worflow-task-failed-child-test"
childTaskQueue := "integration-immediate-child-cancellation-worflow-task-failed-child-test-taskqueue"
identity := "worker1"

workflowType := &commonpb.WorkflowType{Name: wt}

taskQueue := &taskqueuepb.TaskQueue{Name: tl}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
Input: nil,
WorkflowRunTimeoutSeconds: 100,
WorkflowTaskTimeoutSeconds: 1,
Identity: identity,
}
we, err0 := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err0)
s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

s.engine.RequestCancelWorkflowExecution(NewContext(), &workflowservice.RequestCancelWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
Identity: identity,
RequestId: uuid.New(),
})

childCancelled := false
var initiatedEvent *historypb.HistoryEvent
var requestCancelEvent *historypb.HistoryEvent
var workflowtaskFailedEvent *historypb.HistoryEvent
workflowComplete := false
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
if !childCancelled {
startEvent := history.Events[0]
if startEvent.EventType != enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED {
return nil, errors.New("first event is not workflow execution started")
}

workflowTaskScheduledEvent := history.Events[1]
if workflowTaskScheduledEvent.EventType != enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED {
return nil, errors.New("second event is not workflow task scheduled")
}

cancelRequestedEvent := history.Events[2]
if cancelRequestedEvent.EventType != enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED {
return nil, errors.New("third event is not cancel requested")
}

// Schedule and cancel child workflow in the same decision
childCancelled = true
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, 1)
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{StartChildWorkflowExecutionCommandAttributes: &commandpb.StartChildWorkflowExecutionCommandAttributes{
Namespace: s.namespace,
WorkflowId: childWorkflowID,
WorkflowType: &commonpb.WorkflowType{Name: "childTypeA"},
TaskQueue: &taskqueuepb.TaskQueue{Name: childTaskQueue},
Input: payloads.EncodeBytes(buf.Bytes()),
}},
}, {
CommandType: enumspb.COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_RequestCancelExternalWorkflowExecutionCommandAttributes{RequestCancelExternalWorkflowExecutionCommandAttributes: &commandpb.RequestCancelExternalWorkflowExecutionCommandAttributes{
Namespace: s.namespace,
WorkflowId: childWorkflowID,
ChildWorkflowOnly: true,
}},
}}, nil
}

if previousStartedEventID != 0 {
return nil, errors.New("previous started decision moved unexpectedly after first failed workflow task")
}
// Validate child workflow as cancelled
for _, event := range history.Events[previousStartedEventID:] {
s.Logger.Info(fmt.Sprintf("Processing EventID: %v, Event: %v", event.GetEventId(), event))
switch event.GetEventType() {
case enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED:
initiatedEvent = event
case enumspb.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
requestCancelEvent = event
case enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED:
workflowtaskFailedEvent = event
}
}

if initiatedEvent != nil {
return nil, errors.New("start child workflow command accepted from previous workflow task")
}

if requestCancelEvent != nil {
return nil, errors.New("request cancel command accepted from previous workflow task")
}

if workflowtaskFailedEvent == nil {
return nil, errors.New("workflow task failed event not found due to previous bad commands")
}

taskFailure := workflowtaskFailedEvent.GetWorkflowTaskFailedEventAttributes().Failure
if taskFailure.GetMessage() != "Start and RequestCancel for child workflow is not allowed in same workflow task." {
return nil, errors.New("Unexpected workflow task failure")
}

workflowComplete = true
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("Done"),
}},
}}, nil
}

poller := &TaskPoller{
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: taskQueue,
Identity: identity,
WorkflowTaskHandler: wtHandler,
Logger: s.Logger,
T: s.T(),
}

s.Logger.Info("Process first workflow task which starts and request cancels child workflow")
_, err := poller.PollAndProcessWorkflowTask(false, false)
s.NoError(err)

s.printWorkflowHistory(s.namespace, &commonpb.WorkflowExecution{
WorkflowId: id,
})

s.Logger.Info("Process second workflow task which observes child workflow is cancelled and completes")
_, err = poller.PollAndProcessWorkflowTask(false, false)
s.NoError(err)

s.printWorkflowHistory(s.namespace, &commonpb.WorkflowExecution{
WorkflowId: id,
})

_, err = s.engine.DescribeWorkflowExecution(NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: s.namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: childWorkflowID,
},
})
if err == nil {
s.printWorkflowHistory(s.namespace, &commonpb.WorkflowExecution{
WorkflowId: childWorkflowID,
})
}
s.Logger.Error("Describe error", tag.Error(err))
s.NotNil(err, "Child workflow execution started instead of getting cancelled")
s.IsType(&serviceerror.NotFound{}, err, "Error is not of type 'NotFound'")

s.True(workflowComplete)
}
4 changes: 4 additions & 0 deletions service/history/commandChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ func (v *commandAttrValidator) validateCancelWorkflowExecutionAttributes(
func (v *commandAttrValidator) validateCancelExternalWorkflowExecutionAttributes(
namespaceID string,
targetNamespaceID string,
initiatedChildExecutionsInSession map[string]struct{},
attributes *commandpb.RequestCancelExternalWorkflowExecutionCommandAttributes,
) error {

Expand All @@ -425,6 +426,9 @@ func (v *commandAttrValidator) validateCancelExternalWorkflowExecutionAttributes
if runID != "" && uuid.Parse(runID) == nil {
return serviceerror.NewInvalidArgument("Invalid RunId set on command.")
}
if _, ok := initiatedChildExecutionsInSession[attributes.GetWorkflowId()]; ok {
return serviceerror.NewInvalidArgument("Start and RequestCancel for child workflow is not allowed in same workflow task.")
}

return nil
}
Expand Down
32 changes: 20 additions & 12 deletions service/history/workflowTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ type (
namespaceEntry *cache.NamespaceCacheEntry

// internal state
hasBufferedEvents bool
failWorkflowTaskInfo *failWorkflowTaskInfo
activityNotStartedCancelled bool
continueAsNewBuilder mutableState
stopProcessing bool // should stop processing any more commands
mutableState mutableState
hasBufferedEvents bool
failWorkflowTaskInfo *failWorkflowTaskInfo
activityNotStartedCancelled bool
continueAsNewBuilder mutableState
stopProcessing bool // should stop processing any more commands
mutableState mutableState
initiatedChildExecutionsInBatch map[string]struct{} // Set of initiated child executions in the workflow task

// validation
attrValidator *commandAttrValidator
Expand Down Expand Up @@ -96,12 +97,13 @@ func newWorkflowTaskHandler(
namespaceEntry: namespaceEntry,

// internal state
hasBufferedEvents: mutableState.HasBufferedEvents(),
failWorkflowTaskInfo: nil,
activityNotStartedCancelled: false,
continueAsNewBuilder: nil,
stopProcessing: false,
mutableState: mutableState,
hasBufferedEvents: mutableState.HasBufferedEvents(),
failWorkflowTaskInfo: nil,
activityNotStartedCancelled: false,
continueAsNewBuilder: nil,
stopProcessing: false,
mutableState: mutableState,
initiatedChildExecutionsInBatch: make(map[string]struct{}),

// validation
attrValidator: attrValidator,
Expand Down Expand Up @@ -582,6 +584,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandRequestCancelExternalWorkfl
return handler.attrValidator.validateCancelExternalWorkflowExecutionAttributes(
namespaceID,
targetNamespaceID,
handler.initiatedChildExecutionsInBatch,
attr,
)
},
Expand All @@ -594,6 +597,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandRequestCancelExternalWorkfl
_, _, err := handler.mutableState.AddRequestCancelExternalWorkflowExecutionInitiatedEvent(
handler.workflowTaskCompletedID, cancelRequestID, attr,
)

return err
}

Expand Down Expand Up @@ -776,6 +780,10 @@ func (handler *workflowTaskHandlerImpl) handleCommandStartChildWorkflow(
_, _, err = handler.mutableState.AddStartChildWorkflowExecutionInitiatedEvent(
handler.workflowTaskCompletedID, requestID, attr,
)
if err == nil {
// Keep track of all child initiated commands in this workflow task to validate request cancel commands
handler.initiatedChildExecutionsInBatch[attr.GetWorkflowId()] = struct{}{}
}
return err
}

Expand Down