Skip to content

Commit 0d74a73

Browse files
Event buffer size limit
1 parent 392e769 commit 0d74a73

File tree

6 files changed

+80
-36
lines changed

6 files changed

+80
-36
lines changed

common/dynamicconfig/constants.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,9 @@ const (
575575
ReplicatorProcessorEnablePriorityTaskProcessor = "history.replicatorProcessorEnablePriorityTaskProcessor"
576576
// MaximumBufferedEventsBatch is max number of buffer event in mutable state
577577
MaximumBufferedEventsBatch = "history.maximumBufferedEventsBatch"
578+
// MaximumBufferedEventsSizeInBytes is the maximum total size of all buffered events for a workflow execution.
579+
// The total size is determined by the sum of the size, in bytes, of each HistoryEvent proto.
580+
MaximumBufferedEventsSizeInBytes = "history.maximumBufferedEventsSizeInBytes"
578581
// MaximumSignalsPerExecution is max number of signals supported by single execution
579582
MaximumSignalsPerExecution = "history.maximumSignalsPerExecution"
580583
// ShardUpdateMinInterval is the minimal time interval which the shard info can be updated

service/history/configs/config.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,9 @@ type Config struct {
146146
ReplicatorProcessorMaxSkipTaskCount dynamicconfig.IntPropertyFn
147147

148148
// System Limits
149-
MaximumBufferedEventsBatch dynamicconfig.IntPropertyFn
150-
MaximumSignalsPerExecution dynamicconfig.IntPropertyFnWithNamespaceFilter
149+
MaximumBufferedEventsBatch dynamicconfig.IntPropertyFn
150+
MaximumBufferedEventsSizeInBytes dynamicconfig.IntPropertyFn
151+
MaximumSignalsPerExecution dynamicconfig.IntPropertyFnWithNamespaceFilter
151152

152153
// ShardUpdateMinInterval the minimal time interval which the shard info can be updated
153154
ShardUpdateMinInterval dynamicconfig.DurationPropertyFn
@@ -408,11 +409,12 @@ func NewConfig(
408409
ReplicationProcessorSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.ReplicationProcessorSchedulerQueueSize, 128),
409410
ReplicationProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ReplicationProcessorSchedulerWorkerCount, 512),
410411

411-
MaximumBufferedEventsBatch: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsBatch, 100),
412-
MaximumSignalsPerExecution: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MaximumSignalsPerExecution, 0),
413-
ShardUpdateMinInterval: dc.GetDurationProperty(dynamicconfig.ShardUpdateMinInterval, 5*time.Minute),
414-
ShardSyncMinInterval: dc.GetDurationProperty(dynamicconfig.ShardSyncMinInterval, 5*time.Minute),
415-
ShardSyncTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorMaxPollIntervalJitterCoefficient, 0.15),
412+
MaximumBufferedEventsBatch: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsBatch, 100),
413+
MaximumBufferedEventsSizeInBytes: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsSizeInBytes, 1<<20),
414+
MaximumSignalsPerExecution: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MaximumSignalsPerExecution, 0),
415+
ShardUpdateMinInterval: dc.GetDurationProperty(dynamicconfig.ShardUpdateMinInterval, 5*time.Minute),
416+
ShardSyncMinInterval: dc.GetDurationProperty(dynamicconfig.ShardSyncMinInterval, 5*time.Minute),
417+
ShardSyncTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorMaxPollIntervalJitterCoefficient, 0.15),
416418

417419
// history client: client/history/client.go set the client timeout 30s
418420
// TODO: Return this value to the client: go.temporal.io/server/issues/294

service/history/workflow/history_builder.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1119,10 +1119,18 @@ func (b *HistoryBuilder) HasAnyBufferedEvent(filter BufferedEventFilter) bool {
11191119
return false
11201120
}
11211121

1122-
func (b *HistoryBuilder) BufferEventSize() int {
1122+
func (b *HistoryBuilder) NumBufferedEvents() int {
11231123
return len(b.dbBufferBatch) + len(b.memBufferBatch)
11241124
}
11251125

1126+
func (b *HistoryBuilder) SizeInBytesOfBufferedEvents() int {
1127+
size := 0
1128+
for _, ev := range append(b.dbBufferBatch, b.memBufferBatch...) {
1129+
size += ev.Size()
1130+
}
1131+
return size
1132+
}
1133+
11261134
func (b *HistoryBuilder) NextEventID() int64 {
11271135
return b.nextEventID
11281136
}

service/history/workflow/mutable_state_impl.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4601,6 +4601,17 @@ func (ms *MutableStateImpl) closeTransactionWithPolicyCheck(
46014601
}
46024602
}
46034603

