Skip to content

Commit 5bd927c

Browse files
authored
Record various activity metrics (#8196)
## What changed?

Changes activity metrics

### Deprecated Metrics

- **activity_end_to_end_latency** → Deprecated, as it measures individual attempts rather than true end-to-end latency, and replaced with the clearer **activity_start_to_close_latency**.

### New & Replacement Metrics

- **activity_start_to_close_latency**: Measures the latency from activity start to close (per attempt).
- **activity_schedule_to_close_latency**: Measures true end-to-end duration, including retries and backoff.
- **activity_success**: Counts the number of succeeded activities. Aligned with the existing **workflow_success** counter.
- **activity_fail**: Counts final failures for activities. Similar to **workflow_failed**, although it doesn’t include retries.
- **activity_timeout**: Incremented on the final activity timeout (including ScheduleToStartTimeout), tagged by `timeout_type`. Aligned with the existing **workflow_timeout** counter.
- **activity_task_fail**: Counts failures for activities including retries. Note that we don’t need to capture the number of retries, as this metric represents this number well.
- **activity_task_timeout**: Incremented on the activity attempt timeout (including ScheduleToStartTimeout), tagged by `timeout_type`.
- **activity_cancel**: Incremented when an activity is cancelled.

## Why?

ActivityE2ELatency is inaccurate, since it measures only individual attempts. The other metrics are not recorded yet.

## How did you test it?
- [X] built
- [X] run locally and tested manually
- [X] covered by existing tests
- [X] added new unit test(s)
- [ ] added new functional test(s)

Ran the server locally and checked the metrics in Prometheus:

Test 1: Activity with 5 attempts (1s each, 1s gap, all time out)
- activity_task_timeout = 5
- activity_timeout = 1
- activity_success = 0
- activity_fail = 0 (counted as timeout instead)
- activity_task_fail = 0 (counted as timeout instead)
- activity_schedule_to_close_latency_sum = 9s
- activity_end_to_end_latency_sum = 1s (deprecated)
- activity_start_to_close_latency_sum = 1s

Test 2: Basic helloworld activity
- activity_success = 1

Test 3: Activity with 5 attempts (1s each, 1s gap, all fail except for the last one)
- activity_task_fail = 4
- activity_success = 1
1 parent 32ca054 commit 5bd927c

File tree

8 files changed

+214
-46
lines changed

8 files changed

+214
-46
lines changed

common/metrics/metric_defs.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -826,12 +826,29 @@ var (
826826
"pending_tasks",
827827
WithDescription("A histogram across history shards for the number of in-memory pending history tasks."),
828828
)
829-
TaskSchedulerThrottled = NewCounterDef("task_scheduler_throttled")
830-
QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel
831-
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")
832-
QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count")
833-
QueueActionCounter = NewCounterDef("queue_actions")
834-
ActivityE2ELatency = NewTimerDef("activity_end_to_end_latency")
829+
TaskSchedulerThrottled = NewCounterDef("task_scheduler_throttled")
830+
QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel
831+
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")
832+
QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count")
833+
QueueActionCounter = NewCounterDef("queue_actions")
834+
ActivityE2ELatency = NewTimerDef(
835+
"activity_end_to_end_latency",
836+
WithDescription("DEPRECATED: Will be removed in one of the next releases. Duration of an activity attempt. Use activity_start_to_close_latency instead."),
837+
)
838+
ActivityStartToCloseLatency = NewTimerDef(
839+
"activity_start_to_close_latency",
840+
WithDescription("Duration of a single activity attempt. Doesn't include retries or backoffs."),
841+
)
842+
ActivityScheduleToCloseLatency = NewTimerDef(
843+
"activity_schedule_to_close_latency",
844+
WithDescription("Duration of activity execution from scheduled time to terminal state. Includes retries and backoffs."),
845+
)
846+
ActivitySuccess = NewCounterDef("activity_success", WithDescription("Number of activities that succeeded (doesn't include retries)."))
847+
ActivityFail = NewCounterDef("activity_fail", WithDescription("Number of activities that failed and won't be retried anymore."))
848+
ActivityTaskFail = NewCounterDef("activity_task_fail", WithDescription("Number of activity task failures (includes retries)."))
849+
ActivityCancel = NewCounterDef("activity_cancel")
850+
ActivityTaskTimeout = NewCounterDef("activity_task_timeout", WithDescription("Number of activity task timeouts (including retries)."))
851+
ActivityTimeout = NewCounterDef("activity_timeout", WithDescription("Number of terminal activity timeouts."))
835852
AckLevelUpdateCounter = NewCounterDef("ack_level_update")
836853
AckLevelUpdateFailedCounter = NewCounterDef("ack_level_update_failed")
837854
CommandCounter = NewCounterDef("command")

service/history/api/respondactivitytaskcanceled/api.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ func Invoke(
3838
return nil, err
3939
}
4040

41-
var activityStartedTime time.Time
41+
var attemptStartedTime time.Time
42+
var firstScheduledTime time.Time
4243
var taskQueue string
4344
var workflowTypeName string
4445
err = api.GetAndUpdateWorkflowWithNew(
@@ -96,7 +97,8 @@ func Invoke(
9697
return nil, err
9798
}
9899

99-
activityStartedTime = ai.StartedTime.AsTime()
100+
attemptStartedTime = ai.StartedTime.AsTime()
101+
firstScheduledTime = ai.FirstScheduledTime.AsTime()
100102
taskQueue = ai.TaskQueue
101103
return &api.UpdateWorkflowAction{
102104
Noop: false,
@@ -108,15 +110,21 @@ func Invoke(
108110
workflowConsistencyChecker,
109111
)
110112

111-
if err == nil && !activityStartedTime.IsZero() {
112-
metrics.ActivityE2ELatency.With(
113-
workflow.GetPerTaskQueueFamilyScope(
114-
shard.GetMetricsHandler(), namespace, taskQueue, shard.GetConfig(),
115-
metrics.OperationTag(metrics.HistoryRespondActivityTaskCanceledScope),
116-
metrics.WorkflowTypeTag(workflowTypeName),
117-
metrics.ActivityTypeTag(token.ActivityType),
118-
),
119-
).Record(time.Since(activityStartedTime))
113+
if err == nil {
114+
workflow.RecordActivityCompletionMetrics(
115+
shard,
116+
namespace,
117+
taskQueue,
118+
workflow.ActivityCompletionMetrics{
119+
Status: workflow.ActivityStatusCanceled,
120+
AttemptStartedTime: attemptStartedTime,
121+
FirstScheduledTime: firstScheduledTime,
122+
Closed: true,
123+
},
124+
metrics.OperationTag(metrics.HistoryRespondActivityTaskCanceledScope),
125+
metrics.WorkflowTypeTag(workflowTypeName),
126+
metrics.ActivityTypeTag(token.ActivityType),
127+
)
120128
}
121129
return &historyservice.RespondActivityTaskCanceledResponse{}, err
122130
}

service/history/api/respondactivitytaskcompleted/api.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ func Invoke(
3838
return nil, err
3939
}
4040

41-
var activityStartedTime time.Time
41+
var attemptStartedTime time.Time
42+
var firstScheduledTime time.Time
4243
var taskQueue string
4344
var workflowTypeName string
4445
var fabricateStartedEvent bool
@@ -109,7 +110,8 @@ func Invoke(
109110
// Unable to add ActivityTaskCompleted event to history
110111
return nil, err
111112
}
112-
activityStartedTime = ai.StartedTime.AsTime()
113+
attemptStartedTime = ai.StartedTime.AsTime()
114+
firstScheduledTime = ai.FirstScheduledTime.AsTime()
113115
taskQueue = ai.TaskQueue
114116
return &api.UpdateWorkflowAction{
115117
Noop: false,
@@ -121,15 +123,21 @@ func Invoke(
121123
workflowConsistencyChecker,
122124
)
123125

124-
if err == nil && !activityStartedTime.IsZero() && !fabricateStartedEvent {
125-
metrics.ActivityE2ELatency.With(
126-
workflow.GetPerTaskQueueFamilyScope(
127-
shard.GetMetricsHandler(), namespace, taskQueue, shard.GetConfig(),
128-
metrics.OperationTag(metrics.HistoryRespondActivityTaskCompletedScope),
129-
metrics.WorkflowTypeTag(workflowTypeName),
130-
metrics.ActivityTypeTag(token.ActivityType),
131-
),
132-
).Record(time.Since(activityStartedTime))
126+
if err == nil && !fabricateStartedEvent {
127+
workflow.RecordActivityCompletionMetrics(
128+
shard,
129+
namespace,
130+
taskQueue,
131+
workflow.ActivityCompletionMetrics{
132+
AttemptStartedTime: attemptStartedTime,
133+
FirstScheduledTime: firstScheduledTime,
134+
Status: workflow.ActivityStatusSucceeded,
135+
Closed: true,
136+
},
137+
metrics.OperationTag(metrics.HistoryRespondActivityTaskCompletedScope),
138+
metrics.WorkflowTypeTag(workflowTypeName),
139+
metrics.ActivityTypeTag(token.ActivityType),
140+
)
133141
}
134142
return &historyservice.RespondActivityTaskCompletedResponse{}, err
135143
}

service/history/api/respondactivitytaskfailed/api.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,11 @@ func Invoke(
4040
return nil, err
4141
}
4242

43-
var activityStartedTime time.Time
43+
var attemptStartedTime time.Time
44+
var firstScheduledTime time.Time
4445
var taskQueue string
4546
var workflowTypeName string
47+
var closed bool
4648
err = api.GetAndUpdateWorkflowWithNew(
4749
ctx,
4850
token.Clock,
@@ -109,25 +111,37 @@ func Invoke(
109111
return nil, err
110112
}
111113
postActions.CreateWorkflowTask = true
114+
closed = true
115+
} else {
116+
closed = false
112117
}
113118

114-
activityStartedTime = ai.StartedTime.AsTime()
119+
attemptStartedTime = ai.StartedTime.AsTime()
120+
firstScheduledTime = ai.FirstScheduledTime.AsTime()
115121
taskQueue = ai.TaskQueue
116122
return postActions, nil
117123
},
118124
nil,
119125
shard,
120126
workflowConsistencyChecker,
121127
)
122-
if err == nil && !activityStartedTime.IsZero() {
123-
metrics.ActivityE2ELatency.With(
124-
workflow.GetPerTaskQueueFamilyScope(
125-
shard.GetMetricsHandler(), namespace, taskQueue, shard.GetConfig(),
126-
metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope),
127-
metrics.WorkflowTypeTag(workflowTypeName),
128-
metrics.ActivityTypeTag(token.ActivityType),
129-
),
130-
).Record(time.Since(activityStartedTime))
128+
if err == nil {
129+
completionMetrics := workflow.ActivityCompletionMetrics{
130+
AttemptStartedTime: attemptStartedTime,
131+
FirstScheduledTime: firstScheduledTime,
132+
Status: workflow.ActivityStatusFailed,
133+
Closed: closed,
134+
}
135+
136+
workflow.RecordActivityCompletionMetrics(
137+
shard,
138+
namespace,
139+
taskQueue,
140+
completionMetrics,
141+
metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope),
142+
metrics.WorkflowTypeTag(workflowTypeName),
143+
metrics.ActivityTypeTag(token.ActivityType),
144+
)
131145
}
132146
return &historyservice.RespondActivityTaskFailedResponse{}, err
133147
}

