Skip to content

Commit cfd3762

Browse files
committed
Record activity duration
1 parent 469526e commit cfd3762

File tree

8 files changed

+183
-46
lines changed

8 files changed

+183
-46
lines changed

common/metrics/metric_defs.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -814,12 +814,30 @@ var (
814814
"pending_tasks",
815815
WithDescription("A histogram across history shards for the number of in-memory pending history tasks."),
816816
)
817-
TaskSchedulerThrottled = NewCounterDef("task_scheduler_throttled")
818-
QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel
819-
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")
820-
QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count")
821-
QueueActionCounter = NewCounterDef("queue_actions")
822-
ActivityE2ELatency = NewTimerDef("activity_end_to_end_latency")
817+
TaskSchedulerThrottled = NewCounterDef("task_scheduler_throttled")
818+
QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel
819+
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")
820+
QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count")
821+
QueueActionCounter = NewCounterDef("queue_actions")
822+
ActivityE2ELatency = NewTimerDef(
823+
"activity_end_to_end_latency",
824+
WithDescription("DEPRECATED: Will be removed in one of the next releases. Duration of an activity attempt. Use activity_attempt_duration instead."),
825+
)
826+
ActivityAttemptDuration = NewTimerDef(
827+
"activity_attempt_duration",
828+
WithDescription("Duration of a single activity attempt. Doesn't include retries or backoffs."),
829+
)
830+
ActivityE2EDuration = NewTimerDef(
831+
"activity_e2e_duration",
832+
WithDescription("Duration of activity execution from scheduled time to terminal state. Includes retries and backoffs."),
833+
)
834+
ActivitySucceededCount = NewCounterDef("activity_succeeded_count")
835+
ActivityFailedCount = NewCounterDef("activity_failed_count")
836+
ActivityTimeoutCount = NewCounterDef(
837+
"activity_timeout_count",
838+
WithDescription("Number of activity executions that ended in terminal timeout (e.g. ScheduleToClose timeout is expired)."),
839+
)
840+
ActivityRetryCount = NewCounterDef("activity_retry_count")
823841
AckLevelUpdateCounter = NewCounterDef("ack_level_update")
824842
AckLevelUpdateFailedCounter = NewCounterDef("ack_level_update_failed")
825843
CommandCounter = NewCounterDef("command")

service/history/api/activity_util.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,4 @@ func GetActivityScheduledEventID(
5252
}
5353
return activityInfo.ScheduledEventId, nil
5454
}
55+

service/history/api/respondactivitytaskcanceled/api.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ func Invoke(
3838
return nil, err
3939
}
4040