4604+
func (ms *MutableStateImpl) BufferSizeAcceptable() bool {
4605+
if ms.hBuilder.NumBufferedEvents() >= ms.config.MaximumBufferedEventsBatch() {
4606+
return false
4607+
}
4608+
4609+
if ms.hBuilder.SizeInBytesOfBufferedEvents() >= ms.config.MaximumBufferedEventsSizeInBytes() {
4610+
return false
4611+
}
4612+
return true
4613+
}
4614+
46044615
func (ms *MutableStateImpl) closeTransactionHandleBufferedEventsLimit(
46054616
transactionPolicy TransactionPolicy,
46064617
) error {
@@ -4610,7 +4621,7 @@ func (ms *MutableStateImpl) closeTransactionHandleBufferedEventsLimit(
46104621
return nil
46114622
}
46124623

4613-
if ms.hBuilder.BufferEventSize() < ms.config.MaximumBufferedEventsBatch() {
4624+
if ms.BufferSizeAcceptable() {
46144625
return nil
46154626
}
46164627

service/history/workflow/mutable_state_impl_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplica
152152
}))
153153
err := s.mutableState.ReplicateWorkflowTaskCompletedEvent(newWorkflowTaskCompletedEvent)
154154
s.NoError(err)
155-
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
155+
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
156156
}
157157

158158
func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplicated_FailoverWorkflowTaskTimeout() {
@@ -175,7 +175,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplica
175175
newWorkflowTask,
176176
)
177177
s.NoError(err)
178-
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
178+
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
179179
}
180180

181181
func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplicated_FailoverWorkflowTaskFailed() {
@@ -202,7 +202,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplica
202202
"", "", "", 0,
203203
)
204204
s.NoError(err)
205-
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
205+
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
206206
}
207207

208208
func (s *mutableStateSuite) TestChecksum() {
@@ -421,7 +421,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskSchedule_CurrentVersionChan
421421
s.NotNil(wt)
422422

423423
s.Equal(int32(1), s.mutableState.GetExecutionInfo().WorkflowTaskAttempt)
424-
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
424+
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
425425
}
426426

427427
func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged() {
@@ -461,7 +461,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged
461461
"random identity",
462462
)
463463
s.NoError(err)
464-
s.Equal(0, s.mutableState.hBuilder.BufferEventSize())
464+
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
465465
}
466466