service/history/api/respondactivitytaskfailed/api_test.go

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (s *workflowSuite) Test_NormalFlowShouldRescheduleActivity_UpdatesWorkflowE
108108
request := s.newRespondActivityTaskFailedRequest(uc)
109109
s.setupStubs(uc)
110110

111-
s.expectTimerMetricsRecorded(uc, s.shardContext)
111+
s.expectTransientFailureMetricsRecorded(uc, s.shardContext)
112112
s.workflowContext.EXPECT().UpdateWorkflowExecutionAsActive(ctx, s.shardContext).Return(nil)
113113

114114
_, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker)
@@ -246,7 +246,7 @@ func (s *workflowSuite) Test_LastHeartBeatDetailsExist_UpdatesMutableState() {
246246
Namespace: request.FailedRequest.GetNamespace(),
247247
})
248248

249-
s.expectTimerMetricsRecorded(uc, s.shardContext)
249+
s.expectTransientFailureMetricsRecorded(uc, s.shardContext)
250250

251251
_, err := Invoke(
252252
ctx,
@@ -294,7 +294,7 @@ func (s *workflowSuite) Test_NoMoreRetriesAndMutableStateHasNoPendingTasks_WillR
294294
})
295295
s.setupStubs(uc)
296296
request := s.newRespondActivityTaskFailedRequest(uc)
297-
s.expectTimerMetricsRecorded(uc, s.shardContext)
297+
s.expectTerminalFailureMetricsRecorded(uc, s.shardContext)
298298
s.currentMutableState.EXPECT().AddActivityTaskFailedEvent(
299299
uc.scheduledEventId,
300300
uc.startedEventId,
@@ -436,8 +436,9 @@ func (s *workflowSuite) setupShardContext(registry namespace.Registry) *historyi
436436
return shardContext
437437
}
438438

