5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
@@ -2387,6 +2387,11 @@ the number of children greater than or equal to this threshold`,
30,
`ReplicationTaskProcessorShardQPS is the qps of task processing rate limiter on shard level`,
)
ReplicationTaskProcessorApplyPersistenceQPS = NewGlobalFloatSetting(
"history.ReplicationTaskProcessorApplyPersistenceQPS",
10000,
`ReplicationTaskProcessorApplyPersistenceQPS is the qps of task processing rate limiter on persistence level`,
)
ReplicationEnableDLQMetrics = NewGlobalBoolSetting(
"history.ReplicationEnableDLQMetrics",
true,
2 changes: 2 additions & 0 deletions service/history/configs/config.go
@@ -266,6 +266,7 @@ type Config struct {
ReplicationTaskProcessorCleanupJitterCoefficient dynamicconfig.FloatPropertyFnWithShardIDFilter
ReplicationTaskProcessorHostQPS dynamicconfig.FloatPropertyFn
ReplicationTaskProcessorShardQPS dynamicconfig.FloatPropertyFn
ReplicationTaskProcessorApplyPersistenceQPS dynamicconfig.FloatPropertyFn
ReplicationEnableDLQMetrics dynamicconfig.BoolPropertyFn
ReplicationEnableUpdateWithNewTaskMerge dynamicconfig.BoolPropertyFn
ReplicationMultipleBatches dynamicconfig.BoolPropertyFn
@@ -556,6 +557,7 @@ func NewConfig(
ReplicatorProcessorMaxSkipTaskCount: dynamicconfig.ReplicatorMaxSkipTaskCount.Get(dc),
ReplicationTaskProcessorHostQPS: dynamicconfig.ReplicationTaskProcessorHostQPS.Get(dc),
ReplicationTaskProcessorShardQPS: dynamicconfig.ReplicationTaskProcessorShardQPS.Get(dc),
ReplicationTaskProcessorApplyPersistenceQPS: dynamicconfig.ReplicationTaskProcessorApplyPersistenceQPS.Get(dc),
ReplicationEnableDLQMetrics: dynamicconfig.ReplicationEnableDLQMetrics.Get(dc),
ReplicationEnableUpdateWithNewTaskMerge: dynamicconfig.ReplicationEnableUpdateWithNewTaskMerge.Get(dc),
ReplicationStreamSyncStatusDuration: dynamicconfig.ReplicationStreamSyncStatusDuration.Get(dc),
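`Get(dc)` returns a `dynamicconfig.FloatPropertyFn`, i.e. a `func() float64`, so the QPS is resolved on every call rather than captured once at startup; an operator can change `history.ReplicationTaskProcessorApplyPersistenceQPS` in dynamic config and the new limit takes effect without a restart. A minimal sketch of that evaluation model (the type alias below is a stand-in, not the server's code):

```go
package main

import "fmt"

// FloatPropertyFn mirrors dynamicconfig.FloatPropertyFn: the value is
// resolved on each call, so dynamic-config updates apply without restart.
type FloatPropertyFn func() float64

func main() {
	current := 10000.0
	qps := FloatPropertyFn(func() float64 { return current })

	fmt.Println(qps()) // 10000
	current = 500      // simulate a dynamic-config override
	fmt.Println(qps()) // 500
}
```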
2 changes: 2 additions & 0 deletions service/history/history_engine.go
@@ -162,6 +162,7 @@ func NewEngineWithShardContext(
dlqWriter replication.DLQWriter,
commandHandlerRegistry *workflow.CommandHandlerRegistry,
outboundQueueCBPool *circuitbreakerpool.OutboundQueueCircuitBreakerPool,
historyReplicatorPersistenceRateLimiterProvider func(historyi.ShardContext) replication.PersistenceRateLimiter,
testHooks testhooks.TestHooks,
) historyi.Engine {
currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
@@ -238,6 +239,7 @@
workflowCache,
historyEngImpl.eventsReapplier,
eventSerializer,
historyReplicatorPersistenceRateLimiterProvider(shard),
logger,
)
historyEngImpl.nDCHistoryImporter = ndc.NewHistoryImporter(
42 changes: 22 additions & 20 deletions service/history/history_engine_factory.go
@@ -25,26 +25,27 @@ type (
HistoryEngineFactoryParams struct {
fx.In

ClientBean client.Bean
MatchingClient resource.MatchingClient
SdkClientFactory sdk.ClientFactory
EventNotifier events.Notifier
Config *configs.Config
RawMatchingClient resource.MatchingRawClient
WorkflowCache wcache.Cache
ReplicationProgressCache replication.ProgressCache
EventSerializer serialization.Serializer
QueueFactories []QueueFactory `group:"queueFactory"`
ReplicationTaskFetcherFactory replication.TaskFetcherFactory
ReplicationTaskExecutorProvider replication.TaskExecutorProvider
TracerProvider trace.TracerProvider
PersistenceVisibilityMgr manager.VisibilityManager
EventBlobCache persistence.XDCCache
TaskCategoryRegistry tasks.TaskCategoryRegistry
ReplicationDLQWriter replication.DLQWriter
CommandHandlerRegistry *workflow.CommandHandlerRegistry
OutboundQueueCBPool *circuitbreakerpool.OutboundQueueCircuitBreakerPool
TestHooks testhooks.TestHooks
ClientBean client.Bean
MatchingClient resource.MatchingClient
SdkClientFactory sdk.ClientFactory
EventNotifier events.Notifier
Config *configs.Config
RawMatchingClient resource.MatchingRawClient
WorkflowCache wcache.Cache
ReplicationProgressCache replication.ProgressCache
EventSerializer serialization.Serializer
QueueFactories []QueueFactory `group:"queueFactory"`
ReplicationTaskFetcherFactory replication.TaskFetcherFactory
ReplicationTaskExecutorProvider replication.TaskExecutorProvider
TracerProvider trace.TracerProvider
PersistenceVisibilityMgr manager.VisibilityManager
EventBlobCache persistence.XDCCache
TaskCategoryRegistry tasks.TaskCategoryRegistry
ReplicationDLQWriter replication.DLQWriter
CommandHandlerRegistry *workflow.CommandHandlerRegistry
OutboundQueueCBPool *circuitbreakerpool.OutboundQueueCircuitBreakerPool
HistoryReplicatorPersistenceRateLimiterProvider func(historyi.ShardContext) replication.PersistenceRateLimiter
TestHooks testhooks.TestHooks
}

historyEngineFactory struct {
@@ -77,6 +78,7 @@ func (f *historyEngineFactory) CreateEngine(
f.ReplicationDLQWriter,
f.CommandHandlerRegistry,
f.OutboundQueueCBPool,
f.HistoryReplicatorPersistenceRateLimiterProvider,
f.TestHooks,
)
}
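The new field reaches the factory through fx parameter-struct injection: every exported field of a struct embedding `fx.In` is resolved from the container by type, so the provider function must be registered somewhere (see the `fx.Provide` in `replication/fx.go` below). A minimal sketch of the mechanism, using hypothetical stand-in types rather than the real server wiring:

```go
package main

import (
	"fmt"

	"go.uber.org/fx"
)

// Stand-ins for historyi.ShardContext and replication.PersistenceRateLimiter.
type ShardContext struct{ ID int32 }
type PersistenceRateLimiter struct{ QPS float64 }

// Mirrors HistoryEngineFactoryParams: fields of an fx.In struct are
// injected by type from the container.
type factoryParams struct {
	fx.In
	LimiterProvider func(ShardContext) PersistenceRateLimiter
}

func main() {
	app := fx.New(
		// Provide the factory function itself as a value in the graph.
		fx.Provide(func() func(ShardContext) PersistenceRateLimiter {
			return func(s ShardContext) PersistenceRateLimiter {
				return PersistenceRateLimiter{QPS: 10000}
			}
		}),
		fx.Invoke(func(p factoryParams) {
			// Each shard gets its own limiter from the injected factory.
			fmt.Println(p.LimiterProvider(ShardContext{ID: 1}).QPS) // 10000
		}),
	)
	if err := app.Err(); err != nil {
		panic(err)
	}
}
```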
45 changes: 27 additions & 18 deletions service/history/ndc/history_replicator.go
@@ -25,6 +25,7 @@ import (
"go.temporal.io/server/common/persistence/transitionhistory"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/quotas"
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/service/history/consts"
historyi "go.temporal.io/server/service/history/interfaces"
@@ -96,15 +97,16 @@ type (
}

HistoryReplicatorImpl struct {
shardContext historyi.ShardContext
clusterMetadata cluster.Metadata
historySerializer serialization.Serializer
metricsHandler metrics.Handler
namespaceRegistry namespace.Registry
workflowCache wcache.Cache
eventsReapplier EventsReapplier
transactionMgr TransactionManager
logger log.Logger
shardContext historyi.ShardContext
clusterMetadata cluster.Metadata
historySerializer serialization.Serializer
metricsHandler metrics.Handler
namespaceRegistry namespace.Registry
workflowCache wcache.Cache
eventsReapplier EventsReapplier
transactionMgr TransactionManager
persistenceRateLimiter quotas.RateLimiter
logger log.Logger

mutableStateMapper *MutableStateMapperImpl
newResetter workflowResetterProvider
@@ -123,21 +125,23 @@ func NewHistoryReplicator(
workflowCache wcache.Cache,
eventsReapplier EventsReapplier,
eventSerializer serialization.Serializer,
persistenceRateLimiter quotas.RateLimiter,
logger log.Logger,
) *HistoryReplicatorImpl {

logger = log.With(logger, tag.ComponentHistoryReplicator)
transactionMgr := NewTransactionManager(shardContext, workflowCache, eventsReapplier, logger, false)
replicator := &HistoryReplicatorImpl{
shardContext: shardContext,
clusterMetadata: shardContext.GetClusterMetadata(),
historySerializer: eventSerializer,
metricsHandler: shardContext.GetMetricsHandler(),
namespaceRegistry: shardContext.GetNamespaceRegistry(),
workflowCache: workflowCache,
transactionMgr: transactionMgr,
eventsReapplier: eventsReapplier,
logger: logger,
shardContext: shardContext,
clusterMetadata: shardContext.GetClusterMetadata(),
historySerializer: eventSerializer,
metricsHandler: shardContext.GetMetricsHandler(),
namespaceRegistry: shardContext.GetNamespaceRegistry(),
workflowCache: workflowCache,
transactionMgr: transactionMgr,
eventsReapplier: eventsReapplier,
persistenceRateLimiter: persistenceRateLimiter,
logger: logger,

mutableStateMapper: NewMutableStateMapping(
shardContext,
@@ -192,6 +196,7 @@ func (r *HistoryReplicatorImpl) ApplyEvents(
ctx context.Context,
request *historyservice.ReplicateEventsV2Request,
) (retError error) {
if err := r.persistenceRateLimiter.Wait(ctx); err != nil { // WaitN(ctx, tokens) based on the request events size?
	return err
}

task, err := newReplicationTaskFromRequest(
r.clusterMetadata,
@@ -210,6 +215,8 @@ func (r *HistoryReplicatorImpl) BackfillHistoryEvents(
ctx context.Context,
request *historyi.BackfillHistoryEventsRequest,
) error {
if err := r.persistenceRateLimiter.Wait(ctx); err != nil { // WaitN(ctx, tokens) based on the request events size?
	return err
}

task, err := newReplicationTaskFromBatch(
r.clusterMetadata,
r.logger,
@@ -404,6 +411,8 @@ func (r *HistoryReplicatorImpl) ReplicateHistoryEvents(
newEvents []*historypb.HistoryEvent,
newRunID string,
) error {
if err := r.persistenceRateLimiter.Wait(ctx); err != nil { // WaitN(ctx, tokens) based on the request events size?
	return err
}

task, err := newReplicationTaskFromBatch(
r.clusterMetadata,
r.logger,
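All three entry points charge a flat cost of one token per request; the repeated `WaitN(ctx, tokens)` comment asks whether the cost should instead scale with the number of events being applied. A hypothetical sketch of that variant — the helper, its signature, and the event-count input are assumptions, not the PR's code (the real requests carry serialized event blobs, so the count would have to be known or estimated before deserialization):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

// waitForEvents charges one token per event rather than one per request,
// so a 50-event batch consumes 50x the budget of a single-event batch.
func waitForEvents(ctx context.Context, limiter *rate.Limiter, eventCounts []int) error {
	tokens := 0
	for _, n := range eventCounts {
		tokens += n
	}
	if tokens == 0 {
		tokens = 1 // a request still costs at least one persistence call
	}
	return limiter.WaitN(ctx, tokens)
}

func main() {
	limiter := rate.NewLimiter(rate.Limit(10000), 10000)
	err := waitForEvents(context.Background(), limiter, []int{3, 5})
	fmt.Println(err) // <nil>
}
```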
3 changes: 3 additions & 0 deletions service/history/replication/fx.go
@@ -41,6 +41,9 @@ var Module = fx.Provide(
NewExecutionManagerDLQWriter,
ClientSchedulerRateLimiterProvider,
ServerSchedulerRateLimiterProvider,
func() func(historyi.ShardContext) PersistenceRateLimiter {
return PersistenceRateLimiterProvider
},
replicationTaskConverterFactoryProvider,
replicationTaskExecutorProvider,
fx.Annotated{
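The anonymous zero-argument constructor looks odd at first glance, but presumably it is needed because `fx.Provide(PersistenceRateLimiterProvider)` would register a constructor whose `historyi.ShardContext` parameter fx would then try to resolve from the container — and shard contexts are created per shard at runtime, outside the DI graph. Wrapping the provider makes the factory function itself the provided value, which `HistoryEngineFactoryParams` receives and the engine factory invokes once per shard.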
8 changes: 8 additions & 0 deletions service/history/replication/quotas.go
@@ -2,6 +2,7 @@ package replication

import (
"go.temporal.io/server/common/quotas"
historyi "go.temporal.io/server/service/history/interfaces"
)

const (
@@ -11,6 +12,7 @@ const (
type (
ServerSchedulerRateLimiter quotas.RequestRateLimiter
ClientSchedulerRateLimiter quotas.RequestRateLimiter
PersistenceRateLimiter quotas.RateLimiter
)

func ClientSchedulerRateLimiterProvider() ClientSchedulerRateLimiter {
@@ -22,3 +24,9 @@ func ServerSchedulerRateLimiterProvider() ServerSchedulerRateLimiter {
// Experiment with no op rate limiter
return quotas.NoopRequestRateLimiter
}

func PersistenceRateLimiterProvider(shardContext historyi.ShardContext) PersistenceRateLimiter {
return quotas.NewDefaultOutgoingRateLimiter(
func() float64 { return shardContext.GetConfig().ReplicationTaskProcessorApplyPersistenceQPS() },
)
}