467467
func (s *mutableStateSuite) TestSanitizedMutableState() {

service/history/workflow/workflow_test/mutable_state_impl_test.go

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ package workflow_test
2929

3030
import (
3131
"fmt"
32+
"math"
3233
"testing"
3334
"time"
3435

@@ -59,24 +60,35 @@ func TestMutableStateImpl_ForceFlushBufferedEvents(t *testing.T) {
5960

6061
for _, tc := range []mutationTestCase{
6162
{
62-
name: "signals<maxNumEvents",
63-
signals: 1,
64-
maxNumEvents: 2,
63+
name: "Number of events acceptable",
6564
transactionPolicy: workflow.TransactionPolicyActive,
65+
signals: 1,
66+
maxEvents: 2,
67+
maxSizeInBytes: math.MaxInt,
6668
expectFlush: false,
6769
},
6870
{
69-
name: "signals=maxNumEvents",
71+
name: "Number of events has reached limit",
72+
transactionPolicy: workflow.TransactionPolicyActive,
7073
signals: 2,
71-
maxNumEvents: 2,
74+
maxEvents: 2,
75+
maxSizeInBytes: math.MaxInt,
76+
expectFlush: true,
77+
},
78+
{
79+
name: "Number of events acceptable but byte size limit exceeded",
7280
transactionPolicy: workflow.TransactionPolicyActive,
81+
signals: 1,
82+
maxEvents: 2,
83+
maxSizeInBytes: 0,
7384
expectFlush: true,
7485
},
7586
{
76-
name: "signals>maxNumEvents",
77-
signals: 3,
78-
maxNumEvents: 2,
87+
name: "Number of events limit reached and byte size limit exceeded",
7988
transactionPolicy: workflow.TransactionPolicyActive,
89+
signals: 2,
90+
maxEvents: 2,
91+
maxSizeInBytes: 0,
8092
expectFlush: true,
8193
},
8294
} {
@@ -88,11 +100,12 @@ type mutationTestCase struct {
88100
name string
89101
transactionPolicy workflow.TransactionPolicy
90102
signals int
91-
maxNumEvents int
103+
maxEvents int
92104
expectFlush bool
105+
maxSizeInBytes int
93106
}
94107

95-
func (c mutationTestCase) Run(t *testing.T) {
108+
func (c *mutationTestCase) Run(t *testing.T) {
96109
t.Parallel()
97110

98111
nsEntry := tests.LocalNamespaceEntry
@@ -118,7 +131,7 @@ func (c mutationTestCase) Run(t *testing.T) {
118131
}
119132
}
120133

121-
func (c mutationTestCase) startWFT(
134+
func (c *mutationTestCase) startWFT(
122135
t *testing.T,
123136
ms *workflow.MutableStateImpl,
124137
) *workflow.WorkflowTaskInfo {
@@ -137,7 +150,7 @@ func (c mutationTestCase) startWFT(
137150
return wft
138151
}
139152

140-
func (c mutationTestCase) startWorkflowExecution(
153+
func (c *mutationTestCase) startWorkflowExecution(
141154
t *testing.T,
142155
ms *workflow.MutableStateImpl,
143156
nsEntry *namespace.Namespace,
@@ -165,7 +178,7 @@ func (c mutationTestCase) startWorkflowExecution(
165178
}
166179
}
167180

168-
func (c mutationTestCase) addWorkflowExecutionSignaled(t *testing.T, i int, ms *workflow.MutableStateImpl) {
181+
func (c *mutationTestCase) addWorkflowExecutionSignaled(t *testing.T, i int, ms *workflow.MutableStateImpl) {
169182
t.Helper()
170183

171184
payload := &commonpb.Payloads{}
@@ -184,7 +197,7 @@ func (c mutationTestCase) addWorkflowExecutionSignaled(t *testing.T, i int, ms *
184197
}
185198
}
186199

187-
func (c mutationTestCase) createMutableState(t *testing.T, nsEntry *namespace.Namespace) *workflow.MutableStateImpl {
200+
func (c *mutationTestCase) createMutableState(t *testing.T, nsEntry *namespace.Namespace) *workflow.MutableStateImpl {
188201
t.Helper()
189202

190203
ctrl := gomock.NewController(t)
@@ -223,16 +236,23 @@ func (c mutationTestCase) createMutableState(t *testing.T, nsEntry *namespace.Na
223236
return ms
224237
}
225238

226-
func (c mutationTestCase) createConfig() *configs.Config {
239+
func (c *mutationTestCase) createConfig() *configs.Config {
227240
cfg := tests.NewDynamicConfig()
228-
cfg.MaximumBufferedEventsBatch = func() int {
229-
return c.maxNumEvents
230-
}
241+
cfg.MaximumBufferedEventsBatch = c.getMaxEvents
242+
cfg.MaximumBufferedEventsSizeInBytes = c.getMaxSizeInBytes
231243

232244
return cfg
233245
}
234246

235-
func (c mutationTestCase) testWFTFailedEvent(
247+
func (c *mutationTestCase) getMaxEvents() int {
248+
return c.maxEvents
249+
}
250+
251+
func (c *mutationTestCase) getMaxSizeInBytes() int {
252+
return c.maxSizeInBytes
253+
}
254+
255+
func (c *mutationTestCase) testWFTFailedEvent(
236256
t *testing.T,
237257
wft *workflow.WorkflowTaskInfo,
238258
event *history.HistoryEvent,
@@ -256,7 +276,7 @@ func (c mutationTestCase) testWFTFailedEvent(
256276
}
257277
}
258278

259-
func (c mutationTestCase) findWFTEvent(eventType enumspb.EventType, workflowEvents []*persistence.WorkflowEvents) (
279+
func (c *mutationTestCase) findWFTEvent(eventType enumspb.EventType, workflowEvents []*persistence.WorkflowEvents) (
260280
*history.HistoryEvent,
261281
bool,
262282
) {
@@ -271,7 +291,7 @@ func (c mutationTestCase) findWFTEvent(eventType enumspb.EventType, workflowEven
271291
return nil, false
272292
}
273293

274-
func (c mutationTestCase) testFailure(
294+
func (c *mutationTestCase) testFailure(
275295
t *testing.T,
276296
ms *workflow.MutableStateImpl,
277297
wft *workflow.WorkflowTaskInfo,
@@ -310,7 +330,7 @@ func (c mutationTestCase) testFailure(
310330
c.testWFTFailedEvent(t, wft, event)
311331
}
312332

313-
func (c mutationTestCase) testSuccess(
333+
func (c *mutationTestCase) testSuccess(
314334
t *testing.T,
315335
ms *workflow.MutableStateImpl,
316336
workflowEvents []*persistence.WorkflowEvents,

0 commit comments

Comments
 (0)