41-
var activityStartedTime time.Time
41+
var attemptStartedTime time.Time
42+
var firstScheduledTime time.Time
4243
var taskQueue string
4344
var workflowTypeName string
4445
err = api.GetAndUpdateWorkflowWithNew(
@@ -96,7 +97,8 @@ func Invoke(
9697
return nil, err
9798
}
9899

99-
activityStartedTime = ai.StartedTime.AsTime()
100+
attemptStartedTime = ai.StartedTime.AsTime()
101+
firstScheduledTime = ai.FirstScheduledTime.AsTime()
100102
taskQueue = ai.TaskQueue
101103
return &api.UpdateWorkflowAction{
102104
Noop: false,
@@ -108,15 +110,20 @@ func Invoke(
108110
workflowConsistencyChecker,
109111
)
110112

111-
if err == nil && !activityStartedTime.IsZero() {
112-
metrics.ActivityE2ELatency.With(
113-
workflow.GetPerTaskQueueFamilyScope(
114-
shard.GetMetricsHandler(), namespace, taskQueue, shard.GetConfig(),
115-
metrics.OperationTag(metrics.HistoryRespondActivityTaskCanceledScope),
116-
metrics.WorkflowTypeTag(workflowTypeName),
117-
metrics.ActivityTypeTag(token.ActivityType),
118-
),
119-
).Record(time.Since(activityStartedTime))
113+
if err == nil {
114+
workflow.RecordActivityCompletionMetrics(
115+
shard,
116+
namespace,
117+
taskQueue,
118+
workflow.ActivityCompletionMetrics{
119+
IsTerminalFailure: true, // cancellation counts as failure
120+
AttemptStartedTime: attemptStartedTime,
121+
FirstScheduledTime: firstScheduledTime,
122+
},
123+
metrics.OperationTag(metrics.HistoryRespondActivityTaskCanceledScope),
124+
metrics.WorkflowTypeTag(workflowTypeName),
125+
metrics.ActivityTypeTag(token.ActivityType),
126+
)
120127
}
121128
return &historyservice.RespondActivityTaskCanceledResponse{}, err
122129
}

service/history/api/respondactivitytaskcompleted/api.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ func Invoke(
3838
return nil, err
3939
}
4040

41-
var activityStartedTime time.Time
41+
var attemptStartedTime time.Time
42+
var firstScheduledTime time.Time
4243
var taskQueue string
4344
var workflowTypeName string
4445
var fabricateStartedEvent bool
@@ -109,7 +110,8 @@ func Invoke(
109110
// Unable to add ActivityTaskCompleted event to history
110111
return nil, err
111112
}
112-
activityStartedTime = ai.StartedTime.AsTime()
113+
attemptStartedTime = ai.StartedTime.AsTime()
114+
firstScheduledTime = ai.FirstScheduledTime.AsTime()
113115
taskQueue = ai.TaskQueue
114116
return &api.UpdateWorkflowAction{
115117
Noop: false,
@@ -121,15 +123,19 @@ func Invoke(
121123
workflowConsistencyChecker,
122124
)
123125

124-
if err == nil && !activityStartedTime.IsZero() && !fabricateStartedEvent {
125-
metrics.ActivityE2ELatency.With(
126-
workflow.GetPerTaskQueueFamilyScope(
127-
shard.GetMetricsHandler(), namespace, taskQueue, shard.GetConfig(),
128-
metrics.OperationTag(metrics.HistoryRespondActivityTaskCompletedScope),
129-
metrics.WorkflowTypeTag(workflowTypeName),
130-
metrics.ActivityTypeTag(token.ActivityType),
131-
),
132-
).Record(time.Since(activityStartedTime))
126+
if err == nil && !fabricateStartedEvent {
127+
workflow.RecordActivityCompletionMetrics(
128+
shard,
129+
namespace,
130+
taskQueue,
131+
workflow.ActivityCompletionMetrics{
132+
AttemptStartedTime: attemptStartedTime,
133+
FirstScheduledTime: firstScheduledTime,
134+
},
135+
metrics.OperationTag(metrics.HistoryRespondActivityTaskCompletedScope),
136+
metrics.WorkflowTypeTag(workflowTypeName),
137+
metrics.ActivityTypeTag(token.ActivityType),
138+
)
133139
}
134140
return &historyservice.RespondActivityTaskCompletedResponse{}, err
135141
}

service/history/api/respondactivitytaskfailed/api.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,11 @@ func Invoke(
4040
return nil, err
4141
}
4242

43-
var activityStartedTime time.Time
43+
var attemptStartedTime time.Time
44+
var firstScheduledTime time.Time
4445
var taskQueue string
4546
var workflowTypeName string
47+
var retryScheduled bool
4648
err = api.GetAndUpdateWorkflowWithNew(
4749
ctx,
4850
token.Clock,
@@ -109,25 +111,36 @@ func Invoke(
109111
return nil, err
110112
}
111113
postActions.CreateWorkflowTask = true
114+
} else {
115+
retryScheduled = true
112116
}
113117

114-
activityStartedTime = ai.StartedTime.AsTime()
118+
attemptStartedTime = ai.StartedTime.AsTime()
119+
firstScheduledTime = ai.FirstScheduledTime.AsTime()
115120
taskQueue = ai.TaskQueue
116121
return postActions, nil
117122
},
118123
nil,
119124
shard,
120125
workflowConsistencyChecker,
121126
)
122-
if err == nil && !activityStartedTime.IsZero() {
123-
metrics.ActivityE2ELatency.With(
124-
workflow.GetPerTaskQueueFamilyScope(
125-
shard.GetMetricsHandler(), namespace, taskQueue, shard.GetConfig(),
126-
metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope),
127-
metrics.WorkflowTypeTag(workflowTypeName),
128-
metrics.ActivityTypeTag(token.ActivityType),
129-
),
130-
).Record(time.Since(activityStartedTime))
127+
if err == nil {
128+
completionMetrics := workflow.ActivityCompletionMetrics{
129+
AttemptStartedTime: attemptStartedTime,
130+
FirstScheduledTime: firstScheduledTime,
131+
RetryScheduled: retryScheduled,
132+
IsTerminalFailure: !retryScheduled,
133+
}
134+
135+
workflow.RecordActivityCompletionMetrics(
136+
shard,
137+
namespace,
138+
taskQueue,
139+
completionMetrics,
140+
metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope),
141+
metrics.WorkflowTypeTag(workflowTypeName),
142+
metrics.ActivityTypeTag(token.ActivityType),
143+
)
131144
}
132145
return &historyservice.RespondActivityTaskFailedResponse{}, err
133146
}

service/history/api/respondactivitytaskfailed/api_test.go

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (s *workflowSuite) Test_NormalFlowShouldRescheduleActivity_UpdatesWorkflowE
108108
request := s.newRespondActivityTaskFailedRequest(uc)
109109
s.setupStubs(uc)
110110

