Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2387,6 +2387,11 @@ the number of children greater than or equal to this threshold`,
30,
`ReplicationTaskProcessorShardQPS is the qps of task processing rate limiter on shard level`,
)
ReplicationTaskProcessorApplyPersistenceQPS = NewGlobalFloatSetting(
"history.ReplicationTaskProcessorApplyPersistenceQPS",
10000,
`ReplicationTaskProcessorApplyPersistenceQPS is the qps of task processing rate limiter on persistence level`,
)
ReplicationEnableDLQMetrics = NewGlobalBoolSetting(
"history.ReplicationEnableDLQMetrics",
true,
Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ type Config struct {
ReplicationTaskProcessorCleanupJitterCoefficient dynamicconfig.FloatPropertyFnWithShardIDFilter
ReplicationTaskProcessorHostQPS dynamicconfig.FloatPropertyFn
ReplicationTaskProcessorShardQPS dynamicconfig.FloatPropertyFn
ReplicationTaskProcessorApplyPersistenceQPS dynamicconfig.FloatPropertyFn
ReplicationEnableDLQMetrics dynamicconfig.BoolPropertyFn
ReplicationEnableUpdateWithNewTaskMerge dynamicconfig.BoolPropertyFn
ReplicationMultipleBatches dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -556,6 +557,7 @@ func NewConfig(
ReplicatorProcessorMaxSkipTaskCount: dynamicconfig.ReplicatorMaxSkipTaskCount.Get(dc),
ReplicationTaskProcessorHostQPS: dynamicconfig.ReplicationTaskProcessorHostQPS.Get(dc),
ReplicationTaskProcessorShardQPS: dynamicconfig.ReplicationTaskProcessorShardQPS.Get(dc),
ReplicationTaskProcessorApplyPersistenceQPS: dynamicconfig.ReplicationTaskProcessorApplyPersistenceQPS.Get(dc),
ReplicationEnableDLQMetrics: dynamicconfig.ReplicationEnableDLQMetrics.Get(dc),
ReplicationEnableUpdateWithNewTaskMerge: dynamicconfig.ReplicationEnableUpdateWithNewTaskMerge.Get(dc),
ReplicationStreamSyncStatusDuration: dynamicconfig.ReplicationStreamSyncStatusDuration.Get(dc),
Expand Down
12 changes: 11 additions & 1 deletion service/history/ndc/history_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.temporal.io/server/common/persistence/transitionhistory"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/quotas"
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/service/history/consts"
historyi "go.temporal.io/server/service/history/interfaces"
Expand Down Expand Up @@ -104,6 +105,7 @@ type (
workflowCache wcache.Cache
eventsReapplier EventsReapplier
transactionMgr TransactionManager
rateLimiter quotas.RateLimiter
logger log.Logger

mutableStateMapper *MutableStateMapperImpl
Expand Down Expand Up @@ -137,7 +139,10 @@ func NewHistoryReplicator(
workflowCache: workflowCache,
transactionMgr: transactionMgr,
eventsReapplier: eventsReapplier,
logger: logger,
rateLimiter: quotas.NewDefaultOutgoingRateLimiter(
func() float64 { return shardContext.GetConfig().ReplicationTaskProcessorApplyPersistenceQPS() },
),
logger: logger,

mutableStateMapper: NewMutableStateMapping(
shardContext,
Expand Down Expand Up @@ -192,6 +197,7 @@ func (r *HistoryReplicatorImpl) ApplyEvents(
ctx context.Context,
request *historyservice.ReplicateEventsV2Request,
) (retError error) {
_ = r.rateLimiter.Wait(ctx) // WaitN(ctx, tokens) based on the request events size?

task, err := newReplicationTaskFromRequest(
r.clusterMetadata,
Expand All @@ -210,6 +216,8 @@ func (r *HistoryReplicatorImpl) BackfillHistoryEvents(
ctx context.Context,
request *historyi.BackfillHistoryEventsRequest,
) error {
_ = r.rateLimiter.Wait(ctx) // WaitN(ctx, tokens) based on the request events size?

task, err := newReplicationTaskFromBatch(
r.clusterMetadata,
r.logger,
Expand Down Expand Up @@ -404,6 +412,8 @@ func (r *HistoryReplicatorImpl) ReplicateHistoryEvents(
newEvents []*historypb.HistoryEvent,
newRunID string,
) error {
_ = r.rateLimiter.Wait(ctx) // WaitN(ctx, tokens) based on the request events size?

task, err := newReplicationTaskFromBatch(
r.clusterMetadata,
r.logger,
Expand Down
Loading