Skip to content

Commit ba54ef7

Browse files
yycpttrodrigozhou
authored andcommitted
Truncate activity failure in mutable state (#4338)
1 parent 8531e92 commit ba54ef7

File tree

5 files changed

+150
-46
lines changed

5 files changed

+150
-46
lines changed

common/dynamicconfig/constants.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ const (
151151
HistoryCountLimitError = "limit.historyCount.error"
152152
// HistoryCountLimitWarn is the per workflow execution history event count limit for warning
153153
HistoryCountLimitWarn = "limit.historyCount.warn"
154+
// MutableStateActivityFailureSizeLimitError is the per activity failure size limit for workflow mutable state.
155+
// If exceeded, failure will be truncated before being stored in mutable state.
156+
MutableStateActivityFailureSizeLimitError = "limit.mutableStateActivityFailureSize.error"
157+
// MutableStateActivityFailureSizeLimitWarn is the per activity failure size warning limit for workflow mutable state
158+
MutableStateActivityFailureSizeLimitWarn = "limit.mutableStateActivityFailureSize.warn"
154159
// HistoryCountSuggestContinueAsNew is the workflow execution history event count limit to
155160
// suggest continue-as-new (in workflow task started event)
156161
HistoryCountSuggestContinueAsNew = "limit.historyCount.suggestContinueAsNew"

common/log/tag/tags.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,11 @@ func WorkflowTaskQueueName(taskQueueName string) ZapTag {
307307

308308
// size limit
309309

310+
// BlobSize returns tag for BlobSize
311+
func BlobSize(blobSize int64) ZapTag {
312+
return NewInt64("blob-size", blobSize)
313+
}
314+
310315
// WorkflowSize returns tag for WorkflowSize
311316
func WorkflowSize(workflowSize int64) ZapTag {
312317
return NewInt64("wf-size", workflowSize)

service/history/configs/config.go

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -179,20 +179,22 @@ type Config struct {
179179
DurableArchivalEnabled dynamicconfig.BoolPropertyFn
180180

181181
// Size limit related settings
182-
BlobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
183-
BlobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
184-
MemoSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
185-
MemoSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
186-
HistorySizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
187-
HistorySizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
188-
HistorySizeSuggestContinueAsNew dynamicconfig.IntPropertyFnWithNamespaceFilter
189-
HistoryCountLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
190-
HistoryCountLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
191-
HistoryCountSuggestContinueAsNew dynamicconfig.IntPropertyFnWithNamespaceFilter
192-
NumPendingChildExecutionsLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
193-
NumPendingActivitiesLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
194-
NumPendingSignalsLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
195-
NumPendingCancelsRequestLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
182+
BlobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
183+
BlobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
184+
MemoSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
185+
MemoSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
186+
HistorySizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
187+
HistorySizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
188+
HistorySizeSuggestContinueAsNew dynamicconfig.IntPropertyFnWithNamespaceFilter
189+
HistoryCountLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
190+
HistoryCountLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
191+
HistoryCountSuggestContinueAsNew dynamicconfig.IntPropertyFnWithNamespaceFilter
192+
MutableStateActivityFailureSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
193+
MutableStateActivityFailureSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
194+
NumPendingChildExecutionsLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
195+
NumPendingActivitiesLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
196+
NumPendingSignalsLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
197+
NumPendingCancelsRequestLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
196198

197199
// DefaultActivityRetryOptions specifies the out-of-box retry policy if
198200
// none is configured on the Activity by the user.
@@ -430,20 +432,22 @@ func NewConfig(
430432
ArchiveSignalTimeout: dc.GetDurationProperty(dynamicconfig.ArchiveSignalTimeout, 300*time.Millisecond),
431433
DurableArchivalEnabled: dc.GetBoolProperty(dynamicconfig.DurableArchivalEnabled, true),
432434

433-
BlobSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitError, 2*1024*1024),
434-
BlobSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitWarn, 512*1024),
435-
MemoSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MemoSizeLimitError, 2*1024*1024),
436-
MemoSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MemoSizeLimitWarn, 2*1024),
437-
NumPendingChildExecutionsLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingChildExecutionsLimitError, 50000),
438-
NumPendingActivitiesLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingActivitiesLimitError, 50000),
439-
NumPendingSignalsLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingSignalsLimitError, 50000),
440-
NumPendingCancelsRequestLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingCancelRequestsLimitError, 50000),
441-
HistorySizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeLimitError, 50*1024*1024),
442-
HistorySizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeLimitWarn, 10*1024*1024),
443-
HistorySizeSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeSuggestContinueAsNew, 4*1024*1024),
444-
HistoryCountLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountLimitError, 50*1024),
445-
HistoryCountLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountLimitWarn, 10*1024),
446-
HistoryCountSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountSuggestContinueAsNew, 4*1024),
435+
BlobSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitError, 2*1024*1024),
436+
BlobSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitWarn, 512*1024),
437+
MemoSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MemoSizeLimitError, 2*1024*1024),
438+
MemoSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MemoSizeLimitWarn, 2*1024),
439+
NumPendingChildExecutionsLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingChildExecutionsLimitError, 50000),
440+
NumPendingActivitiesLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingActivitiesLimitError, 50000),
441+
NumPendingSignalsLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingSignalsLimitError, 50000),
442+
NumPendingCancelsRequestLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingCancelRequestsLimitError, 50000),
443+
HistorySizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeLimitError, 50*1024*1024),
444+
HistorySizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeLimitWarn, 10*1024*1024),
445+
HistorySizeSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeSuggestContinueAsNew, 4*1024*1024),
446+
HistoryCountLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountLimitError, 50*1024),
447+
HistoryCountLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountLimitWarn, 10*1024),
448+
HistoryCountSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountSuggestContinueAsNew, 4*1024),
449+
MutableStateActivityFailureSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MutableStateActivityFailureSizeLimitError, 4*1024),
450+
MutableStateActivityFailureSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MutableStateActivityFailureSizeLimitWarn, 2*1024),
447451