111-
s.expectTimerMetricsRecorded(uc, s.shardContext)
111+
s.expectRetryMetricsRecorded(uc, s.shardContext)
112112
s.workflowContext.EXPECT().UpdateWorkflowExecutionAsActive(ctx, s.shardContext).Return(nil)
113113

114114
_, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker)
@@ -246,7 +246,7 @@ func (s *workflowSuite) Test_LastHeartBeatDetailsExist_UpdatesMutableState() {
246246
Namespace: request.FailedRequest.GetNamespace(),
247247
})
248248

249-
s.expectTimerMetricsRecorded(uc, s.shardContext)
249+
s.expectRetryMetricsRecorded(uc, s.shardContext)
250250

251251
_, err := Invoke(
252252
ctx,
@@ -294,7 +294,7 @@ func (s *workflowSuite) Test_NoMoreRetriesAndMutableStateHasNoPendingTasks_WillR
294294
})
295295
s.setupStubs(uc)
296296
request := s.newRespondActivityTaskFailedRequest(uc)
297-
s.expectTimerMetricsRecorded(uc, s.shardContext)
297+
s.expectTerminalFailureMetricsRecorded(uc, s.shardContext)
298298
s.currentMutableState.EXPECT().AddActivityTaskFailedEvent(
299299
uc.scheduledEventId,
300300
uc.startedEventId,
@@ -436,8 +436,9 @@ func (s *workflowSuite) setupShardContext(registry namespace.Registry) *historyi
436436
return shardContext
437437
}
438438

439-
func (s *workflowSuite) expectTimerMetricsRecorded(uc UsecaseConfig, shardContext *historyi.MockShardContext) {
439+
func (s *workflowSuite) expectRetryMetricsRecorded(uc UsecaseConfig, shardContext *historyi.MockShardContext) {
440440
timer := metrics.NewMockTimerIface(s.controller)
441+
counter := metrics.NewMockCounterIface(s.controller)
441442
tags := []metrics.Tag{
442443
metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope),
443444
metrics.WorkflowTypeTag(uc.wfType.Name),
@@ -446,12 +447,39 @@ func (s *workflowSuite) expectTimerMetricsRecorded(uc UsecaseConfig, shardContex
446447
metrics.UnsafeTaskQueueTag(uc.taskQueueId),
447448
}
448449

449-
timer.EXPECT().Record(
450-
gomock.Any(),
451-
)
452450
metricsHandler := metrics.NewMockHandler(s.controller)
453451
metricsHandler.EXPECT().WithTags(tags).Return(metricsHandler)
452+
453+
timer.EXPECT().Record(gomock.Any()).Times(2) // ActivityE2ELatency and ActivityAttemptDuration
454+
metricsHandler.EXPECT().Timer(metrics.ActivityE2ELatency.Name()).Return(timer)
455+
metricsHandler.EXPECT().Timer(metrics.ActivityAttemptDuration.Name()).Return(timer)
456+
// ActivityE2EDuration is NOT recorded for retries
457+
counter.EXPECT().Record(int64(1))
458+
metricsHandler.EXPECT().Counter(metrics.ActivityRetryCount.Name()).Return(counter)
459+
460+
shardContext.EXPECT().GetMetricsHandler().Return(metricsHandler).AnyTimes()
461+
}
462+
463+
func (s *workflowSuite) expectTerminalFailureMetricsRecorded(uc UsecaseConfig, shardContext *historyi.MockShardContext) {
464+
timer := metrics.NewMockTimerIface(s.controller)
465+
counter := metrics.NewMockCounterIface(s.controller)
466+
tags := []metrics.Tag{
467+
metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope),
468+
metrics.WorkflowTypeTag(uc.wfType.Name),
469+
metrics.ActivityTypeTag(uc.activityType),
470+
metrics.NamespaceTag(uc.namespaceName.String()),
471+
metrics.UnsafeTaskQueueTag(uc.taskQueueId),
472+
}
473+
474+
metricsHandler := metrics.NewMockHandler(s.controller)
475+
metricsHandler.EXPECT().WithTags(tags).Return(metricsHandler)
476+
477+
timer.EXPECT().Record(gomock.Any()).Times(3) // ActivityE2ELatency, ActivityAttemptDuration, and ActivityE2EDuration
454478
metricsHandler.EXPECT().Timer(metrics.ActivityE2ELatency.Name()).Return(timer)
479+
metricsHandler.EXPECT().Timer(metrics.ActivityAttemptDuration.Name()).Return(timer)
480+
metricsHandler.EXPECT().Timer(metrics.ActivityE2EDuration.Name()).Return(timer) // Recorded for terminal failures
481+
counter.EXPECT().Record(int64(1))
482+
metricsHandler.EXPECT().Counter(metrics.ActivityFailedCount.Name()).Return(counter)
455483

456484
shardContext.EXPECT().GetMetricsHandler().Return(metricsHandler).AnyTimes()
457485
}

