diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go
index a450992f2ba..378a3cb0501 100644
--- a/common/dynamicconfig/constants.go
+++ b/common/dynamicconfig/constants.go
@@ -2387,6 +2387,11 @@ the number of children greater than or equal to this threshold`,
 		30,
 		`ReplicationTaskProcessorShardQPS is the qps of task processing rate limiter on shard level`,
 	)
+	ReplicationTaskProcessorApplyPersistenceQPS = NewGlobalFloatSetting(
+		"history.ReplicationTaskProcessorApplyPersistenceQPS",
+		10000,
+		`ReplicationTaskProcessorApplyPersistenceQPS is the qps of task processing rate limiter on persistence level`,
+	)
 	ReplicationEnableDLQMetrics = NewGlobalBoolSetting(
 		"history.ReplicationEnableDLQMetrics",
 		true,
diff --git a/service/history/configs/config.go b/service/history/configs/config.go
index 270ef7c02ed..a3979d20fae 100644
--- a/service/history/configs/config.go
+++ b/service/history/configs/config.go
@@ -266,6 +266,7 @@ type Config struct {
 	ReplicationTaskProcessorCleanupJitterCoefficient dynamicconfig.FloatPropertyFnWithShardIDFilter
 	ReplicationTaskProcessorHostQPS                  dynamicconfig.FloatPropertyFn
 	ReplicationTaskProcessorShardQPS                 dynamicconfig.FloatPropertyFn
+	ReplicationTaskProcessorApplyPersistenceQPS      dynamicconfig.FloatPropertyFn
 	ReplicationEnableDLQMetrics                      dynamicconfig.BoolPropertyFn
 	ReplicationEnableUpdateWithNewTaskMerge          dynamicconfig.BoolPropertyFn
 	ReplicationMultipleBatches                       dynamicconfig.BoolPropertyFn
@@ -556,6 +557,7 @@ func NewConfig(
 	ReplicatorProcessorMaxSkipTaskCount:         dynamicconfig.ReplicatorMaxSkipTaskCount.Get(dc),
 	ReplicationTaskProcessorHostQPS:             dynamicconfig.ReplicationTaskProcessorHostQPS.Get(dc),
 	ReplicationTaskProcessorShardQPS:            dynamicconfig.ReplicationTaskProcessorShardQPS.Get(dc),
+	ReplicationTaskProcessorApplyPersistenceQPS: dynamicconfig.ReplicationTaskProcessorApplyPersistenceQPS.Get(dc),
 	ReplicationEnableDLQMetrics:                 dynamicconfig.ReplicationEnableDLQMetrics.Get(dc),
 	ReplicationEnableUpdateWithNewTaskMerge:     dynamicconfig.ReplicationEnableUpdateWithNewTaskMerge.Get(dc),
 	ReplicationStreamSyncStatusDuration:         dynamicconfig.ReplicationStreamSyncStatusDuration.Get(dc),
diff --git a/service/history/history_engine.go b/service/history/history_engine.go
index 0d0978feab8..0ffd3dd82ac 100644
--- a/service/history/history_engine.go
+++ b/service/history/history_engine.go
@@ -162,6 +162,7 @@ func NewEngineWithShardContext(
 	dlqWriter replication.DLQWriter,
 	commandHandlerRegistry *workflow.CommandHandlerRegistry,
 	outboundQueueCBPool *circuitbreakerpool.OutboundQueueCircuitBreakerPool,
+	historyReplicatorPersistenceRateLimiterProvider func(historyi.ShardContext) replication.PersistenceRateLimiter,
 	testHooks testhooks.TestHooks,
 ) historyi.Engine {
 	currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
@@ -238,6 +239,7 @@ func NewEngineWithShardContext(
 		workflowCache,
 		historyEngImpl.eventsReapplier,
 		eventSerializer,
+		historyReplicatorPersistenceRateLimiterProvider(shard),
 		logger,
 	)
 	historyEngImpl.nDCHistoryImporter = ndc.NewHistoryImporter(
diff --git a/service/history/history_engine_factory.go b/service/history/history_engine_factory.go
index a0e6f2809e8..4e56010c677 100644
--- a/service/history/history_engine_factory.go
+++ b/service/history/history_engine_factory.go
@@ -25,26 +25,27 @@ type (
 	HistoryEngineFactoryParams struct {
 		fx.In

-		ClientBean                      client.Bean
-		MatchingClient                  resource.MatchingClient
-		SdkClientFactory                sdk.ClientFactory
-		EventNotifier                   events.Notifier
-		Config                          *configs.Config
-		RawMatchingClient               resource.MatchingRawClient
-		WorkflowCache                   wcache.Cache
-		ReplicationProgressCache        replication.ProgressCache
-		EventSerializer                 serialization.Serializer
-		QueueFactories                  []QueueFactory `group:"queueFactory"`
-		ReplicationTaskFetcherFactory   replication.TaskFetcherFactory
-		ReplicationTaskExecutorProvider replication.TaskExecutorProvider
-		TracerProvider                  trace.TracerProvider
-		PersistenceVisibilityMgr        manager.VisibilityManager
-		EventBlobCache                  persistence.XDCCache
-		TaskCategoryRegistry            tasks.TaskCategoryRegistry
-		ReplicationDLQWriter            replication.DLQWriter
-		CommandHandlerRegistry          *workflow.CommandHandlerRegistry
-		OutboundQueueCBPool             *circuitbreakerpool.OutboundQueueCircuitBreakerPool
-		TestHooks                       testhooks.TestHooks
+		ClientBean                                      client.Bean
+		MatchingClient                                  resource.MatchingClient
+		SdkClientFactory                                sdk.ClientFactory
+		EventNotifier                                   events.Notifier
+		Config                                          *configs.Config
+		RawMatchingClient                               resource.MatchingRawClient
+		WorkflowCache                                   wcache.Cache
+		ReplicationProgressCache                        replication.ProgressCache
+		EventSerializer                                 serialization.Serializer
+		QueueFactories                                  []QueueFactory `group:"queueFactory"`
+		ReplicationTaskFetcherFactory                   replication.TaskFetcherFactory
+		ReplicationTaskExecutorProvider                 replication.TaskExecutorProvider
+		TracerProvider                                  trace.TracerProvider
+		PersistenceVisibilityMgr                        manager.VisibilityManager
+		EventBlobCache                                  persistence.XDCCache
+		TaskCategoryRegistry                            tasks.TaskCategoryRegistry
+		ReplicationDLQWriter                            replication.DLQWriter
+		CommandHandlerRegistry                          *workflow.CommandHandlerRegistry
+		OutboundQueueCBPool                             *circuitbreakerpool.OutboundQueueCircuitBreakerPool
+		HistoryReplicatorPersistenceRateLimiterProvider func(historyi.ShardContext) replication.PersistenceRateLimiter
+		TestHooks                                       testhooks.TestHooks
 	}

 	historyEngineFactory struct {
@@ -77,6 +78,7 @@ func (f *historyEngineFactory) CreateEngine(
 		f.ReplicationDLQWriter,
 		f.CommandHandlerRegistry,
 		f.OutboundQueueCBPool,
+		f.HistoryReplicatorPersistenceRateLimiterProvider,
 		f.TestHooks,
 	)
 }
diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go
index 25164aed05b..b25144f0e61 100644
--- a/service/history/ndc/history_replicator.go
+++ b/service/history/ndc/history_replicator.go
@@ -25,6 +25,7 @@ import (
 	"go.temporal.io/server/common/persistence/transitionhistory"
 	"go.temporal.io/server/common/persistence/versionhistory"
 	"go.temporal.io/server/common/primitives/timestamp"
+	"go.temporal.io/server/common/quotas"
 	serviceerrors "go.temporal.io/server/common/serviceerror"
 	"go.temporal.io/server/service/history/consts"
 	historyi "go.temporal.io/server/service/history/interfaces"
@@ -96,15 +97,16 @@
 	}

 	HistoryReplicatorImpl struct {
-		shardContext      historyi.ShardContext
-		clusterMetadata   cluster.Metadata
-		historySerializer serialization.Serializer
-		metricsHandler    metrics.Handler
-		namespaceRegistry namespace.Registry
-		workflowCache     wcache.Cache
-		eventsReapplier   EventsReapplier
-		transactionMgr    TransactionManager
-		logger            log.Logger
+		shardContext           historyi.ShardContext
+		clusterMetadata        cluster.Metadata
+		historySerializer      serialization.Serializer
+		metricsHandler         metrics.Handler
+		namespaceRegistry      namespace.Registry
+		workflowCache          wcache.Cache
+		eventsReapplier        EventsReapplier
+		transactionMgr         TransactionManager
+		persistenceRateLimiter quotas.RateLimiter
+		logger                 log.Logger

 		mutableStateMapper *MutableStateMapperImpl
 		newResetter        workflowResetterProvider
@@ -123,21 +125,23 @@ func NewHistoryReplicator(
 	workflowCache wcache.Cache,
 	eventsReapplier EventsReapplier,
 	eventSerializer serialization.Serializer,
+	persistenceRateLimiter quotas.RateLimiter,
 	logger log.Logger,
 ) *HistoryReplicatorImpl {
 	logger = log.With(logger, tag.ComponentHistoryReplicator)
 	transactionMgr := NewTransactionManager(shardContext, workflowCache, eventsReapplier, logger, false)

 	replicator := &HistoryReplicatorImpl{
-		shardContext:      shardContext,
-		clusterMetadata:   shardContext.GetClusterMetadata(),
-		historySerializer: eventSerializer,
-		metricsHandler:    shardContext.GetMetricsHandler(),
-		namespaceRegistry: shardContext.GetNamespaceRegistry(),
-		workflowCache:     workflowCache,
-		transactionMgr:    transactionMgr,
-		eventsReapplier:   eventsReapplier,
-		logger:            logger,
+		shardContext:           shardContext,
+		clusterMetadata:        shardContext.GetClusterMetadata(),
+		historySerializer:      eventSerializer,
+		metricsHandler:         shardContext.GetMetricsHandler(),
+		namespaceRegistry:      shardContext.GetNamespaceRegistry(),
+		workflowCache:          workflowCache,
+		transactionMgr:         transactionMgr,
+		eventsReapplier:        eventsReapplier,
+		persistenceRateLimiter: persistenceRateLimiter,
+		logger:                 logger,

 		mutableStateMapper: NewMutableStateMapping(
 			shardContext,
@@ -192,6 +196,7 @@ func (r *HistoryReplicatorImpl) ApplyEvents(
 	ctx context.Context,
 	request *historyservice.ReplicateEventsV2Request,
 ) (retError error) {
+	_ = r.persistenceRateLimiter.Wait(ctx) // WaitN(ctx, tokens) based on the request events size?
 	task, err := newReplicationTaskFromRequest(
 		r.clusterMetadata,
@@ -210,6 +215,8 @@ func (r *HistoryReplicatorImpl) BackfillHistoryEvents(
 	ctx context.Context,
 	request *historyi.BackfillHistoryEventsRequest,
 ) error {
+	_ = r.persistenceRateLimiter.Wait(ctx) // WaitN(ctx, tokens) based on the request events size?
+
 	task, err := newReplicationTaskFromBatch(
 		r.clusterMetadata,
 		r.logger,
@@ -404,6 +411,8 @@ func (r *HistoryReplicatorImpl) ReplicateHistoryEvents(
 	newEvents []*historypb.HistoryEvent,
 	newRunID string,
 ) error {
+	_ = r.persistenceRateLimiter.Wait(ctx) // WaitN(ctx, tokens) based on the request events size?
+
 	task, err := newReplicationTaskFromBatch(
 		r.clusterMetadata,
 		r.logger,
diff --git a/service/history/replication/fx.go b/service/history/replication/fx.go
index e082fbd9c4d..ca18a790d67 100644
--- a/service/history/replication/fx.go
+++ b/service/history/replication/fx.go
@@ -41,6 +41,9 @@ var Module = fx.Provide(
 	NewExecutionManagerDLQWriter,
 	ClientSchedulerRateLimiterProvider,
 	ServerSchedulerRateLimiterProvider,
+	func() func(historyi.ShardContext) PersistenceRateLimiter {
+		return PersistenceRateLimiterProvider
+	},
 	replicationTaskConverterFactoryProvider,
 	replicationTaskExecutorProvider,
 	fx.Annotated{
diff --git a/service/history/replication/quotas.go b/service/history/replication/quotas.go
index 98b41ac83a9..3d8fd90ee10 100644
--- a/service/history/replication/quotas.go
+++ b/service/history/replication/quotas.go
@@ -2,6 +2,7 @@ package replication

 import (
 	"go.temporal.io/server/common/quotas"
+	historyi "go.temporal.io/server/service/history/interfaces"
 )

 const (
@@ -11,6 +12,7 @@ const (
 type (
 	ServerSchedulerRateLimiter quotas.RequestRateLimiter
 	ClientSchedulerRateLimiter quotas.RequestRateLimiter
+	PersistenceRateLimiter     quotas.RateLimiter
 )

 func ClientSchedulerRateLimiterProvider() ClientSchedulerRateLimiter {
@@ -22,3 +24,9 @@ func ServerSchedulerRateLimiterProvider() ServerSchedulerRateLimiter {
 	// Experiment with no op rate limiter
 	return quotas.NoopRequestRateLimiter
 }
+
+func PersistenceRateLimiterProvider(shardContext historyi.ShardContext) PersistenceRateLimiter {
+	return quotas.NewDefaultOutgoingRateLimiter(
+		func() float64 { return shardContext.GetConfig().ReplicationTaskProcessorApplyPersistenceQPS() },
+	)
+}
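All three new call sites leave the same question open: `// WaitN(ctx, tokens) based on the request events size?`. Below is a sketch of one possible answer, assuming `quotas.RateLimiter` exposes `WaitN(ctx, n)` alongside `Wait(ctx)`: charge one token per replicated event, with a floor of one token per request. The helper names `tokensForEventBatches` and `waitForApply` are hypothetical and not part of this change.

```go
package ndc

import (
	"context"

	historypb "go.temporal.io/api/history/v1"

	"go.temporal.io/server/common/quotas"
)

// tokensForEventBatches counts one token per event across all batches, with
// a floor of one so that empty requests still consume a token.
func tokensForEventBatches(batches [][]*historypb.HistoryEvent) int {
	tokens := 0
	for _, batch := range batches {
		tokens += len(batch)
	}
	if tokens < 1 {
		tokens = 1
	}
	return tokens
}

// waitForApply is a size-weighted alternative to the flat Wait calls in the
// diff: large replication requests pay proportionally more of the persistence
// QPS budget than small ones. It returns an error if ctx ends before the
// tokens are granted, which callers could surface instead of discarding.
func waitForApply(ctx context.Context, limiter quotas.RateLimiter, batches [][]*historypb.HistoryEvent) error {
	return limiter.WaitN(ctx, tokensForEventBatches(batches))
}
```

Weighting by event count would make the configured limit mean "events per second" rather than "requests per second", which keeps it meaningful when batch sizes vary widely across namespaces.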
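For readers unfamiliar with `quotas.NewDefaultOutgoingRateLimiter`: it takes a rate function rather than a fixed rate, which is what lets `history.ReplicationTaskProcessorApplyPersistenceQPS` be adjusted at runtime without rebuilding the limiter. A minimal standalone illustration follows, with the dynamic config value stood in for by a plain variable; the names and values are invented for the example.

```go
package main

import (
	"context"
	"fmt"

	"go.temporal.io/server/common/quotas"
)

func main() {
	// The rate function is re-read as the limiter refreshes, so in the real
	// code a dynamic config change to the QPS setting takes effect without
	// restarting the history service.
	qps := 10000.0
	limiter := quotas.NewDefaultOutgoingRateLimiter(func() float64 { return qps })

	// Wait blocks until a token is available; it returns an error when ctx
	// is canceled or would expire before a token could be granted — the case
	// the `_ =` in this diff chooses to ignore.
	if err := limiter.Wait(context.Background()); err != nil {
		fmt.Println("wait failed:", err)
		return
	}
	fmt.Println("token acquired")
}
```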