Skip to content

Commit 4e0c7ed

Browse files
MichaelSnowdenrodrigozhou
authored andcommitted
Event buffer size limit (#4296)
* Event buffer size limit * Make size limit inclusive * Make default size limit more readable * Add history builder test for proto size * Split db and memory batch for loop * Use values instead of pointers to avoid parallel test glitch * Make a copy of the test case for parallel test to work
1 parent e9a7bdc commit 4e0c7ed

File tree

7 files changed

+78
-17
lines changed

7 files changed

+78
-17
lines changed

common/dynamicconfig/constants.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -572,8 +572,11 @@ const (
572572
ReplicatorProcessorUpdateAckIntervalJitterCoefficient = "history.replicatorProcessorUpdateAckIntervalJitterCoefficient"
573573
// ReplicatorProcessorEnablePriorityTaskProcessor indicates whether priority task processor should be used for ReplicatorProcessor
574574
ReplicatorProcessorEnablePriorityTaskProcessor = "history.replicatorProcessorEnablePriorityTaskProcessor"
575-
// MaximumBufferedEventsBatch is max number of buffer event in mutable state
575+
// MaximumBufferedEventsBatch is the maximum permissible number of buffered events for any given mutable state.
576576
MaximumBufferedEventsBatch = "history.maximumBufferedEventsBatch"
577+
// MaximumBufferedEventsSizeInBytes is the maximum permissible size of all buffered events for any given mutable
578+
// state. The total size is determined by the sum of the size, in bytes, of each HistoryEvent proto.
579+
MaximumBufferedEventsSizeInBytes = "history.maximumBufferedEventsSizeInBytes"
577580
// MaximumSignalsPerExecution is max number of signals supported by single execution
578581
MaximumSignalsPerExecution = "history.maximumSignalsPerExecution"
579582
// ShardUpdateMinInterval is the minimal time interval which the shard info can be updated

common/metrics/metric_defs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ const (
247247
FrontendClientListOpenWorkflowExecutionsScope = "FrontendClientListOpenWorkflowExecutions"
248248
// FrontendClientPollActivityTaskQueueScope tracks RPC calls to frontend service
249249
FrontendClientPollActivityTaskQueueScope = "FrontendClientPollActivityTaskQueue"
250-
//FrontendClientPollWorkflowExecutionUpdateScope tracks RPC calls to frontend service
250+
// FrontendClientPollWorkflowExecutionUpdateScope tracks RPC calls to frontend service
251251
FrontendClientPollWorkflowExecutionUpdateScope = "FrontendClientPollWorkflowExecutionUpdate"
252252
// FrontendClientPollWorkflowTaskQueueScope tracks RPC calls to frontend service
253253
FrontendClientPollWorkflowTaskQueueScope = "FrontendClientPollWorkflowTaskQueue"

service/history/configs/config.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,9 @@ type Config struct {
146146
ReplicatorProcessorMaxSkipTaskCount dynamicconfig.IntPropertyFn
147147

148148
// System Limits
149-
MaximumBufferedEventsBatch dynamicconfig.IntPropertyFn
150-
MaximumSignalsPerExecution dynamicconfig.IntPropertyFnWithNamespaceFilter
149+
MaximumBufferedEventsBatch dynamicconfig.IntPropertyFn
150+
MaximumBufferedEventsSizeInBytes dynamicconfig.IntPropertyFn
151+
MaximumSignalsPerExecution dynamicconfig.IntPropertyFnWithNamespaceFilter
151152

152153
// ShardUpdateMinInterval the minimal time interval which the shard info can be updated
153154
ShardUpdateMinInterval dynamicconfig.DurationPropertyFn
@@ -408,11 +409,12 @@ func NewConfig(
408409
ReplicationProcessorSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.ReplicationProcessorSchedulerQueueSize, 128),
409410
ReplicationProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ReplicationProcessorSchedulerWorkerCount, 512),
410411

411-
MaximumBufferedEventsBatch: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsBatch, 100),
412-
MaximumSignalsPerExecution: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MaximumSignalsPerExecution, 0),
413-
ShardUpdateMinInterval: dc.GetDurationProperty(dynamicconfig.ShardUpdateMinInterval, 5*time.Minute),
414-
ShardSyncMinInterval: dc.GetDurationProperty(dynamicconfig.ShardSyncMinInterval, 5*time.Minute),
415-
ShardSyncTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorMaxPollIntervalJitterCoefficient, 0.15),
412+
MaximumBufferedEventsBatch: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsBatch, 100),
413+
MaximumBufferedEventsSizeInBytes: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsSizeInBytes, 2*1024*1024),
414+
MaximumSignalsPerExecution: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MaximumSignalsPerExecution, 0),
415+
ShardUpdateMinInterval: dc.GetDurationProperty(dynamicconfig.ShardUpdateMinInterval, 5*time.Minute),
416+
ShardSyncMinInterval: dc.GetDurationProperty(dynamicconfig.ShardSyncMinInterval, 5*time.Minute),
417+
ShardSyncTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorMaxPollIntervalJitterCoefficient, 0.15),
416418

