diff --git a/host/sizelimit_test.go b/host/sizelimit_test.go index 3d3fb19016e..53a8ef848a2 100644 --- a/host/sizelimit_test.go +++ b/host/sizelimit_test.go @@ -36,6 +36,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/api/serviceerror" commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" @@ -153,19 +154,58 @@ func (s *sizeLimitIntegrationSuite) TestTerminateWorkflowCausedBySizeLimit() { } for i := int32(0); i < activityCount-1; i++ { - _, err := poller.PollAndProcessWorkflowTask(false, false) - s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + dwResp, err := s.engine.DescribeWorkflowExecution(NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: s.namespace, + Execution: &commonpb.WorkflowExecution{ + WorkflowId: id, + RunId: we.RunId, + }, + }) s.NoError(err) - err = poller.PollAndProcessActivityTask(false) - s.Logger.Info("PollAndProcessActivityTask", tag.Error(err)) - s.NoError(err) + // Poll and process a workflow task and an activity task only while the execution is still running + if dwResp.WorkflowExecutionInfo.Status == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING { + _, err := poller.PollAndProcessWorkflowTask(false, false) + s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + s.NoError(err) + + err = poller.PollAndProcessActivityTask(false) + s.Logger.Info("PollAndProcessActivityTask", tag.Error(err)) + s.NoError(err) + } } - // process this workflow task will trigger history exceed limit error - _, err := poller.PollAndProcessWorkflowTask(false, false) - s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) - s.NoError(err) + var signalErr error + // Send signals (at most 10) until one returns an error, which is expected once the workflow is force-terminated +SignalLoop: + for i := 0; i < 10; i++ { + // Signal by WorkflowId only; no RunId is set on the request + signalName := "another signal" + signalInput := payloads.EncodeString("another signal input") + _, signalErr = s.engine.SignalWorkflowExecution(NewContext(),
&workflowservice.SignalWorkflowExecutionRequest{ + Namespace: s.namespace, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: id, + }, + SignalName: signalName, + Input: signalInput, + Identity: identity, + }) + + if signalErr != nil { + break SignalLoop + } + } + // Signalling the workflow should force-terminate the workflow execution and return a ResourceExhausted + // error. ResourceExhausted is retried by the client, and the retry eventually returns a NotFound error to the + // caller because the workflow execution is no longer running. + s.EqualError(signalErr, "workflow execution already completed") + s.IsType(&serviceerror.NotFound{}, signalErr) + + s.printWorkflowHistory(s.namespace, &commonpb.WorkflowExecution{ + WorkflowId: id, + RunId: we.GetRunId(), + }) // verify last event is terminated event historyResponse, err := s.engine.GetWorkflowExecutionHistory(NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{ @@ -178,9 +218,7 @@ func (s *sizeLimitIntegrationSuite) TestTerminateWorkflowCausedBySizeLimit() { s.NoError(err) history := historyResponse.History lastEvent := history.Events[len(history.Events)-1] - s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, lastEvent.GetEventType()) - failedEventAttributes := lastEvent.GetWorkflowExecutionFailedEventAttributes() - s.True(failedEventAttributes.GetFailure().GetServerFailureInfo().GetNonRetryable()) + s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED, lastEvent.GetEventType()) // verify visibility is correctly processed from open to close isCloseCorrect := false diff --git a/host/taskpoller.go b/host/taskpoller.go index c229e75f030..e113a22e7ae 100644 --- a/host/taskpoller.go +++ b/host/taskpoller.go @@ -152,6 +152,10 @@ Loop: Identity: p.Identity, }) + if common.IsServiceNonRetryableError(err1) { + return false, nil, err1 + } + if err1 == history.ErrDuplicate { p.Logger.Info("Duplicate Workflow task: Polling again") continue Loop diff --git
a/service/history/commandChecker.go b/service/history/commandChecker.go index df131112375..5f80d321733 100644 --- a/service/history/commandChecker.go +++ b/service/history/commandChecker.go @@ -164,43 +164,6 @@ func (c *workflowSizeChecker) failWorkflowIfPayloadSizeExceedsLimit( return true, nil } -func (c *workflowSizeChecker) failWorkflowSizeExceedsLimit() (bool, error) { - historyCount := int(c.mutableState.GetNextEventID()) - 1 - historySize := int(c.executionStats.HistorySize) - - if historySize > c.historySizeLimitError || historyCount > c.historyCountLimitError { - executionInfo := c.mutableState.GetExecutionInfo() - c.logger.Error("history size exceeds error limit.", - tag.WorkflowNamespaceID(executionInfo.NamespaceId), - tag.WorkflowID(executionInfo.WorkflowId), - tag.WorkflowRunID(executionInfo.ExecutionState.RunId), - tag.WorkflowHistorySize(historySize), - tag.WorkflowEventCount(historyCount)) - - attributes := &commandpb.FailWorkflowExecutionCommandAttributes{ - Failure: failure.NewServerFailure(common.FailureReasonSizeExceedsLimit, true), - } - - if _, err := c.mutableState.AddFailWorkflowEvent(c.completedID, enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE, attributes); err != nil { - return false, err - } - return true, nil - } - - if historySize > c.historySizeLimitWarn || historyCount > c.historyCountLimitWarn { - executionInfo := c.mutableState.GetExecutionInfo() - c.logger.Warn("history size exceeds warn limit.", - tag.WorkflowNamespaceID(executionInfo.NamespaceId), - tag.WorkflowID(executionInfo.WorkflowId), - tag.WorkflowRunID(executionInfo.ExecutionState.RunId), - tag.WorkflowHistorySize(historySize), - tag.WorkflowEventCount(historyCount)) - return false, nil - } - - return false, nil -} - func (v *commandAttrValidator) validateActivityScheduleAttributes( namespaceID string, targetNamespaceID string, diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index c2337ceb1f4..bbd81375cbf 100644 --- 
a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -196,6 +196,8 @@ var ( ErrConsistentQueryBufferExceeded = serviceerror.NewInternal("consistent query buffer is full, cannot accept new consistent queries") // ErrEmptyHistoryRawEventBatch indicate that one single batch of history raw events is of size 0 ErrEmptyHistoryRawEventBatch = serviceerror.NewInvalidArgument("encounter empty history batch") + // ErrSizeExceedsLimit is the ResourceExhausted error indicating that a workflow execution has exceeded a system-defined limit + ErrSizeExceedsLimit = serviceerror.NewResourceExhausted(common.FailureReasonSizeExceedsLimit) // FailedWorkflowStatuses is a set of failed workflow close states, used for start workflow policy // for start workflow execution API diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index 0d41af9e2c0..7db73ac21af 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -138,6 +138,7 @@ type ( logger log.Logger metricsClient metrics.Client timeSource clock.TimeSource + config *Config mutex locks.Mutex mutableState mutableState @@ -168,6 +169,7 @@ func newWorkflowExecutionContext( logger: logger, metricsClient: shard.GetMetricsClient(), timeSource: shard.GetTimeSource(), + config: shard.GetConfig(), mutex: locks.NewMutex(), stats: &persistenceblobs.ExecutionStats{ HistorySize: 0, @@ -572,14 +574,31 @@ func (c *workflowExecutionContextImpl) updateWorkflowExecutionAsActive( now time.Time, ) error { - return c.updateWorkflowExecutionWithNew( + // The size check is only performed on the active cluster for the namespace + forceTerminate, err := c.enforceSizeCheck() + if err != nil { + return err + } + + if err := c.updateWorkflowExecutionWithNew( now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, transactionPolicyActive, nil, - ) + ); err != nil { + return err + } + + if forceTerminate { + // Return a ResourceExhausted error to the caller after the workflow
execution has been force-terminated. + // Retrying the operation then yields the semantics that operation should expect when the workflow + // execution is already closed. + return ErrSizeExceedsLimit + } + + return nil } func (c *workflowExecutionContextImpl) updateWorkflowExecutionWithNewAsActive( @@ -1146,3 +1165,62 @@ func (c *workflowExecutionContextImpl) reapplyEvents( return err } + +// enforceSizeCheck returns true if the workflow state was overwritten to force-terminate the execution for exceeding the history size or count error limit +func (c *workflowExecutionContextImpl) enforceSizeCheck() (bool, error) { + historySizeLimitWarn := c.config.HistorySizeLimitWarn(c.getNamespace()) + historySizeLimitError := c.config.HistorySizeLimitError(c.getNamespace()) + historyCountLimitWarn := c.config.HistoryCountLimitWarn(c.getNamespace()) + historyCountLimitError := c.config.HistoryCountLimitError(c.getNamespace()) + + historySize := int(c.stats.HistorySize) + historyCount := int(c.mutableState.GetNextEventID() - 1) + + // Force-terminate the workflow if it is still running and has breached the history size or count error limit + if (historySize > historySizeLimitError || historyCount > historyCountLimitError) && + c.mutableState.IsWorkflowExecutionRunning() { + c.logger.Error("history size exceeds error limit.", + tag.WorkflowNamespaceID(c.namespaceID), + tag.WorkflowID(c.workflowExecution.GetWorkflowId()), + tag.WorkflowRunID(c.workflowExecution.GetRunId()), + tag.WorkflowHistorySize(historySize), + tag.WorkflowEventCount(historyCount)) + + // Discard pending changes in mutableState so that the terminate state transition can be applied + c.clear() + + // Reload the mutable state that the termination will be applied to + mutableState, err := c.loadWorkflowExecution() + if err != nil { + return false, err + } + + // The termination is written as a separate batch and might produce more than one event, because the + // outstanding workflow task is closed before the workflow is terminated + eventBatchFirstEventID := mutableState.GetNextEventID() + if err := terminateWorkflow( + mutableState, + eventBatchFirstEventID, + common.FailureReasonSizeExceedsLimit, + nil,
identityHistoryService, + ); err != nil { + return false, err + } + + // Return true to tell the caller that the workflow state was overwritten to force-terminate the execution on update + return true, nil + } + + if historySize > historySizeLimitWarn || historyCount > historyCountLimitWarn { + executionInfo := c.mutableState.GetExecutionInfo() + c.logger.Warn("history size exceeds warn limit.", + tag.WorkflowNamespaceID(executionInfo.NamespaceId), + tag.WorkflowID(executionInfo.WorkflowId), + tag.WorkflowRunID(executionInfo.ExecutionState.RunId), + tag.WorkflowHistorySize(historySize), + tag.WorkflowEventCount(historyCount)) + } + + return false, nil +} diff --git a/service/history/workflowTaskHandler.go b/service/history/workflowTaskHandler.go index 77efe801196..2d581cfc685 100644 --- a/service/history/workflowTaskHandler.go +++ b/service/history/workflowTaskHandler.go @@ -124,15 +124,9 @@ func (handler *workflowTaskHandlerImpl) handleCommands( commands []*commandpb.Command, ) error { - // overall workflow size / count check - failWorkflow, err := handler.sizeLimitChecker.failWorkflowSizeExceedsLimit() - if err != nil || failWorkflow { - return err - } - for _, command := range commands { - err = handler.handleCommand(command) + err := handler.handleCommand(command) if err != nil || handler.stopProcessing { return err }