439-
func (s *workflowSuite) expectTimerMetricsRecorded(uc UsecaseConfig, shardContext *historyi.MockShardContext) {
439+
func (s *workflowSuite) expectTransientFailureMetricsRecorded(uc UsecaseConfig, shardContext *historyi.MockShardContext) {
440440
timer := metrics.NewMockTimerIface(s.controller)
441+
counter := metrics.NewMockCounterIface(s.controller)
441442
tags := []metrics.Tag{
442443
metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope),
443444
metrics.WorkflowTypeTag(uc.wfType.Name),
@@ -446,12 +447,40 @@ func (s *workflowSuite) expectTimerMetricsRecorded(uc UsecaseConfig, shardContex
446447
metrics.UnsafeTaskQueueTag(uc.taskQueueId),
447448
}
448449

449-
timer.EXPECT().Record(
450-
gomock.Any(),
451-
)
452450
metricsHandler := metrics.NewMockHandler(s.controller)
453451
metricsHandler.EXPECT().WithTags(tags).Return(metricsHandler)
452+
453+
timer.EXPECT().Record(gomock.Any()).Times(2) // ActivityE2ELatency and ActivityStartToCloseLatency
454+
metricsHandler.EXPECT().Timer(metrics.ActivityE2ELatency.Name()).Return(timer)
455+
metricsHandler.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(timer)
456+
// ActivityScheduleToCloseLatency is NOT recorded for retries
457+
counter.EXPECT().Record(int64(1))
458+
metricsHandler.EXPECT().Counter(metrics.ActivityTaskFail.Name()).Return(counter)
459+
460+
shardContext.EXPECT().GetMetricsHandler().Return(metricsHandler).AnyTimes()
461+
}
462+
463+
func (s *workflowSuite) expectTerminalFailureMetricsRecorded(uc UsecaseConfig, shardContext *historyi.MockShardContext) {
464+
timer := metrics.NewMockTimerIface(s.controller)
465+
counter := metrics.NewMockCounterIface(s.controller)
466+
tags := []metrics.Tag{
467+
metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope),
468+
metrics.WorkflowTypeTag(uc.wfType.Name),
469+
metrics.ActivityTypeTag(uc.activityType),
470+
metrics.NamespaceTag(uc.namespaceName.String()),
471+
metrics.UnsafeTaskQueueTag(uc.taskQueueId),
472+
}
473+
474+
metricsHandler := metrics.NewMockHandler(s.controller)
475+
metricsHandler.EXPECT().WithTags(tags).Return(metricsHandler)
476+
477+
timer.EXPECT().Record(gomock.Any()).Times(3) // ActivityE2ELatency, ActivityStartToCloseLatency, and ActivityScheduleToCloseLatency
454478
metricsHandler.EXPECT().Timer(metrics.ActivityE2ELatency.Name()).Return(timer)
479+
metricsHandler.EXPECT().Timer(metrics.ActivityStartToCloseLatency.Name()).Return(timer)
480+
metricsHandler.EXPECT().Timer(metrics.ActivityScheduleToCloseLatency.Name()).Return(timer) // Recorded for terminal failures
481+
counter.EXPECT().Record(int64(1)).Times(2) // ActivityFail and ActivityTaskFail
482+
metricsHandler.EXPECT().Counter(metrics.ActivityFail.Name()).Return(counter)
483+
metricsHandler.EXPECT().Counter(metrics.ActivityTaskFail.Name()).Return(counter)
455484

456485
shardContext.EXPECT().GetMetricsHandler().Return(metricsHandler).AnyTimes()
457486
}