448452
ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.HistoryThrottledLogRPS, 4),
449453
EnableStickyQuery: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableStickyQuery, true),

service/history/workflow/mutable_state_impl.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ import (
5555
"go.temporal.io/server/common/convert"
5656
"go.temporal.io/server/common/definition"
5757
"go.temporal.io/server/common/enums"
58+
"go.temporal.io/server/common/failure"
5859
"go.temporal.io/server/common/log"
5960
"go.temporal.io/server/common/log/tag"
6061
"go.temporal.io/server/common/metrics"
@@ -3846,7 +3847,7 @@ func (ms *MutableStateImpl) RetryActivity(
38463847
ai.StartedTime = timestamp.TimePtr(time.Time{})
38473848
ai.TimerTaskStatus = TimerTaskStatusNone
38483849
ai.RetryLastWorkerIdentity = ai.StartedIdentity
3849-
ai.RetryLastFailure = failure
3850+
ai.RetryLastFailure = ms.truncateRetryableActivityFailure(failure)
38503851

38513852
if err := ms.taskGenerator.GenerateActivityRetryTasks(
38523853
ai.ScheduledEventId,
@@ -3859,6 +3860,41 @@ func (ms *MutableStateImpl) RetryActivity(
38593860
return enumspb.RETRY_STATE_IN_PROGRESS, nil
38603861
}
38613862

3863+
func (ms *MutableStateImpl) truncateRetryableActivityFailure(
3864+
activityFailure *failurepb.Failure,
3865+
) *failurepb.Failure {
3866+
namespaceName := ms.namespaceEntry.Name().String()
3867+
failureSize := activityFailure.Size()
3868+
3869+
if failureSize <= ms.config.MutableStateActivityFailureSizeLimitWarn(namespaceName) {
3870+
return activityFailure
3871+
}
3872+
3873+
throttledLogger := log.With(
3874+
ms.shard.GetThrottledLogger(),
3875+
tag.WorkflowNamespace(namespaceName),
3876+
tag.WorkflowID(ms.executionInfo.WorkflowId),
3877+
tag.WorkflowRunID(ms.executionState.RunId),
3878+
tag.BlobSize(int64(failureSize)),
3879+
tag.BlobSizeViolationOperation("RetryActivity"),
3880+
)
3881+
3882+
sizeLimitError := ms.config.MutableStateActivityFailureSizeLimitError(namespaceName)
3883+
if failureSize <= sizeLimitError {
3884+
throttledLogger.Warn("Activity failure size exceeds warning limit for mutable state.")
3885+
return activityFailure
3886+
}
3887+
3888+
throttledLogger.Warn("Activity failure size exceeds error limit for mutable state, truncated.")
3889+
3890+
// nonRetryable is set to false here as only retryable failures are recorded in mutable state.
3891+
// also when this method is called, the check for isRetryable is already done, so the value
3892+
// is only for visibility/debugging purpose.
3893+
serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, false)
3894+
serverFailure.Cause = failure.Truncate(activityFailure, sizeLimitError)
3895+
return serverFailure
3896+
}
3897+
38623898
// TODO mutable state should generate corresponding transfer / timer tasks according to
38633899
// updates accumulated, while currently all transfer / timer tasks are managed manually
38643900

service/history/workflow/mutable_state_impl_test.go

Lines changed: 71 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
commandpb "go.temporal.io/api/command/v1"
3737
commonpb "go.temporal.io/api/common/v1"
3838
enumspb "go.temporal.io/api/enums/v1"
39+
failurepb "go.temporal.io/api/failure/v1"
3940
historypb "go.temporal.io/api/history/v1"
4041
taskqueuepb "go.temporal.io/api/taskqueue/v1"
4142

@@ -105,6 +106,8 @@ func (s *mutableStateSuite) SetupTest() {
105106
// set the checksum probabilities to 100% for exercising during test
106107
s.mockConfig.MutableStateChecksumGenProbability = func(namespace string) int { return 100 }
107108
s.mockConfig.MutableStateChecksumVerifyProbability = func(namespace string) int { return 100 }
109+
s.mockConfig.MutableStateActivityFailureSizeLimitWarn = func(namespace string) int { return 1 * 1024 }
110+
s.mockConfig.MutableStateActivityFailureSizeLimitError = func(namespace string) int { return 2 * 1024 }
108111
s.mockShard.SetEventsCacheForTesting(s.mockEventsCache)
109112

110113
s.testScope = s.mockShard.Resource.MetricsScope.(tally.TestScope)
@@ -839,49 +842,41 @@ func (s *mutableStateSuite) TestReplicateActivityTaskStartedEvent() {
839842
}
840843

841844
func (s *mutableStateSuite) TestTotalEntitiesCount() {
842-
namespaceEntry := s.newNamespaceCacheEntry()
843-
mutableState := TestLocalMutableState(
844-
s.mockShard,
845-
s.mockEventsCache,
846-
namespaceEntry,
847-
s.logger,
848-
uuid.New(),
849-
)
850845
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
851846

852847
// scheduling, starting & completing workflow task is omitted here
853848

854849
workflowTaskCompletedEventID := int64(4)
855-
_, _, err := mutableState.AddActivityTaskScheduledEvent(
850+
_, _, err := s.mutableState.AddActivityTaskScheduledEvent(
856851
workflowTaskCompletedEventID,
857852
&commandpb.ScheduleActivityTaskCommandAttributes{},
858853
false,
859854
)
860855
s.NoError(err)
861856

862-
_, _, err = mutableState.AddStartChildWorkflowExecutionInitiatedEvent(
857+
_, _, err = s.mutableState.AddStartChildWorkflowExecutionInitiatedEvent(
863858
workflowTaskCompletedEventID,
864859
uuid.New(),
865860
&commandpb.StartChildWorkflowExecutionCommandAttributes{},
866861
namespace.ID(uuid.New()),
867862
)
868863
s.NoError(err)
869864

870-
_, _, err = mutableState.AddTimerStartedEvent(
865+
_, _, err = s.mutableState.AddTimerStartedEvent(
871866
workflowTaskCompletedEventID,
872867
&commandpb.StartTimerCommandAttributes{},
873868
)
874869
s.NoError(err)
875870

876-
_, _, err = mutableState.AddRequestCancelExternalWorkflowExecutionInitiatedEvent(
871+
_, _, err = s.mutableState.AddRequestCancelExternalWorkflowExecutionInitiatedEvent(
877872
workflowTaskCompletedEventID,
878873
uuid.New(),
879874
&commandpb.RequestCancelExternalWorkflowExecutionCommandAttributes{},
880875
namespace.ID(uuid.New()),
881876
)
882877
s.NoError(err)
883878

884-
_, _, err = mutableState.AddSignalExternalWorkflowExecutionInitiatedEvent(
879+
_, _, err = s.mutableState.AddSignalExternalWorkflowExecutionInitiatedEvent(
885880
workflowTaskCompletedEventID,
886881
uuid.New(),
887882
&commandpb.SignalExternalWorkflowExecutionCommandAttributes{
@@ -894,7 +889,7 @@ func (s *mutableStateSuite) TestTotalEntitiesCount() {
894889
)
895890
s.NoError(err)
896891

897-
_, err = mutableState.AddWorkflowExecutionSignaled(
892+
_, err = s.mutableState.AddWorkflowExecutionSignaled(
898893
"signalName",
899894
&commonpb.Payloads{},
900895
"identity",
@@ -903,12 +898,12 @@ func (s *mutableStateSuite) TestTotalEntitiesCount() {
903898
s.NoError(err)
904899

905900
s.mockShard.Resource.ClusterMetadata.EXPECT().ClusterNameForFailoverVersion(
906-
namespaceEntry.IsGlobalNamespace(),
907-
mutableState.GetCurrentVersion(),
901+
tests.LocalNamespaceEntry.IsGlobalNamespace(),
902+
s.mutableState.GetCurrentVersion(),
908903
).Return(cluster.TestCurrentClusterName)
909904
s.mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName)
910905

911-
mutation, _, err := mutableState.CloseTransactionAsMutation(
906+
mutation, _, err := s.mutableState.CloseTransactionAsMutation(
912907
TransactionPolicyActive,
913908
)
914909
s.NoError(err)
@@ -982,3 +977,62 @@ func (s *mutableStateSuite) TestSpeculativeWorkflowTaskNotPersisted() {
982977
})
983978
}
984979
}
980+
981+
func (s *mutableStateSuite) TestRetryActivity_TruncateRetryableFailure() {
982+
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
983+
984+
// scheduling, starting & completing workflow task is omitted here
985+
986+
workflowTaskCompletedEventID := int64(4)
987+
_, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent(
988+
workflowTaskCompletedEventID,
989+
&commandpb.ScheduleActivityTaskCommandAttributes{
990+
ActivityId: "5",
991+
ActivityType: &commonpb.ActivityType{Name: "activity-type"},
992+
TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"},
993+
RetryPolicy: &commonpb.RetryPolicy{
994+
InitialInterval: timestamp.DurationFromSeconds(1),
995+
},
996+
},
997+
false,
998+
)
999+
s.NoError(err)
1000+
1001+
_, err = s.mutableState.AddActivityTaskStartedEvent(
1002+
activityInfo,
1003+
activityInfo.ScheduledEventId,
1004+
uuid.New(),
1005+
"worker-identity",
1006+
)
1007+
s.NoError(err)
1008+
1009+
failureSizeErrorLimit := s.mockConfig.MutableStateActivityFailureSizeLimitError(
1010+
s.mutableState.namespaceEntry.Name().String(),
1011+
)
1012+
1013+
activityFailure := &failurepb.Failure{
1014+
Message: "activity failure with large details",
1015+
Source: "application",
1016+
FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
1017+
Type: "application-failure-type",
1018+
NonRetryable: false,
1019+
Details: &commonpb.Payloads{
1020+
Payloads: []*commonpb.Payload{
1021+
{
1022+
Data: make([]byte, failureSizeErrorLimit*2),
1023+
},
1024+
},
1025+
},
1026+
}},
1027+
}
1028+
s.Greater(activityFailure.Size(), failureSizeErrorLimit)
1029+
1030+
retryState, err := s.mutableState.RetryActivity(activityInfo, activityFailure)
1031+
s.NoError(err)
1032+
s.Equal(enumspb.RETRY_STATE_IN_PROGRESS, retryState)
1033+
1034+
activityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
1035+
s.True(ok)
1036+
s.LessOrEqual(activityInfo.RetryLastFailure.Size(), failureSizeErrorLimit)
1037+
s.Equal(activityFailure.GetMessage(), activityInfo.RetryLastFailure.Cause.GetMessage())
1038+
}

0 commit comments

Comments
 (0)