Commit 4e14c2d

yux0 authored and alexshtin committed
Adding replication task processing metrics (#3452)
1 parent a8e0f07 commit 4e14c2d

File tree

3 files changed (+20, -11 lines)


common/metrics/defs.go

Lines changed: 2 additions & 0 deletions
@@ -1801,6 +1801,7 @@ const (
 	ReplicationTasksLag
 	ReplicationTasksFetched
 	ReplicationTasksReturned
+	ReplicationTasksAppliedLatency
 	ReplicationDLQFailed
 	ReplicationDLQMaxLevelGauge
 	ReplicationDLQAckLevelGauge
@@ -2216,6 +2217,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
 	ReplicationTasksLag: {metricName: "replication_tasks_lag", metricType: Timer},
 	ReplicationTasksFetched: {metricName: "replication_tasks_fetched", metricType: Timer},
 	ReplicationTasksReturned: {metricName: "replication_tasks_returned", metricType: Timer},
+	ReplicationTasksAppliedLatency: {metricName: "replication_tasks_applied_latency", metricType: Timer},
 	ReplicationDLQFailed: {metricName: "replication_dlq_enqueue_failed", metricType: Counter},
 	ReplicationDLQMaxLevelGauge: {metricName: "replication_dlq_max_level", metricType: Gauge},
 	ReplicationDLQAckLevelGauge: {metricName: "replication_dlq_ack_level", metricType: Gauge},
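
For context, the defs.go change follows the registry's usual two-step pattern: a new metric index constant is appended to the iota-based const block, and the same index is mapped to its emitted name and type in MetricDefs. Below is a minimal, simplified sketch of that pattern; only the metric names come from the diff, while the ServiceIdx dimension and most other fields of the real file are omitted.

```go
// Simplified sketch of the metric-registration pattern in common/metrics/defs.go.
// Types here are stand-ins; the real file keys MetricDefs by ServiceIdx as well.
package metrics

type MetricType int

const (
	Counter MetricType = iota
	Timer
	Gauge
)

type metricDefinition struct {
	metricName string
	metricType MetricType
}

// Metric indices: a constant added here needs a matching MetricDefs entry,
// so both places must change together (as they do in this commit).
const (
	ReplicationTasksReturned = iota
	ReplicationTasksAppliedLatency // added by this commit
	ReplicationDLQFailed
)

var MetricDefs = map[int]metricDefinition{
	ReplicationTasksReturned:       {metricName: "replication_tasks_returned", metricType: Timer},
	ReplicationTasksAppliedLatency: {metricName: "replication_tasks_applied_latency", metricType: Timer},
	ReplicationDLQFailed:           {metricName: "replication_dlq_enqueue_failed", metricType: Counter},
}
```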

service/history/replicationTaskExecutor.go

Lines changed: 6 additions & 2 deletions
@@ -119,6 +119,8 @@ func (e *replicationTaskExecutorImpl) handleActivityTask(
 		return err
 	}
 
+	replicationStopWatch := e.metricsClient.StartTimer(metrics.SyncActivityTaskScope, metrics.ServiceLatency)
+	defer replicationStopWatch.Stop()
 	request := &historyservice.SyncActivityRequest{
 		NamespaceId: attr.NamespaceId,
 		WorkflowId: attr.WorkflowId,
@@ -177,6 +179,8 @@ func (e *replicationTaskExecutorImpl) handleHistoryReplicationTaskV2(
 		return err
 	}
 
+	replicationStopWatch := e.metricsClient.StartTimer(metrics.HistoryReplicationV2TaskScope, metrics.ServiceLatency)
+	defer replicationStopWatch.Stop()
 	request := &historyservice.ReplicateEventsV2Request{
 		NamespaceId: attr.NamespaceId,
 		WorkflowExecution: &commonpb.WorkflowExecution{
@@ -197,8 +201,8 @@ func (e *replicationTaskExecutorImpl) handleHistoryReplicationTaskV2(
 		return err
 	}
 	e.metricsClient.IncCounter(metrics.HistoryRereplicationByHistoryReplicationScope, metrics.ClientRequests)
-	stopwatch := e.metricsClient.StartTimer(metrics.HistoryRereplicationByHistoryReplicationScope, metrics.ClientLatency)
-	defer stopwatch.Stop()
+	resendStopWatch := e.metricsClient.StartTimer(metrics.HistoryRereplicationByHistoryReplicationScope, metrics.ClientLatency)
+	defer resendStopWatch.Stop()
 
 	if resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory(
 		retryErr.NamespaceId,
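
The executor change wraps each handler in a per-scope latency timer: a stopwatch is started once the preceding checks succeed and stopped via defer, so the recorded ServiceLatency covers the remainder of the handler, including the call into the history service. The following is a hedged sketch of that stopwatch pattern; the Stopwatch and MetricsClient types are simplified stand-ins, not Temporal's real metrics interfaces.

```go
// Minimal sketch of the StartTimer / defer Stop pattern used in
// handleActivityTask and handleHistoryReplicationTaskV2.
package replication

import "time"

const (
	SyncActivityTaskScope = iota
	ServiceLatency
)

// Stopwatch is a simplified stand-in for the stopwatch returned by StartTimer.
type Stopwatch struct {
	start  time.Time
	record func(elapsed time.Duration)
}

// Stop records the elapsed time since the stopwatch was started.
func (s Stopwatch) Stop() { s.record(time.Since(s.start)) }

// MetricsClient is a simplified stand-in for the metrics client interface.
type MetricsClient interface {
	StartTimer(scope int, timer int) Stopwatch
}

// handleActivityTask shows the shape of the change: everything after the
// early-return checks, including the downstream call, is covered by the
// deferred Stop.
func handleActivityTask(m MetricsClient, validate func() error, callHistoryService func() error) error {
	if err := validate(); err != nil {
		return err // early return: no latency recorded, matching the diff's placement
	}

	replicationStopWatch := m.StartTimer(SyncActivityTaskScope, ServiceLatency)
	defer replicationStopWatch.Stop()

	return callHistoryService()
}
```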

service/history/replicationTaskProcessor.go

Lines changed: 12 additions & 9 deletions
@@ -281,14 +281,8 @@ func (p *ReplicationTaskProcessorImpl) sendFetchMessageRequest() <-chan *replica
 func (p *ReplicationTaskProcessorImpl) processResponse(response *replicationspb.ReplicationMessages) {
 
 	p.syncShardChan <- response.GetSyncShardStatus()
-	// Note here we check replication tasks instead of hasMore. The expectation is that in a steady state
-	// we will receive replication tasks but hasMore is false (meaning that we are always catching up).
-	// So hasMore might not be a good indicator for additional wait.
-	if len(response.ReplicationTasks) == 0 {
-		backoffDuration := p.noTaskRetrier.NextBackOff()
-		time.Sleep(backoffDuration)
-		return
-	}
+	scope := p.metricsClient.Scope(metrics.ReplicationTaskFetcherScope, metrics.TargetClusterTag(p.sourceCluster))
+	batchRequestStartTime := time.Now()
 
 	for _, replicationTask := range response.ReplicationTasks {
 		err := p.processSingleTask(replicationTask)
@@ -298,9 +292,18 @@ func (p *ReplicationTaskProcessorImpl) processResponse(response *replicationspb.
 		}
 	}
 
+	// Note here we check replication tasks instead of hasMore. The expectation is that in a steady state
+	// we will receive replication tasks but hasMore is false (meaning that we are always catching up).
+	// So hasMore might not be a good indicator for additional wait.
+	if len(response.ReplicationTasks) == 0 {
+		backoffDuration := p.noTaskRetrier.NextBackOff()
+		time.Sleep(backoffDuration)
+	} else {
+		scope.RecordTimer(metrics.ReplicationTasksAppliedLatency, time.Now().Sub(batchRequestStartTime))
+	}
+
 	p.lastProcessedMessageID = response.GetLastRetrievedMessageId()
 	p.lastRetrievedMessageID = response.GetLastRetrievedMessageId()
-	scope := p.metricsClient.Scope(metrics.ReplicationTaskFetcherScope, metrics.TargetClusterTag(p.sourceCluster))
 	scope.UpdateGauge(metrics.LastRetrievedMessageID, float64(p.lastRetrievedMessageID))
 	p.noTaskRetrier.Reset()
 }
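
Taken together, the processor change moves the no-task check below the apply loop and adds the new per-batch timer: the batch start time is captured before iterating, replication_tasks_applied_latency is recorded only when the poll actually returned tasks, and an empty poll falls through to the backoff sleep instead. A hedged sketch of the resulting control flow follows, with simplified stand-ins for the real processor, retrier, task, and metrics scope types.

```go
// Sketch of the processResponse flow after this commit. Scope, the backoff
// function, and the task type are simplified stand-ins for the real types.
package replication

import "time"

const (
	ReplicationTasksAppliedLatency = iota
	LastRetrievedMessageID
)

type Scope interface {
	RecordTimer(timer int, d time.Duration)
	UpdateGauge(gauge int, value float64)
}

type processor struct {
	scope                  Scope
	noTaskBackOff          func() time.Duration // stand-in for noTaskRetrier.NextBackOff
	processSingleTask      func(task interface{}) error
	lastProcessedMessageID int64
	lastRetrievedMessageID int64
}

func (p *processor) processResponse(tasks []interface{}, lastRetrievedMessageID int64) {
	// Capture the batch start before applying any task so the timer covers the whole loop.
	batchRequestStartTime := time.Now()

	for _, task := range tasks {
		if err := p.processSingleTask(task); err != nil {
			return // error handling simplified; the real processor deals with the failure here
		}
	}

	// Tasks, not hasMore, decide whether to back off: per the comment in the diff,
	// hasMore is false in steady state even though tasks keep arriving.
	if len(tasks) == 0 {
		time.Sleep(p.noTaskBackOff())
	} else {
		p.scope.RecordTimer(ReplicationTasksAppliedLatency, time.Since(batchRequestStartTime))
	}

	p.lastProcessedMessageID = lastRetrievedMessageID
	p.lastRetrievedMessageID = lastRetrievedMessageID
	p.scope.UpdateGauge(LastRetrievedMessageID, float64(p.lastRetrievedMessageID))
	// (the real code also resets the no-task retrier here)
}
```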
