Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions host/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,7 @@ func (s *integrationSuite) TestActivityHeartbeatTimeouts() {
}
}

func (s *integrationSuite) TestActivityCancellation() {
func (s *integrationSuite) TestTryActivityCancellationFromWorkflow() {
id := "integration-activity-cancellation-test"
wt := "integration-activity-cancellation-test-type"
tl := "integration-activity-cancellation-test-taskqueue"
Expand Down Expand Up @@ -1175,7 +1175,7 @@ func (s *integrationSuite) TestActivityCancellation() {
}}, nil
}

activityExecutedCount := 0
activityCanceled := false
atHandler := func(execution *commonpb.WorkflowExecution, activityType *commonpb.ActivityType,
activityID string, input *commonpb.Payloads, taskToken []byte) (*commonpb.Payloads, bool, error) {
s.Equal(id, execution.GetWorkflowId())
Expand All @@ -1186,12 +1186,12 @@ func (s *integrationSuite) TestActivityCancellation() {
&workflowservice.RecordActivityTaskHeartbeatRequest{
TaskToken: taskToken, Details: payloads.EncodeString("details")})
if response != nil && response.CancelRequested {
activityCanceled = true
return payloads.EncodeString("Activity Cancelled"), true, nil
}
s.NoError(err)
time.Sleep(10 * time.Millisecond)
}
activityExecutedCount++
return payloads.EncodeString("Activity Result"), false, nil
}

Expand All @@ -1210,21 +1210,36 @@ func (s *integrationSuite) TestActivityCancellation() {
s.True(err == nil || err == matching.ErrNoTasks, err)

cancelCh := make(chan struct{})

go func() {
s.Logger.Info("Trying to cancel the task in a different thread")
// Send signal so that worker can send an activity cancel
_, err1 := s.engine.SignalWorkflowExecution(NewContext(), &workflowservice.SignalWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
SignalName: "my signal",
Input: nil,
Identity: identity,
})
s.NoError(err1)

scheduleActivity = false
requestCancellation = true
_, err := poller.PollAndProcessWorkflowTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks, err)
cancelCh <- struct{}{}
_, err2 := poller.PollAndProcessWorkflowTask(false, false)
s.NoError(err2)
close(cancelCh)
}()

s.Logger.Info("Start activity.")
err = poller.PollAndProcessActivityTask(false)
s.True(err == nil || err == matching.ErrNoTasks, err)

s.Logger.Info("Waiting for cancel to complete.", tag.WorkflowRunID(we.RunId))
<-cancelCh
s.Logger.Info("Waiting for workflow to complete", tag.WorkflowRunID(we.RunId))
s.True(activityCanceled, "Activity was not cancelled.")
s.Logger.Info("Activity cancelled.", tag.WorkflowRunID(we.RunId))
}

