Skip to content

Commit 26deef2

Browse files
committed
Wire into FX.
1 parent 170174c commit 26deef2

File tree

5 files changed

+59
-45
lines changed

5 files changed

+59
-45
lines changed

service/history/history_engine.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ func NewEngineWithShardContext(
162162
dlqWriter replication.DLQWriter,
163163
commandHandlerRegistry *workflow.CommandHandlerRegistry,
164164
outboundQueueCBPool *circuitbreakerpool.OutboundQueueCircuitBreakerPool,
165+
historyReplicatorPersistenceRateLimiterProvider func(historyi.ShardContext) replication.PersistenceRateLimiter,
165166
testHooks testhooks.TestHooks,
166167
) historyi.Engine {
167168
currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
@@ -238,6 +239,7 @@ func NewEngineWithShardContext(
238239
workflowCache,
239240
historyEngImpl.eventsReapplier,
240241
eventSerializer,
242+
historyReplicatorPersistenceRateLimiterProvider(shard),
241243
logger,
242244
)
243245
historyEngImpl.nDCHistoryImporter = ndc.NewHistoryImporter(

service/history/history_engine_factory.go

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,26 +25,27 @@ type (
2525
HistoryEngineFactoryParams struct {
2626
fx.In
2727

28-
ClientBean client.Bean
29-
MatchingClient resource.MatchingClient
30-
SdkClientFactory sdk.ClientFactory
31-
EventNotifier events.Notifier
32-
Config *configs.Config
33-
RawMatchingClient resource.MatchingRawClient
34-
WorkflowCache wcache.Cache
35-
ReplicationProgressCache replication.ProgressCache
36-
EventSerializer serialization.Serializer
37-
QueueFactories []QueueFactory `group:"queueFactory"`
38-
ReplicationTaskFetcherFactory replication.TaskFetcherFactory
39-
ReplicationTaskExecutorProvider replication.TaskExecutorProvider
40-
TracerProvider trace.TracerProvider
41-
PersistenceVisibilityMgr manager.VisibilityManager
42-
EventBlobCache persistence.XDCCache
43-
TaskCategoryRegistry tasks.TaskCategoryRegistry
44-
ReplicationDLQWriter replication.DLQWriter
45-
CommandHandlerRegistry *workflow.CommandHandlerRegistry
46-
OutboundQueueCBPool *circuitbreakerpool.OutboundQueueCircuitBreakerPool
47-
TestHooks testhooks.TestHooks
28+
ClientBean client.Bean
29+
MatchingClient resource.MatchingClient
30+
SdkClientFactory sdk.ClientFactory
31+
EventNotifier events.Notifier
32+
Config *configs.Config
33+
RawMatchingClient resource.MatchingRawClient
34+
WorkflowCache wcache.Cache
35+
ReplicationProgressCache replication.ProgressCache
36+
EventSerializer serialization.Serializer
37+
QueueFactories []QueueFactory `group:"queueFactory"`
38+
ReplicationTaskFetcherFactory replication.TaskFetcherFactory
39+
ReplicationTaskExecutorProvider replication.TaskExecutorProvider
40+
TracerProvider trace.TracerProvider
41+
PersistenceVisibilityMgr manager.VisibilityManager
42+
EventBlobCache persistence.XDCCache
43+
TaskCategoryRegistry tasks.TaskCategoryRegistry
44+
ReplicationDLQWriter replication.DLQWriter
45+
CommandHandlerRegistry *workflow.CommandHandlerRegistry
46+
OutboundQueueCBPool *circuitbreakerpool.OutboundQueueCircuitBreakerPool
47+
HistoryReplicatorPersistenceRateLimiterProvider func(historyi.ShardContext) replication.PersistenceRateLimiter
48+
TestHooks testhooks.TestHooks
4849
}
4950

5051
historyEngineFactory struct {
@@ -77,6 +78,7 @@ func (f *historyEngineFactory) CreateEngine(
7778
f.ReplicationDLQWriter,
7879
f.CommandHandlerRegistry,
7980
f.OutboundQueueCBPool,
81+
f.HistoryReplicatorPersistenceRateLimiterProvider,
8082
f.TestHooks,
8183
)
8284
}

service/history/ndc/history_replicator.go

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -97,16 +97,16 @@ type (
9797
}
9898

9999
HistoryReplicatorImpl struct {
100-
shardContext historyi.ShardContext
101-
clusterMetadata cluster.Metadata
102-
historySerializer serialization.Serializer
103-
metricsHandler metrics.Handler
104-
namespaceRegistry namespace.Registry
105-
workflowCache wcache.Cache
106-
eventsReapplier EventsReapplier
107-
transactionMgr TransactionManager
108-
rateLimiter quotas.RateLimiter
109-
logger log.Logger
100+
shardContext historyi.ShardContext
101+
clusterMetadata cluster.Metadata
102+
historySerializer serialization.Serializer
103+
metricsHandler metrics.Handler
104+
namespaceRegistry namespace.Registry
105+
workflowCache wcache.Cache
106+
eventsReapplier EventsReapplier
107+
transactionMgr TransactionManager
108+
persistenceRateLimiter quotas.RateLimiter
109+
logger log.Logger
110110

111111
mutableStateMapper *MutableStateMapperImpl
112112
newResetter workflowResetterProvider
@@ -125,24 +125,23 @@ func NewHistoryReplicator(
125125
workflowCache wcache.Cache,
126126
eventsReapplier EventsReapplier,
127127
eventSerializer serialization.Serializer,
128+
persistenceRateLimiter quotas.RateLimiter,
128129
logger log.Logger,
129130
) *HistoryReplicatorImpl {
130131

131132
logger = log.With(logger, tag.ComponentHistoryReplicator)
132133
transactionMgr := NewTransactionManager(shardContext, workflowCache, eventsReapplier, logger, false)
133134
replicator := &HistoryReplicatorImpl{
134-
shardContext: shardContext,
135-
clusterMetadata: shardContext.GetClusterMetadata(),
136-
historySerializer: eventSerializer,
137-
metricsHandler: shardContext.GetMetricsHandler(),
138-
namespaceRegistry: shardContext.GetNamespaceRegistry(),
139-
workflowCache: workflowCache,
140-
transactionMgr: transactionMgr,
141-
eventsReapplier: eventsReapplier,
142-
rateLimiter: quotas.NewDefaultOutgoingRateLimiter(
143-
func() float64 { return shardContext.GetConfig().ReplicationTaskProcessorApplyPersistenceQPS() },
144-
),
145-
logger: logger,
135+
shardContext: shardContext,
136+
clusterMetadata: shardContext.GetClusterMetadata(),
137+
historySerializer: eventSerializer,
138+
metricsHandler: shardContext.GetMetricsHandler(),
139+
namespaceRegistry: shardContext.GetNamespaceRegistry(),
140+
workflowCache: workflowCache,
141+
transactionMgr: transactionMgr,
142+
eventsReapplier: eventsReapplier,
143+
persistenceRateLimiter: persistenceRateLimiter,
144+
logger: logger,
146145

147146
mutableStateMapper: NewMutableStateMapping(
148147
shardContext,
@@ -197,7 +196,7 @@ func (r *HistoryReplicatorImpl) ApplyEvents(
197196
ctx context.Context,
198197
request *historyservice.ReplicateEventsV2Request,
199198
) (retError error) {
200-
_ = r.rateLimiter.Wait(ctx) // WaitN(ctx, tokens) based on the request events size?
199+
_ = r.persistenceRateLimiter.Wait(ctx) // WaitN(ctx, tokens) based on the request events size?
201200

202201
task, err := newReplicationTaskFromRequest(
203202
r.clusterMetadata,
@@ -216,7 +215,7 @@ func (r *HistoryReplicatorImpl) BackfillHistoryEvents(
216215
ctx context.Context,
217216
request *historyi.BackfillHistoryEventsRequest,
218217
) error {
219-
_ = r.rateLimiter.Wait(ctx) // WaitN(ctx, tokens) based on the request events size?
218+
_ = r.persistenceRateLimiter.Wait(ctx) // WaitN(ctx, tokens) based on the request events size?
220219

221220
task, err := newReplicationTaskFromBatch(
222221
r.clusterMetadata,
@@ -412,7 +411,7 @@ func (r *HistoryReplicatorImpl) ReplicateHistoryEvents(
412411
newEvents []*historypb.HistoryEvent,
413412
newRunID string,
414413
) error {
415-
_ = r.rateLimiter.Wait(ctx) // WaitN(ctx, tokens) based on the request events size?
414+
_ = r.persistenceRateLimiter.Wait(ctx) // WaitN(ctx, tokens) based on the request events size?
416415

417416
task, err := newReplicationTaskFromBatch(
418417
r.clusterMetadata,

service/history/replication/fx.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ var Module = fx.Provide(
4141
NewExecutionManagerDLQWriter,
4242
ClientSchedulerRateLimiterProvider,
4343
ServerSchedulerRateLimiterProvider,
44+
func() func(historyi.ShardContext) PersistenceRateLimiter {
45+
return PersistenceRateLimiterProvider
46+
},
4447
replicationTaskConverterFactoryProvider,
4548
replicationTaskExecutorProvider,
4649
fx.Annotated{

service/history/replication/quotas.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package replication
22

33
import (
44
"go.temporal.io/server/common/quotas"
5+
historyi "go.temporal.io/server/service/history/interfaces"
56
)
67

78
const (
@@ -11,6 +12,7 @@ const (
1112
type (
1213
ServerSchedulerRateLimiter quotas.RequestRateLimiter
1314
ClientSchedulerRateLimiter quotas.RequestRateLimiter
15+
PersistenceRateLimiter quotas.RateLimiter
1416
)
1517

1618
func ClientSchedulerRateLimiterProvider() ClientSchedulerRateLimiter {
@@ -22,3 +24,9 @@ func ServerSchedulerRateLimiterProvider() ServerSchedulerRateLimiter {
2224
// Experiment with no op rate limiter
2325
return quotas.NoopRequestRateLimiter
2426
}
27+
28+
func PersistenceRateLimiterProvider(shardContext historyi.ShardContext) PersistenceRateLimiter {
29+
return quotas.NewDefaultOutgoingRateLimiter(
30+
func() float64 { return shardContext.GetConfig().ReplicationTaskProcessorApplyPersistenceQPS() },
31+
)
32+
}

0 commit comments

Comments
 (0)