Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 50 additions & 12 deletions host/sizelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -153,19 +154,58 @@ func (s *sizeLimitIntegrationSuite) TestTerminateWorkflowCausedBySizeLimit() {
}

for i := int32(0); i < activityCount-1; i++ {
_, err := poller.PollAndProcessWorkflowTask(false, false)
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
dwResp, err := s.engine.DescribeWorkflowExecution(NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: s.namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
})
s.NoError(err)

err = poller.PollAndProcessActivityTask(false)
s.Logger.Info("PollAndProcessActivityTask", tag.Error(err))
s.NoError(err)
// Poll workflow task only if it is running
if dwResp.WorkflowExecutionInfo.Status == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
_, err := poller.PollAndProcessWorkflowTask(false, false)
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)

err = poller.PollAndProcessActivityTask(false)
s.Logger.Info("PollAndProcessActivityTask", tag.Error(err))
s.NoError(err)
}
}

// Processing this workflow task will trigger the history-exceeds-limit error
_, err := poller.PollAndProcessWorkflowTask(false, false)
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)
var signalErr error
// Send signals until workflow is force terminated
SignalLoop:
for i := 0; i < 10; i++ {
// Send another signal without RunID
signalName := "another signal"
signalInput := payloads.EncodeString("another signal input")
_, signalErr = s.engine.SignalWorkflowExecution(NewContext(), &workflowservice.SignalWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: id,
},
SignalName: signalName,
Input: signalInput,
Identity: identity,
})

if signalErr != nil {
break SignalLoop
}
}
// Signalling the workflow should force terminate the workflow execution and return a ResourceExhausted
// error. ResourceExhausted is retried by the client, and the retry eventually returns a NotFound error to the
// caller because the workflow execution is no longer running.
s.EqualError(signalErr, "workflow execution already completed")
s.IsType(&serviceerror.NotFound{}, signalErr)

s.printWorkflowHistory(s.namespace, &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.GetRunId(),
})

// verify last event is terminated event
historyResponse, err := s.engine.GetWorkflowExecutionHistory(NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{
Expand All @@ -178,9 +218,7 @@ func (s *sizeLimitIntegrationSuite) TestTerminateWorkflowCausedBySizeLimit() {
s.NoError(err)
history := historyResponse.History
lastEvent := history.Events[len(history.Events)-1]
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, lastEvent.GetEventType())
failedEventAttributes := lastEvent.GetWorkflowExecutionFailedEventAttributes()
s.True(failedEventAttributes.GetFailure().GetServerFailureInfo().GetNonRetryable())
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED, lastEvent.GetEventType())

// verify visibility is correctly processed from open to close
isCloseCorrect := false
Expand Down
4 changes: 4 additions & 0 deletions host/taskpoller.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ Loop:
Identity: p.Identity,
})

if common.IsServiceNonRetryableError(err1) {
return false, nil, err1
}