func (s *integrationSuite) TestActivityCancellationNotStarted() {
Expand Down
14 changes: 7 additions & 7 deletions host/cancelworkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,16 +600,14 @@ func (s *integrationSuite) TestImmediateChildCancellation_WorkflowTaskFailed() {

// Schedule and cancel child workflow in the same decision
childCancelled = true
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, 1)
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{StartChildWorkflowExecutionCommandAttributes: &commandpb.StartChildWorkflowExecutionCommandAttributes{
Namespace: s.namespace,
WorkflowId: childWorkflowID,
WorkflowType: &commonpb.WorkflowType{Name: "childTypeA"},
TaskQueue: &taskqueuepb.TaskQueue{Name: childTaskQueue},
Input: payloads.EncodeBytes(buf.Bytes()),
Input: payloads.EncodeBytes([]byte{1}),
}},
}, {
CommandType: enumspb.COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION,
Expand Down Expand Up @@ -649,9 +647,9 @@ func (s *integrationSuite) TestImmediateChildCancellation_WorkflowTaskFailed() {
return nil, errors.New("workflow task failed event not found due to previous bad commands")
}

taskFailure := workflowtaskFailedEvent.GetWorkflowTaskFailedEventAttributes().Failure
if taskFailure.GetMessage() != "Start and RequestCancel for child workflow is not allowed in same workflow task." {
return nil, errors.New("Unexpected workflow task failure")
taskFailure := workflowtaskFailedEvent.GetWorkflowTaskFailedEventAttributes().GetFailure()
if taskFailure.GetMessage() != "BadRequestCancelExternalWorkflowExecutionAttributes: Start and RequestCancel for child workflow is not allowed in same workflow task." {
return nil, errors.New("unexpected workflow task failure")
}

workflowComplete = true
Expand All @@ -675,7 +673,9 @@ func (s *integrationSuite) TestImmediateChildCancellation_WorkflowTaskFailed() {

s.Logger.Info("Process first workflow task which starts and request cancels child workflow")
_, err := poller.PollAndProcessWorkflowTask(false, false)
s.NoError(err)
s.Error(err)
s.IsType(&serviceerror.InvalidArgument{}, err)
s.Equal("BadRequestCancelExternalWorkflowExecutionAttributes: Start and RequestCancel for child workflow is not allowed in same workflow task.", err.Error())

s.printWorkflowHistory(s.namespace, &commonpb.WorkflowExecution{
WorkflowId: id,
Expand Down
5 changes: 4 additions & 1 deletion host/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -1090,7 +1091,9 @@ func (s *elasticsearchIntegrationSuite) TestUpsertWorkflowExecution_InvalidKey()
}

_, err := poller.PollAndProcessWorkflowTask(false, false)
s.NoError(err)
s.Error(err)
s.IsType(&serviceerror.InvalidArgument{}, err)
s.Equal("BadSearchAttributes: INVALIDKEY is not valid search attribute key", err.Error())

historyResponse, err := s.engine.GetWorkflowExecutionHistory(NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: s.namespace,
Expand Down
73 changes: 71 additions & 2 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,7 @@ func (s *integrationSuite) TestCronWorkflowTimeout() {
executions = append(executions, execution)
return []*commandpb.Command{
{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
CommandType: enumspb.COMMAND_TYPE_START_TIMER,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is 🤦. With new handler logic, I was able to catch it.


Attributes: &commandpb.Command_StartTimerCommandAttributes{StartTimerCommandAttributes: &commandpb.StartTimerCommandAttributes{
TimerId: "timer-id",
Expand Down Expand Up @@ -2653,7 +2653,9 @@ func (s *integrationSuite) TestNoTransientWorkflowTaskAfterFlushBufferedEvents()
// so it will fail and create a new workflow task
_, err := poller.PollAndProcessWorkflowTask(true, false)
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)
s.Error(err)
s.IsType(&serviceerror.InvalidArgument{}, err)
s.Equal("UnhandledCommand", err.Error())

// second workflow task, which will complete the workflow
// this expect the workflow task to have attempt == 1
Expand Down Expand Up @@ -3694,6 +3696,73 @@ func (s *integrationSuite) TestCancelTimer_CancelFiredAndBuffered() {
}
}

func (s *integrationSuite) TestRespondWorkflowTaskCompleted_ReturnsErrorIfInvalidArgument() {
id := "integration-respond-workflow-task-completed-test"
wt := "integration-respond-workflow-task-completed-test-type"
tq := "integration-respond-workflow-task-completed-test-taskqueue"
identity := "worker1"

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: &commonpb.WorkflowType{Name: wt},
TaskQueue: &taskqueuepb.TaskQueue{Name: tq},
Input: nil,
WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),
Identity: identity,
}

we0, err0 := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err0)
s.NotNil(we0)

wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {

return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_RECORD_MARKER,
Attributes: &commandpb.Command_RecordMarkerCommandAttributes{
RecordMarkerCommandAttributes: &commandpb.RecordMarkerCommandAttributes{
MarkerName: "", // Marker name is missing.
Details: nil,
Header: nil,
Failure: nil,
}},
}}, nil
}

poller := &TaskPoller{
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: &taskqueuepb.TaskQueue{Name: tq},
Identity: identity,
WorkflowTaskHandler: wtHandler,
ActivityTaskHandler: nil,
Logger: s.Logger,
T: s.T(),
}

_, err := poller.PollAndProcessWorkflowTask(false, false)
s.Error(err)
s.IsType(&serviceerror.InvalidArgument{}, err)
s.Equal("BadRecordMarkerAttributes: MarkerName is not set on command.", err.Error())

resp, err := s.engine.GetWorkflowExecutionHistory(NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: s.namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we0.GetRunId(),
},
})

s.NoError(err)
s.NotNil(resp)

// Last event is WORKFLOW_TASK_FAILED.
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED, resp.History.Events[len(resp.History.Events)-1].GetEventType())
}

// helper function for TestStartWithMemo and TestSignalWithStartWithMemo to reduce duplicate code
func (s *integrationSuite) startWithMemoHelper(startFn startFunc, id string, taskQueue *taskqueuepb.TaskQueue, memo *commonpb.Memo) {
identity := "worker1"
Expand Down
16 changes: 11 additions & 5 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
historypb "go.temporal.io/api/history/v1"
querypb "go.temporal.io/api/query/v1"
"go.temporal.io/api/serviceerror"
Expand All @@ -58,6 +57,7 @@ import (
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/failure"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -2546,8 +2546,7 @@ func (e *historyEngineImpl) failWorkflowTask(
context workflowExecutionContext,
scheduleID int64,
startedID int64,
cause enumspb.WorkflowTaskFailedCause,
failure *failurepb.Failure,
workflowTaskFailedErr *workflowTaskFailedError,
request *workflowservice.RespondWorkflowTaskCompletedRequest,
) (mutableState, error) {

Expand All @@ -2561,8 +2560,15 @@ func (e *historyEngineImpl) failWorkflowTask(
}

if _, err = mutableState.AddWorkflowTaskFailedEvent(
scheduleID, startedID, cause, failure, request.GetIdentity(), request.GetBinaryChecksum(), "", "", 0,
); err != nil {
scheduleID,
startedID,
workflowTaskFailedErr.failedCause,
failure.NewServerFailure(workflowTaskFailedErr.Error(), true),
request.GetIdentity(),
request.GetBinaryChecksum(),
"",
"",
0); err != nil {
return nil, err
}

Expand Down
Loading