service/history/timer_queue_active_task_executor.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,22 @@ func (t *timerQueueActiveTaskExecutor) processSingleActivityTimeoutTask(
296296
return result, nil
297297
}
298298

299+
workflow.RecordActivityCompletionMetrics(
300+
t.shardContext,
301+
namespace.Name(mutableState.GetNamespaceEntry().Name()),
302+
ai.TaskQueue,
303+
workflow.ActivityCompletionMetrics{
304+
Status: workflow.ActivityStatusTimeout,
305+
AttemptStartedTime: timestamp.TimeValue(ai.StartedTime),
306+
FirstScheduledTime: timestamp.TimeValue(ai.FirstScheduledTime),
307+
Closed: retryState != enumspb.RETRY_STATE_IN_PROGRESS,
308+
TimerType: timerSequenceID.TimerType,
309+
},
310+
metrics.OperationTag(metrics.TimerActiveTaskActivityTimeoutScope),
311+
metrics.WorkflowTypeTag(mutableState.GetWorkflowType().GetName()),
312+
metrics.ActivityTypeTag(ai.ActivityType.GetName()),
313+
)
314+
299315
if retryState == enumspb.RETRY_STATE_IN_PROGRESS {
300316
// TODO uncomment once RETRY_STATE_PAUSED is supported
301317
// || retryState == enumspb.RETRY_STATE_PAUSED {

service/history/timer_queue_active_task_executor_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2309,13 +2309,15 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessSingleActivityTimeoutTask
23092309
s.Run(tc.name, func() {
23102310
if tc.expectRetryActivity {
23112311
ms.EXPECT().RetryActivity(gomock.Any(), gomock.Any()).Return(tc.retryState, tc.retryError)
2312+
ms.EXPECT().GetWorkflowType().Return(&commonpb.WorkflowType{Name: "test-workflow-type"}).AnyTimes()
23122313
}
23132314

23142315
if tc.expectAddTimedTask {
23152316
ms.EXPECT().GetExecutionInfo().Return(info).AnyTimes()
23162317
ms.EXPECT().AddActivityTaskTimedOutEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
23172318
}
23182319
ms.EXPECT().GetEffectiveVersioningBehavior().Return(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED).AnyTimes()
2320+
ms.EXPECT().GetNamespaceEntry().Return(tests.LocalNamespaceEntry).AnyTimes()
23192321

23202322
result, err := s.timerQueueActiveTaskExecutor.processSingleActivityTimeoutTask(
23212323
ms, tc.timerSequenceID, tc.ai)

0 commit comments

Comments
 (0)