if err1 == history.ErrDuplicate {
p.Logger.Info("Duplicate Workflow task: Polling again")
continue Loop
Expand Down
37 changes: 0 additions & 37 deletions service/history/commandChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,43 +164,6 @@ func (c *workflowSizeChecker) failWorkflowIfPayloadSizeExceedsLimit(
return true, nil
}

// failWorkflowSizeExceedsLimit compares the history size and event count of the workflow
// against the error and warn limits held by the checker.
//
// If either error limit is exceeded, an error is logged and a non-retryable fail-workflow
// event is added to mutable state; the method then returns true, or the error from adding
// that event. If only a warn limit is exceeded, a warning is logged and false is returned.
// Otherwise it returns false with no side effects.
func (c *workflowSizeChecker) failWorkflowSizeExceedsLimit() (bool, error) {
	// The event count is taken as the next event ID minus one.
	count := int(c.mutableState.GetNextEventID()) - 1
	size := int(c.executionStats.HistorySize)

	overErrorLimit := size > c.historySizeLimitError || count > c.historyCountLimitError
	overWarnLimit := size > c.historySizeLimitWarn || count > c.historyCountLimitWarn

	switch {
	case overErrorLimit:
		info := c.mutableState.GetExecutionInfo()
		c.logger.Error("history size exceeds error limit.",
			tag.WorkflowNamespaceID(info.NamespaceId),
			tag.WorkflowID(info.WorkflowId),
			tag.WorkflowRunID(info.ExecutionState.RunId),
			tag.WorkflowHistorySize(size),
			tag.WorkflowEventCount(count))

		failAttributes := &commandpb.FailWorkflowExecutionCommandAttributes{
			Failure: failure.NewServerFailure(common.FailureReasonSizeExceedsLimit, true),
		}

		_, err := c.mutableState.AddFailWorkflowEvent(
			c.completedID,
			enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE,
			failAttributes,
		)
		if err != nil {
			return false, err
		}
		return true, nil

	case overWarnLimit:
		info := c.mutableState.GetExecutionInfo()
		c.logger.Warn("history size exceeds warn limit.",
			tag.WorkflowNamespaceID(info.NamespaceId),
			tag.WorkflowID(info.WorkflowId),
			tag.WorkflowRunID(info.ExecutionState.RunId),
			tag.WorkflowHistorySize(size),
			tag.WorkflowEventCount(count))
	}

	return false, nil
}

func (v *commandAttrValidator) validateActivityScheduleAttributes(
namespaceID string,
targetNamespaceID string,
Expand Down
2 changes: 2 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ var (
ErrConsistentQueryBufferExceeded = serviceerror.NewInternal("consistent query buffer is full, cannot accept new consistent queries")
// ErrEmptyHistoryRawEventBatch indicate that one single batch of history raw events is of size 0
ErrEmptyHistoryRawEventBatch = serviceerror.NewInvalidArgument("encounter empty history batch")
// ErrSizeExceedsLimit indicates that a workflow execution has exceeded a system-defined limit
ErrSizeExceedsLimit = serviceerror.NewResourceExhausted(common.FailureReasonSizeExceedsLimit)

// FailedWorkflowStatuses is a set of failed workflow close states, used for start workflow policy
// for start workflow execution API
Expand Down
82 changes: 80 additions & 2 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type (
logger log.Logger
metricsClient metrics.Client
timeSource clock.TimeSource
config *Config

mutex locks.Mutex
mutableState mutableState
Expand Down Expand Up @@ -168,6 +169,7 @@ func newWorkflowExecutionContext(
logger: logger,
metricsClient: shard.GetMetricsClient(),
timeSource: shard.GetTimeSource(),
config: shard.GetConfig(),
mutex: locks.NewMutex(),
stats: &persistenceblobs.ExecutionStats{
HistorySize: 0,
Expand Down Expand Up @@ -572,14 +574,31 @@ func (c *workflowExecutionContextImpl) updateWorkflowExecutionAsActive(
now time.Time,
) error {

return c.updateWorkflowExecutionWithNew(
// We only perform this check on active cluster for the namespace
forceTerminate, err := c.enforceSizeCheck()
if err != nil {
return err
}

if err := c.updateWorkflowExecutionWithNew(
now,
persistence.UpdateWorkflowModeUpdateCurrent,
nil,
nil,
transactionPolicyActive,
nil,
)
); err != nil {
return err
}

if forceTerminate {
// Return a ResourceExhausted error to the caller after the workflow execution has been force terminated.
// Retrying the operation then yields the semantics the operation should expect when the workflow
// execution is already closed.
return ErrSizeExceedsLimit
}

return nil
}

func (c *workflowExecutionContextImpl) updateWorkflowExecutionWithNewAsActive(
Expand Down Expand Up @@ -1146,3 +1165,62 @@ func (c *workflowExecutionContextImpl) reapplyEvents(

return err
}

// enforceSizeCheck compares the history size and event count of this execution against the
// warn and error limits read from config for the namespace.
//
// When either error limit is exceeded and the workflow is still running, it clears the context,
// reloads mutable state and applies a terminate transition to the reloaded state with reason
// common.FailureReasonSizeExceedsLimit, then returns true.
// In every other case it returns false, logging a warning when either warn limit is exceeded.
// A non-nil error is returned only when reloading mutable state or applying the terminate
// transition fails.
//
// Returns true if execution is forced terminated
func (c *workflowExecutionContextImpl) enforceSizeCheck() (bool, error) {
// Each limit is looked up from config using the namespace of this execution.
historySizeLimitWarn := c.config.HistorySizeLimitWarn(c.getNamespace())
historySizeLimitError := c.config.HistorySizeLimitError(c.getNamespace())
historyCountLimitWarn := c.config.HistoryCountLimitWarn(c.getNamespace())
historyCountLimitError := c.config.HistoryCountLimitError(c.getNamespace())

// The event count is taken as the next event ID minus one.
historySize := int(c.stats.HistorySize)
historyCount := int(c.mutableState.GetNextEventID() - 1)

// Hard terminate workflow if still running and breached size or count limit
if (historySize > historySizeLimitError || historyCount > historyCountLimitError) &&
c.mutableState.IsWorkflowExecutionRunning() {
c.logger.Error("history size exceeds error limit.",
tag.WorkflowNamespaceID(c.namespaceID),
tag.WorkflowID(c.workflowExecution.GetWorkflowId()),
tag.WorkflowRunID(c.workflowExecution.GetRunId()),
tag.WorkflowHistorySize(historySize),
tag.WorkflowEventCount(historyCount))

// Discard pending changes in mutableState so we can apply terminate state transition
c.clear()

// Reload mutable state
mutableState, err := c.loadWorkflowExecution()
if err != nil {
return false, err
}

// Terminate workflow is written as a separate batch and might result in more than one event as we close the
// outstanding workflow task before terminating the workflow
eventBatchFirstEventID := mutableState.GetNextEventID()
if err := terminateWorkflow(
mutableState,
eventBatchFirstEventID,
common.FailureReasonSizeExceedsLimit,
nil,
identityHistoryService,
); err != nil {
return false, err
}

// Return true to caller to indicate workflow state is overwritten to force terminate execution on update
return true, nil
}

// This check is not gated on IsWorkflowExecutionRunning, so the warning is also logged for an
// execution that is over the error limit but no longer running.
if historySize > historySizeLimitWarn || historyCount > historyCountLimitWarn {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also do the following:

Separate, independent logs for size vs count
In the log, can we also trace out the warn threshold? That just makes it a bit more useful from a debuggability point of view.

We should do the same thing for error threshold above as well

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, we will trace here if the workflow execution is not running. Is that the intent? or should we exit this entire function early if the workflow is not running?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is intentional to warn even for closed execution. This is to let users know that they are running close to size limit.
I kind of disagree on having separate logs for size and count. We include both as tags so it's easy to filter on either.

executionInfo := c.mutableState.GetExecutionInfo()
c.logger.Warn("history size exceeds warn limit.",
tag.WorkflowNamespaceID(executionInfo.NamespaceId),
tag.WorkflowID(executionInfo.WorkflowId),
tag.WorkflowRunID(executionInfo.ExecutionState.RunId),
tag.WorkflowHistorySize(historySize),
tag.WorkflowEventCount(historyCount))
}

return false, nil
}
8 changes: 1 addition & 7 deletions service/history/workflowTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,9 @@ func (handler *workflowTaskHandlerImpl) handleCommands(
commands []*commandpb.Command,
) error {

// overall workflow size / count check
failWorkflow, err := handler.sizeLimitChecker.failWorkflowSizeExceedsLimit()
if err != nil || failWorkflow {
return err
}

for _, command := range commands {

err = handler.handleCommand(command)
err := handler.handleCommand(command)
if err != nil || handler.stopProcessing {
return err
}
Expand Down