417419
// history client: client/history/client.go set the client timeout 30s
418420
// TODO: Return this value to the client: go.temporal.io/server/issues/294

service/history/workflow/history_builder.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1100,10 +1100,21 @@ func (b *HistoryBuilder) HasBufferEvents() bool {
11001100
return len(b.dbBufferBatch) > 0 || len(b.memBufferBatch) > 0
11011101
}
11021102

1103-
func (b *HistoryBuilder) BufferEventSize() int {
1103+
func (b *HistoryBuilder) NumBufferedEvents() int {
11041104
return len(b.dbBufferBatch) + len(b.memBufferBatch)
11051105
}
11061106

1107+
func (b *HistoryBuilder) SizeInBytesOfBufferedEvents() int {
1108+
size := 0
1109+
for _, ev := range b.dbBufferBatch {
1110+
size += ev.Size()
1111+
}
1112+
for _, ev := range b.memBufferBatch {
1113+
size += ev.Size()
1114+
}
1115+
return size
1116+
}
1117+
11071118
func (b *HistoryBuilder) NextEventID() int64 {
11081119
return b.nextEventID
11091120
}

service/history/workflow/history_builder_test.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ import (
4141
taskqueuepb "go.temporal.io/api/taskqueue/v1"
4242
workflowpb "go.temporal.io/api/workflow/v1"
4343
"go.temporal.io/api/workflowservice/v1"
44-
4544
"go.temporal.io/server/api/historyservice/v1"
4645
workflowspb "go.temporal.io/server/api/workflow/v1"
4746
"go.temporal.io/server/common"
@@ -2315,6 +2314,41 @@ func (s *historyBuilderSuite) TestReorder() {
23152314
)
23162315
}
23172316

2317+
func (s *historyBuilderSuite) TestBufferSize_Memory() {
2318+
s.Assert().Zero(s.historyBuilder.NumBufferedEvents())
2319+
s.Assert().Zero(s.historyBuilder.SizeInBytesOfBufferedEvents())
2320+
s.historyBuilder.AddWorkflowExecutionSignaledEvent(
2321+
"signal-name",
2322+
&commonpb.Payloads{},
2323+
"identity",
2324+
&commonpb.Header{},
2325+
)
2326+
s.Assert().Equal(1, s.historyBuilder.NumBufferedEvents())
2327+
// the size of the proto is non-deterministic, so just assert that it's non-zero, and it isn't really high
2328+
s.Assert().Greater(s.historyBuilder.SizeInBytesOfBufferedEvents(), 0)
2329+
s.Assert().Less(s.historyBuilder.SizeInBytesOfBufferedEvents(), 100)
2330+
s.flush()
2331+
s.Assert().Zero(s.historyBuilder.NumBufferedEvents())
2332+
s.Assert().Zero(s.historyBuilder.SizeInBytesOfBufferedEvents())
2333+
}
2334+
2335+
func (s *historyBuilderSuite) TestBufferSize_DB() {
2336+
s.Assert().Zero(s.historyBuilder.NumBufferedEvents())
2337+
s.Assert().Zero(s.historyBuilder.SizeInBytesOfBufferedEvents())
2338+
s.historyBuilder.dbBufferBatch = []*historypb.HistoryEvent{{
2339+
EventType: enumspb.EVENT_TYPE_TIMER_FIRED,
2340+
EventId: common.BufferedEventID,
2341+
TaskId: common.EmptyEventTaskID,
2342+
}}
2343+
s.Assert().Equal(1, s.historyBuilder.NumBufferedEvents())
2344+
// the size of the proto is non-deterministic, so just assert that it's non-zero, and it isn't really high
2345+
s.Assert().Greater(s.historyBuilder.SizeInBytesOfBufferedEvents(), 0)
2346+
s.Assert().Less(s.historyBuilder.SizeInBytesOfBufferedEvents(), 100)
2347+
s.flush()
2348+
s.Assert().Zero(s.historyBuilder.NumBufferedEvents())
2349+
s.Assert().Zero(s.historyBuilder.SizeInBytesOfBufferedEvents())
2350+
}
2351+
23182352
func (s *historyBuilderSuite) assertEventIDTaskID(
23192353
historyMutation *HistoryMutation,
23202354
) {

service/history/workflow/mutable_state_impl.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4502,6 +4502,17 @@ func (ms *MutableStateImpl) closeTransactionWithPolicyCheck(
45024502
}
45034503
}
45044504

4505+
func (ms *MutableStateImpl) BufferSizeAcceptable() bool {
4506+
if ms.hBuilder.NumBufferedEvents() > ms.config.MaximumBufferedEventsBatch() {
4507+
return false
4508+
}
4509+
4510+
if ms.hBuilder.SizeInBytesOfBufferedEvents() > ms.config.MaximumBufferedEventsSizeInBytes() {
4511+
return false
4512+
}
4513+
return true
4514+
}
4515+
45054516
func (ms *MutableStateImpl) closeTransactionHandleBufferedEventsLimit(
45064517
transactionPolicy TransactionPolicy,
45074518
) error {
@@ -4511,7 +4522,7 @@ func (ms *MutableStateImpl) closeTransactionHandleBufferedEventsLimit(
45114522
return nil
45124523
}
45134524

4514-
if ms.hBuilder.BufferEventSize() < ms.config.MaximumBufferedEventsBatch() {
4525+
if ms.BufferSizeAcceptable() {
45154526
return nil
45164527
}
45174528

service/history/workflow/mutable_state_impl_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplica
147147
}))
148148
err := s.mutableState.ReplicateWorkflowTaskCompletedEvent(newWorkflowTaskCompletedEvent)
149149
s.NoError(err)
150-
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
150+
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
151151
}
152152

153153
func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplicated_FailoverWorkflowTaskTimeout() {
@@ -170,7 +170,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplica
170170
newWorkflowTask,
171171
)
172172
s.NoError(err)
173-
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
173+
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
174174
}
175175

176176
func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplicated_FailoverWorkflowTaskFailed() {
@@ -197,7 +197,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplica
197197
"", "", "", 0,
198198
)
199199
s.NoError(err)
200-
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
200+
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
201201
}
202202

203203
func (s *mutableStateSuite) TestChecksum() {
@@ -416,7 +416,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskSchedule_CurrentVersionChan
416416
s.NotNil(wt)
417417

418418
s.Equal(int32(1), s.mutableState.GetExecutionInfo().WorkflowTaskAttempt)
419-
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
419+
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
420420
}
421421

422422
func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged() {
@@ -456,7 +456,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged
456456
"random identity",
457457
)
458458
s.NoError(err)
459-
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
459+
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
460460
}
461461

462462
func (s *mutableStateSuite) TestSanitizedMutableState() {

0 commit comments

Comments
 (0)