Skip to content

Commit 2f7491e

Browse files
committed
Force terminate workflow execution when it exceeds system limit
Moved the logic that force-terminates a workflow execution into workflowExecutionContext to make sure it is triggered in all cases where a workflow execution exceeds the size limit. Previously we only performed this check on workflow task completion, which could result in scenarios where a workflow would never be force-terminated. Removed the logic that fails the workflow during command processing, as it is redundant now that the force-termination logic has moved to a more generic location.
1 parent 8d90c48 commit 2f7491e

File tree

6 files changed

+134
-58
lines changed

6 files changed

+134
-58
lines changed

host/sizelimit_test.go

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/stretchr/testify/require"
3737
"github.com/stretchr/testify/suite"
3838
enumspb "go.temporal.io/api/enums/v1"
39+
"go.temporal.io/api/serviceerror"
3940

4041
commandpb "go.temporal.io/api/command/v1"
4142
commonpb "go.temporal.io/api/common/v1"
@@ -44,6 +45,7 @@ import (
4445
taskqueuepb "go.temporal.io/api/taskqueue/v1"
4546
"go.temporal.io/api/workflowservice/v1"
4647

48+
"go.temporal.io/server/common"
4749
"go.temporal.io/server/common/log/tag"
4850
"go.temporal.io/server/common/payloads"
4951
"go.temporal.io/server/common/primitives/timestamp"
@@ -153,19 +155,55 @@ func (s *sizeLimitIntegrationSuite) TestTerminateWorkflowCausedBySizeLimit() {
153155
}
154156

155157
for i := int32(0); i < activityCount-1; i++ {
156-
_, err := poller.PollAndProcessWorkflowTask(false, false)
157-
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
158+
dwResp, err := s.engine.DescribeWorkflowExecution(NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{
159+
Namespace: s.namespace,
160+
Execution: &commonpb.WorkflowExecution{
161+
WorkflowId: id,
162+
RunId: we.RunId,
163+
},
164+
})
158165
s.NoError(err)
159166

160-
err = poller.PollAndProcessActivityTask(false)
161-
s.Logger.Info("PollAndProcessActivityTask", tag.Error(err))
162-
s.NoError(err)
167+
// Poll workflow task only if it is running
168+
if dwResp.WorkflowExecutionInfo.Status == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
169+
_, err := poller.PollAndProcessWorkflowTask(false, false)
170+
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
171+
s.NoError(err)
172+
173+
err = poller.PollAndProcessActivityTask(false)
174+
s.Logger.Info("PollAndProcessActivityTask", tag.Error(err))
175+
s.NoError(err)
176+
}
163177
}
164178

165-
// process this workflow task will trigger history exceed limit error
166-
_, err := poller.PollAndProcessWorkflowTask(false, false)
167-
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
168-
s.NoError(err)
179+
var signalErr error
180+
// Send signals until workflow is force terminated
181+
SignalLoop:
182+
for i := 0; i < 10; i++ {
183+
// Send another signal without RunID
184+
signalName := "another signal"
185+
signalInput := payloads.EncodeString("another signal input")
186+
_, signalErr = s.engine.SignalWorkflowExecution(NewContext(), &workflowservice.SignalWorkflowExecutionRequest{
187+
Namespace: s.namespace,
188+
WorkflowExecution: &commonpb.WorkflowExecution{
189+
WorkflowId: id,
190+
},
191+
SignalName: signalName,
192+
Input: signalInput,
193+
Identity: identity,
194+
})
195+
196+
if signalErr != nil {
197+
break SignalLoop
198+
}
199+
}
200+
s.EqualError(signalErr, common.FailureReasonSizeExceedsLimit)
201+
s.IsType(&serviceerror.InvalidArgument{}, signalErr)
202+
203+
s.printWorkflowHistory(s.namespace, &commonpb.WorkflowExecution{
204+
WorkflowId: id,
205+
RunId: we.GetRunId(),
206+
})
169207

170208
// verify last event is terminated event
171209
historyResponse, err := s.engine.GetWorkflowExecutionHistory(NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{
@@ -178,9 +216,7 @@ func (s *sizeLimitIntegrationSuite) TestTerminateWorkflowCausedBySizeLimit() {
178216
s.NoError(err)
179217
history := historyResponse.History
180218
lastEvent := history.Events[len(history.Events)-1]
181-
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, lastEvent.GetEventType())
182-
failedEventAttributes := lastEvent.GetWorkflowExecutionFailedEventAttributes()
183-
s.True(failedEventAttributes.GetFailure().GetServerFailureInfo().GetNonRetryable())
219+
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED, lastEvent.GetEventType())
184220

185221
// verify visibility is correctly processed from open to close
186222
isCloseCorrect := false

host/taskpoller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ Loop:
152152
Identity: p.Identity,
153153
})
154154