service/history/timer_queue_active_task_executor.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,18 @@ func (t *timerQueueActiveTaskExecutor) processSingleActivityTimeoutTask(
296296
return result, nil
297297
}
298298

299+
workflow.RecordActivityCompletionMetrics(
300+
t.shardContext,
301+
namespace.Name(mutableState.GetNamespaceEntry().Name()),
302+
ai.TaskQueue,
303+
workflow.ActivityCompletionMetrics{
304+
IsTerminalFailure: retryState != enumspb.RETRY_STATE_IN_PROGRESS,
305+
RetryScheduled: retryState == enumspb.RETRY_STATE_IN_PROGRESS,
306+
AttemptStartedTime: timestamp.TimeValue(ai.StartedTime),
307+
FirstScheduledTime: timestamp.TimeValue(ai.FirstScheduledTime),
308+
},
309+
)
310+
299311
if retryState == enumspb.RETRY_STATE_IN_PROGRESS {
300312
// TODO uncommment once RETRY_STATE_PAUSED is supported
301313
// || retryState == enumspb.RETRY_STATE_PAUSED {
@@ -319,6 +331,7 @@ func (t *timerQueueActiveTaskExecutor) processSingleActivityTimeoutTask(
319331
mutableState.GetEffectiveVersioningBehavior(),
320332
ai.Attempt,
321333
)
334+
322335
if _, err = mutableState.AddActivityTaskTimedOutEvent(
323336
ai.ScheduledEventId,
324337
ai.StartedEventId,

service/history/workflow/metrics.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package workflow
22

33
import (
4+
"time"
5+
46
enumspb "go.temporal.io/api/enums/v1"
57
enumsspb "go.temporal.io/server/api/enums/v1"
68
"go.temporal.io/server/common/metrics"
79
"go.temporal.io/server/common/namespace"
810
"go.temporal.io/server/common/persistence"
911
"go.temporal.io/server/common/tqid"
1012
"go.temporal.io/server/service/history/configs"
13+
historyi "go.temporal.io/server/service/history/interfaces"
1114
)
1215

1316
func emitWorkflowHistoryStats(
@@ -119,3 +122,51 @@ func GetPerTaskQueueFamilyScope(
119122
tags...,
120123
)
121124
}
125+
126+
type ActivityCompletionMetrics struct {
127+
// IsTerminalFailure is true if the activity failed and won't be retried
128+
IsTerminalFailure bool
129+
// RetryScheduled is true if the activity is scheduled to be retried
130+
RetryScheduled bool
131+
// AttemptStartedTime is the start time of the current attempt
132+
AttemptStartedTime time.Time
133+
// FirstScheduledTime is the scheduled time of the first attempt
134+
FirstScheduledTime time.Time
135+
}
136+
137+
func RecordActivityCompletionMetrics(
138+
shard historyi.ShardContext,
139+
namespace namespace.Name,
140+
taskQueue string,
141+
metricsState ActivityCompletionMetrics,
142+
tags ...metrics.Tag,
143+
) {
144+
metricsHandler := GetPerTaskQueueFamilyScope(
145+
shard.GetMetricsHandler(),
146+
namespace,
147+
taskQueue,
148+
shard.GetConfig(),
149+
tags...,
150+
)
151+
152+
if !metricsState.AttemptStartedTime.IsZero() {
153+
latency := time.Since(metricsState.AttemptStartedTime)
154+
// ActivityE2ELatency is deprecated due to its inaccurate naming. It captures the attempt duration instead of an end-to-end duration as its name suggests. For now record both metrics
155+
metrics.ActivityE2ELatency.With(metricsHandler).Record(latency)
156+
metrics.ActivityAttemptDuration.With(metricsHandler).Record(latency)
157+
}
158+
159+
// Record true end-to-end duration only for terminal states (includes retries and backoffs)
160+
if !metricsState.RetryScheduled && !metricsState.FirstScheduledTime.IsZero() {
161+
e2eDuration := time.Since(metricsState.FirstScheduledTime)
162+
metrics.ActivityE2EDuration.With(metricsHandler).Record(e2eDuration)
163+
}
164+
165+
if metricsState.IsTerminalFailure {
166+
metrics.ActivityFailedCount.With(metricsHandler).Record(1)
167+
} else if metricsState.RetryScheduled {
168+
metrics.ActivityRetryCount.With(metricsHandler).Record(1)
169+
} else {
170+
metrics.ActivitySucceededCount.With(metricsHandler).Record(1)
171+
}
172+
}

0 commit comments

Comments
 (0)