@@ -44,7 +44,7 @@ const (
4444type (
4545 HealthSignalAggregator interface {
4646 common.Daemon
47- Record (callerSegment int32 , latency time.Duration , err error )
47+ Record (callerSegment int32 , namespace string , latency time.Duration , err error )
4848 AverageLatency () float64
4949 ErrorRatio () float64
5050 }
@@ -53,16 +53,18 @@ type (
5353 status int32
5454 shutdownCh chan struct {}
5555
56- requestsPerShard map [int32 ]int64
57- requestsLock sync.Mutex
56+ // map of shardID -> map of namespace -> request count
57+ requestCounts map [int32 ]map [string ]int64
58+ requestsLock sync.Mutex
5859
5960 aggregationEnabled bool
6061 latencyAverage aggregate.MovingWindowAverage
6162 errorRatio aggregate.MovingWindowAverage
6263
63- metricsHandler metrics.Handler
64- emitMetricsTimer * time.Ticker
65- perShardRPSWarnLimit dynamicconfig.IntPropertyFn
64+ metricsHandler metrics.Handler
65+ emitMetricsTimer * time.Ticker
66+ perShardRPSWarnLimit dynamicconfig.IntPropertyFn
67+ perShardPerNsRPSWarnLimit dynamicconfig.FloatPropertyFn
6668
6769 logger log.Logger
6870 }
@@ -74,17 +76,19 @@ func NewHealthSignalAggregatorImpl(
7476 maxBufferSize int ,
7577 metricsHandler metrics.Handler ,
7678 perShardRPSWarnLimit dynamicconfig.IntPropertyFn ,
79+ perShardPerNsRPSWarnLimit dynamicconfig.FloatPropertyFn ,
7780 logger log.Logger ,
7881) * HealthSignalAggregatorImpl {
7982 ret := & HealthSignalAggregatorImpl {
80- status : common .DaemonStatusInitialized ,
81- shutdownCh : make (chan struct {}),
82- requestsPerShard : make (map [int32 ]int64 ),
83- metricsHandler : metricsHandler ,
84- emitMetricsTimer : time .NewTicker (emitMetricsInterval ),
85- perShardRPSWarnLimit : perShardRPSWarnLimit ,
86- logger : logger ,
87- aggregationEnabled : aggregationEnabled ,
83+ status : common .DaemonStatusInitialized ,
84+ shutdownCh : make (chan struct {}),
85+ requestCounts : make (map [int32 ]map [string ]int64 ),
86+ metricsHandler : metricsHandler ,
87+ emitMetricsTimer : time .NewTicker (emitMetricsInterval ),
88+ perShardRPSWarnLimit : perShardRPSWarnLimit ,
89+ perShardPerNsRPSWarnLimit : perShardPerNsRPSWarnLimit ,
90+ logger : logger ,
91+ aggregationEnabled : aggregationEnabled ,
8892 }
8993
9094 if aggregationEnabled {
@@ -113,7 +117,7 @@ func (s *HealthSignalAggregatorImpl) Stop() {
113117 s .emitMetricsTimer .Stop ()
114118}
115119
116- func (s * HealthSignalAggregatorImpl ) Record (callerSegment int32 , latency time.Duration , err error ) {
120+ func (s * HealthSignalAggregatorImpl ) Record (callerSegment int32 , namespace string , latency time.Duration , err error ) {
117121 if s .aggregationEnabled {
118122 s .latencyAverage .Record (latency .Milliseconds ())
119123
@@ -125,7 +129,7 @@ func (s *HealthSignalAggregatorImpl) Record(callerSegment int32, latency time.Du
125129 }
126130
127131 if callerSegment != CallerSegmentMissing {
128- s .incrementShardRequestCount (callerSegment )
132+ s .incrementShardRequestCount (callerSegment , namespace )
129133 }
130134}
131135
@@ -137,10 +141,13 @@ func (s *HealthSignalAggregatorImpl) ErrorRatio() float64 {
137141 return s .errorRatio .Average ()
138142}
139143
140- func (s * HealthSignalAggregatorImpl ) incrementShardRequestCount (shardID int32 ) {
144+ func (s * HealthSignalAggregatorImpl ) incrementShardRequestCount (shardID int32 , namespace string ) {
141145 s .requestsLock .Lock ()
142146 defer s .requestsLock .Unlock ()
143- s .requestsPerShard [shardID ]++
147+ if s .requestCounts [shardID ] == nil {
148+ s .requestCounts [shardID ] = make (map [string ]int64 )
149+ }
150+ s.requestCounts [shardID ][namespace ]++
144151}
145152
146153func (s * HealthSignalAggregatorImpl ) emitMetricsLoop () {
@@ -150,15 +157,24 @@ func (s *HealthSignalAggregatorImpl) emitMetricsLoop() {
150157 return
151158 case <- s .emitMetricsTimer .C :
152159 s .requestsLock .Lock ()
153- requestCounts := s .requestsPerShard
154- s .requestsPerShard = make (map [int32 ]int64 , len (requestCounts ))
160+ requestCounts := s .requestCounts
161+ s .requestCounts = make (map [int32 ] map [ string ]int64 , len (requestCounts ))
155162 s .requestsLock .Unlock ()
156163
157- for shardID , count := range requestCounts {
158- shardRPS := int64 (float64 (count ) / emitMetricsInterval .Seconds ())
164+ for shardID , requestCountPerNS := range requestCounts {
165+ shardRequestCount := int64 (0 )
166+ for namespace , count := range requestCountPerNS {
167+ shardRequestCount += count
168+ shardRPSPerNS := int64 (float64 (count ) / emitMetricsInterval .Seconds ())
169+ if s .perShardPerNsRPSWarnLimit () > 0.0 && shardRPSPerNS > int64 (s .perShardPerNsRPSWarnLimit ()* float64 (s .perShardRPSWarnLimit ())) {
170+ s .logger .Warn ("Per shard per namespace RPS warn limit exceeded" , tag .ShardID (shardID ), tag .WorkflowNamespace (namespace ), tag .RPS (shardRPSPerNS ))
171+ }
172+ }
173+
174+ shardRPS := int64 (float64 (shardRequestCount ) / emitMetricsInterval .Seconds ())
159175 s .metricsHandler .Histogram (metrics .PersistenceShardRPS .GetMetricName (), metrics .PersistenceShardRPS .GetMetricUnit ()).Record (shardRPS )
160176 if shardRPS > int64 (s .perShardRPSWarnLimit ()) {
161- s .logger .Warn ("Per shard RPS warn limit exceeded" , tag .ShardID (shardID ))
177+ s .logger .Warn ("Per shard RPS warn limit exceeded" , tag .ShardID (shardID ), tag . RPS ( shardRPS ) )
162178 }
163179 }
164180 }
0 commit comments