155+
if common.IsServiceNonRetryableError(err1) {
156+
return false, nil, err1
157+
}
158+
155159
if err1 == history.ErrDuplicate {
156160
p.Logger.Info("Duplicate Workflow task: Polling again")
157161
continue Loop

service/history/commandChecker.go

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -164,43 +164,6 @@ func (c *workflowSizeChecker) failWorkflowIfPayloadSizeExceedsLimit(
164164
return true, nil
165165
}
166166

167-
func (c *workflowSizeChecker) failWorkflowSizeExceedsLimit() (bool, error) {
168-
historyCount := int(c.mutableState.GetNextEventID()) - 1
169-
historySize := int(c.executionStats.HistorySize)
170-
171-
if historySize > c.historySizeLimitError || historyCount > c.historyCountLimitError {
172-
executionInfo := c.mutableState.GetExecutionInfo()
173-
c.logger.Error("history size exceeds error limit.",
174-
tag.WorkflowNamespaceID(executionInfo.NamespaceId),
175-
tag.WorkflowID(executionInfo.WorkflowId),
176-
tag.WorkflowRunID(executionInfo.ExecutionState.RunId),
177-
tag.WorkflowHistorySize(historySize),
178-
tag.WorkflowEventCount(historyCount))
179-
180-
attributes := &commandpb.FailWorkflowExecutionCommandAttributes{
181-
Failure: failure.NewServerFailure(common.FailureReasonSizeExceedsLimit, true),
182-
}
183-
184-
if _, err := c.mutableState.AddFailWorkflowEvent(c.completedID, enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE, attributes); err != nil {
185-
return false, err
186-
}
187-
return true, nil
188-
}
189-
190-
if historySize > c.historySizeLimitWarn || historyCount > c.historyCountLimitWarn {
191-
executionInfo := c.mutableState.GetExecutionInfo()
192-
c.logger.Warn("history size exceeds warn limit.",
193-
tag.WorkflowNamespaceID(executionInfo.NamespaceId),
194-
tag.WorkflowID(executionInfo.WorkflowId),
195-
tag.WorkflowRunID(executionInfo.ExecutionState.RunId),
196-
tag.WorkflowHistorySize(historySize),
197-
tag.WorkflowEventCount(historyCount))
198-
return false, nil
199-
}
200-
201-
return false, nil
202-
}
203-
204167
func (v *commandAttrValidator) validateActivityScheduleAttributes(
205168
namespaceID string,
206169
targetNamespaceID string,

service/history/historyEngine.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ var (
196196
ErrConsistentQueryBufferExceeded = serviceerror.NewInternal("consistent query buffer is full, cannot accept new consistent queries")
197197
// ErrEmptyHistoryRawEventBatch indicates that a single batch of history raw events is of size 0
198198
ErrEmptyHistoryRawEventBatch = serviceerror.NewInvalidArgument("encounter empty history batch")
199+
// ErrSizeExceedsLimit is the error indicating that a workflow execution has exceeded a system-defined limit
200+
ErrSizeExceedsLimit = serviceerror.NewInvalidArgument(common.FailureReasonSizeExceedsLimit)
199201

200202
// FailedWorkflowStatuses is a set of failed workflow close states, used for start workflow policy
201203
// for start workflow execution API

service/history/workflowExecutionContext.go

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ type (
138138
logger log.Logger
139139
metricsClient metrics.Client
140140
timeSource clock.TimeSource
141+
config *Config
141142

142143
mutex locks.Mutex
143144
mutableState mutableState
@@ -168,6 +169,7 @@ func newWorkflowExecutionContext(
168169
logger: logger,
169170
metricsClient: shard.GetMetricsClient(),
170171
timeSource: shard.GetTimeSource(),
172+
config: shard.GetConfig(),
171173
mutex: locks.NewMutex(),
172174
stats: &persistenceblobs.ExecutionStats{
173175
HistorySize: 0,
@@ -572,14 +574,30 @@ func (c *workflowExecutionContextImpl) updateWorkflowExecutionAsActive(
572574
now time.Time,
573575
) error {
574576

575-
return c.updateWorkflowExecutionWithNew(
577+
// We only perform this check on active cluster for the namespace
578+
forceTerminate, err := c.enforceSizeCheck()
579+
if err != nil {
580+
return err
581+
}
582+
583+
if err := c.updateWorkflowExecutionWithNew(
576584
now,
577585
persistence.UpdateWorkflowModeUpdateCurrent,
578586
nil,
579587
nil,
580588
transactionPolicyActive,
581589
nil,
582-
)
590+
); err != nil {
591+
return err
592+
}
593+
594+
if forceTerminate {
595+
// Return an error which should not be retried in most cases
596+
// Caller can explicitly check for this error if they need to special case on force termination of workflow
597+
return ErrSizeExceedsLimit
598+
}
599+
600+
return nil
583601
}
584602

585603
func (c *workflowExecutionContextImpl) updateWorkflowExecutionWithNewAsActive(
@@ -1146,3 +1164,62 @@ func (c *workflowExecutionContextImpl) reapplyEvents(
11461164

11471165
return err
11481166
}
1167+
1168+
// Returns true if the execution was force-terminated
1169+
func (c *workflowExecutionContextImpl) enforceSizeCheck() (bool, error) {
1170+
historySizeLimitWarn := c.config.HistorySizeLimitWarn(c.getNamespace())
1171+
historySizeLimitError := c.config.HistorySizeLimitError(c.getNamespace())
1172+
historyCountLimitWarn := c.config.HistoryCountLimitWarn(c.getNamespace())
1173+
historyCountLimitError := c.config.HistoryCountLimitError(c.getNamespace())
1174+
1175+
historySize := int(c.stats.HistorySize)
1176+
historyCount := int(c.mutableState.GetNextEventID() - 1)
1177+
1178+
// Hard terminate workflow if still running and breached size or count limit
1179+
if (historySize > historySizeLimitError || historyCount > historyCountLimitError) &&
1180+
c.mutableState.IsWorkflowExecutionRunning() {
1181+
c.logger.Error("history size exceeds error limit.",
1182+
tag.WorkflowNamespaceID(c.namespaceID),
1183+
tag.WorkflowID(c.workflowExecution.GetWorkflowId()),
1184+
tag.WorkflowRunID(c.workflowExecution.GetRunId()),
1185+
tag.WorkflowHistorySize(historySize),
1186+
tag.WorkflowEventCount(historyCount))
1187+
1188+
// Discard pending changes in mutableState so we can apply terminate state transition
1189+
c.clear()
1190+
1191+
// Reload mutable state
1192+
mutableState, err := c.loadWorkflowExecution()
1193+
if err != nil {
1194+
return false, err
1195+
}
1196+
1197+
// Terminate workflow is written as a separate batch and might result in more than one event as we close the
1198+
// outstanding workflow task before terminating the workflow
1199+
eventBatchFirstEventID := mutableState.GetNextEventID()
1200+
if err := terminateWorkflow(
1201+
mutableState,
1202+
eventBatchFirstEventID,
1203+
common.FailureReasonSizeExceedsLimit,
1204+
nil,
1205+
identityHistoryService,
1206+
); err != nil {
1207+
return false, err
1208+
}
1209+
1210+
// Return true to caller to indicate workflow is forced terminated
1211+
return true, nil
1212+
}
1213+
1214+
if historySize > historySizeLimitWarn || historyCount > historyCountLimitWarn {
1215+
executionInfo := c.mutableState.GetExecutionInfo()
1216+
c.logger.Warn("history size exceeds warn limit.",
1217+
tag.WorkflowNamespaceID(executionInfo.NamespaceId),
1218+
tag.WorkflowID(executionInfo.WorkflowId),
1219+
tag.WorkflowRunID(executionInfo.ExecutionState.RunId),
1220+
tag.WorkflowHistorySize(historySize),
1221+
tag.WorkflowEventCount(historyCount))
1222+
}
1223+
1224+
return false, nil
1225+
}

service/history/workflowTaskHandler.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,9 @@ func (handler *workflowTaskHandlerImpl) handleCommands(
124124
commands []*commandpb.Command,
125125
) error {
126126

127-
// overall workflow size / count check
128-
failWorkflow, err := handler.sizeLimitChecker.failWorkflowSizeExceedsLimit()
129-
if err != nil || failWorkflow {
130-
return err
131-
}
132-
133127
for _, command := range commands {
134128

135-
err = handler.handleCommand(command)
129+
err := handler.handleCommand(command)
136130
if err != nil || handler.stopProcessing {
137131
return err
138132
}

0 commit comments

Comments
 (0)