Skip to content

Commit 734a46f

Browse files
committed
Record activity duration
1 parent 9c75cd6 commit 734a46f

File tree

7 files changed

+199
-46
lines changed

7 files changed

+199
-46
lines changed

common/metrics/metric_defs.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -814,12 +814,30 @@ var (
814814
"pending_tasks",
815815
WithDescription("A histogram across history shards for the number of in-memory pending history tasks."),
816816
)
817-
TaskSchedulerThrottled = NewCounterDef("task_scheduler_throttled")
818-
QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel
819-
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")
820-
QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count")
821-
QueueActionCounter = NewCounterDef("queue_actions")
822-
ActivityE2ELatency = NewTimerDef("activity_end_to_end_latency")
817+
TaskSchedulerThrottled = NewCounterDef("task_scheduler_throttled")
818+
QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel
819+
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")
820+
QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count")
821+
QueueActionCounter = NewCounterDef("queue_actions")
822+
ActivityE2ELatency = NewTimerDef(
823+
"activity_end_to_end_latency",
824+
WithDescription("DEPRECATED: Will be removed in one of the next releases. Duration of an activity attempt. Use activity_attempt_duration instead."),
825+
)
826+
ActivityAttemptDuration = NewTimerDef(
827+
"activity_attempt_duration",
828+
WithDescription("Duration of a single activity attempt. Doesn't include retries or backoffs."),
829+
)
830+
ActivityE2EDuration = NewTimerDef(
831+
"activity_e2e_duration",
832+
WithDescription("Duration of activity execution from scheduled time to terminal state. Includes retries and backoffs."),
833+
)
834+
ActivitySucceededCount = NewCounterDef("activity_succeeded_count")
835+
ActivityFailedCount = NewCounterDef("activity_failed_count")
836+
ActivityTimeoutCount = NewCounterDef(
837+
"activity_timeout_count",
838+
WithDescription("Number of activity executions that ended in terminal timeout (e.g. ScheduleToClose timeout is expired)."),
839+
)
840+
ActivityRetryCount = NewCounterDef("activity_retry_count")
823841
AckLevelUpdateCounter = NewCounterDef("ack_level_update")
824842
AckLevelUpdateFailedCounter = NewCounterDef("ack_level_update_failed")
825843
CommandCounter = NewCounterDef("command")

service/history/api/respondactivitytaskcanceled/api.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ func Invoke(
3838
return nil, err
3939
}
4040

41-
var activityStartedTime time.Time
41+
var attemptStartedTime time.Time
42+
var firstScheduledTime time.Time
4243
var taskQueue string
4344
var workflowTypeName string
4445
err = api.GetAndUpdateWorkflowWithNew(
@@ -96,7 +97,8 @@ func Invoke(
9697
return nil, err
9798
}
9899

99-
activityStartedTime = ai.StartedTime.AsTime()
100+
attemptStartedTime = ai.StartedTime.AsTime()
101+
firstScheduledTime = ai.FirstScheduledTime.AsTime()
100102
taskQueue = ai.TaskQueue
101103
return &api.UpdateWorkflowAction{
102104
Noop: false,
@@ -108,15 +110,20 @@ func Invoke(
108110
workflowConsistencyChecker,
109111
)
110112

111-
if err == nil && !activityStartedTime.IsZero() {
112-
metrics.ActivityE2ELatency.With(
113-
workflow.GetPerTaskQueueFamilyScope(
114-
shard.GetMetricsHandler(), namespace, taskQueue, shard.GetConfig(),
115-
metrics.OperationTag(metrics.HistoryRespondActivityTaskCanceledScope),
116-
metrics.WorkflowTypeTag(workflowTypeName),
117-
metrics.ActivityTypeTag(token.ActivityType),
118-
),
119-
).Record(time.Since(activityStartedTime))
113+
if err == nil {
114+
workflow.RecordActivityCompletionMetrics(
115+
shard,
116+
namespace,
117+
taskQueue,
118+
workflow.ActivityCompletionMetrics{
119+
State: workflow.StateTerminalFailure,
120+
AttemptStartedTime: attemptStartedTime,
121+
FirstScheduledTime: firstScheduledTime,
122+
},
123+
metrics.OperationTag(metrics.HistoryRespondActivityTaskCanceledScope),
124+
metrics.WorkflowTypeTag(workflowTypeName),
125+
metrics.ActivityTypeTag(token.ActivityType),
126+
)
120127
}
121128
return &historyservice.RespondActivityTaskCanceledResponse{}, err
122129
}

service/history/api/respondactivitytaskcompleted/api.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ func Invoke(
3838
return nil, err
3939
}
4040

41-
var activityStartedTime time.Time
41+
var attemptStartedTime time.Time
42+
var firstScheduledTime time.Time
4243
var taskQueue string
4344
var workflowTypeName string
4445
var fabricateStartedEvent bool
@@ -109,7 +110,8 @@ func Invoke(
109110
// Unable to add ActivityTaskCompleted event to history
110111
return nil, err
111112
}
112-
activityStartedTime = ai.StartedTime.AsTime()
113+
attemptStartedTime = ai.StartedTime.AsTime()
114+
firstScheduledTime = ai.FirstScheduledTime.AsTime()
113115
taskQueue = ai.TaskQueue
114116
return &api.UpdateWorkflowAction{
115117
Noop: false,
@@ -121,15 +123,20 @@ func Invoke(
121123
workflowConsistencyChecker,
122124
)
123125

124-
if err == nil && !activityStartedTime.IsZero() && !fabricateStartedEvent {
125-
metrics.ActivityE2ELatency.With(
126-
workflow.GetPerTaskQueueFamilyScope(
127-
shard.GetMetricsHandler(), namespace, taskQueue, shard.GetConfig(),
128-
metrics.OperationTag(metrics.HistoryRespondActivityTaskCompletedScope),
129-
metrics.WorkflowTypeTag(workflowTypeName),
130-
metrics.ActivityTypeTag(token.ActivityType),
131-
),
132-
).Record(time.Since(activityStartedTime))
126+
if err == nil && !fabricateStartedEvent {
127+
workflow.RecordActivityCompletionMetrics(
128+
shard,
129+
namespace,
130+
taskQueue,
131+
workflow.ActivityCompletionMetrics{
132+
AttemptStartedTime: attemptStartedTime,
133+
FirstScheduledTime: firstScheduledTime,
134+
State: workflow.StateSucceeded,
135+
},
136+
metrics.OperationTag(metrics.HistoryRespondActivityTaskCompletedScope),
137+
metrics.WorkflowTypeTag(workflowTypeName),
138+
metrics.ActivityTypeTag(token.ActivityType),
139+
)
133140
}
134141
return &historyservice.RespondActivityTaskCompletedResponse{}, err
135142
}

service/history/api/respondactivitytaskfailed/api.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,11 @@ func Invoke(
4040
return nil, err
4141
}
4242

43-
var activityStartedTime time.Time
43+
var attemptStartedTime time.Time
44+
var firstScheduledTime time.Time
4445
var taskQueue string
4546
var workflowTypeName string
47+
var activityState workflow.ActivityState
4648
err = api.GetAndUpdateWorkflowWithNew(
4749
ctx,
4850
token.Clock,
@@ -109,25 +111,36 @@ func Invoke(
109111
return nil, err
110112
}
111113
postActions.CreateWorkflowTask = true
114+
activityState = workflow.StateTerminalFailure
115+
} else {
116+
activityState = workflow.StateRetryScheduled
112117
}
113118

114-
activityStartedTime = ai.StartedTime.AsTime()
119+
attemptStartedTime = ai.StartedTime.AsTime()
120+
firstScheduledTime = ai.FirstScheduledTime.AsTime()
115121
taskQueue = ai.TaskQueue
116122
return postActions, nil
117123
},
118124
nil,
119125
shard,
120126
workflowConsistencyChecker,
121127
)
122-
if err == nil && !activityStartedTime.IsZero() {
123-
metrics.ActivityE2ELatency.With(
124-
workflow.GetPerTaskQueueFamilyScope(
125-
shard.GetMetricsHandler(), namespace, taskQueue, shard.GetConfig(),
126-
metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope),
127-
metrics.WorkflowTypeTag(workflowTypeName),
128-
metrics.ActivityTypeTag(token.ActivityType),
129-
),
130-
).Record(time.Since(activityStartedTime))
128+
if err == nil {
129+
completionMetrics := workflow.ActivityCompletionMetrics{
130+
AttemptStartedTime: attemptStartedTime,
131+
FirstScheduledTime: firstScheduledTime,
132+
State: activityState,
133+
}
134+
135+
workflow.RecordActivityCompletionMetrics(
136+
shard,
137+
namespace,
138+
taskQueue,
139+
completionMetrics,
140+
metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope),
141+
metrics.WorkflowTypeTag(workflowTypeName),
142+
metrics.ActivityTypeTag(token.ActivityType),
143+
)
131144
}
132145
return &historyservice.RespondActivityTaskFailedResponse{}, err
133146
}

service/history/api/respondactivitytaskfailed/api_test.go

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (s *workflowSuite) Test_NormalFlowShouldRescheduleActivity_UpdatesWorkflowE
108108
request := s.newRespondActivityTaskFailedRequest(uc)
109109
s.setupStubs(uc)
110110

111-
s.expectTimerMetricsRecorded(uc, s.shardContext)
111+
s.expectRetryMetricsRecorded(uc, s.shardContext)
112112
s.workflowContext.EXPECT().UpdateWorkflowExecutionAsActive(ctx, s.shardContext).Return(nil)
113113

114114
_, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker)
@@ -246,7 +246,7 @@ func (s *workflowSuite) Test_LastHeartBeatDetailsExist_UpdatesMutableState() {
246246
Namespace: request.FailedRequest.GetNamespace(),
247247
})
248248

249-
s.expectTimerMetricsRecorded(uc, s.shardContext)
249+
s.expectRetryMetricsRecorded(uc, s.shardContext)
250250

251251
_, err := Invoke(
252252
ctx,
@@ -294,7 +294,7 @@ func (s *workflowSuite) Test_NoMoreRetriesAndMutableStateHasNoPendingTasks_WillR
294294
})
295295
s.setupStubs(uc)
296296
request := s.newRespondActivityTaskFailedRequest(uc)
297-
s.expectTimerMetricsRecorded(uc, s.shardContext)
297+
s.expectTerminalFailureMetricsRecorded(uc, s.shardContext)
298298
s.currentMutableState.EXPECT().AddActivityTaskFailedEvent(
299299
uc.scheduledEventId,
300300
uc.startedEventId,
@@ -436,8 +436,9 @@ func (s *workflowSuite) setupShardContext(registry namespace.Registry) *historyi
436436
return shardContext
437437
}
438438

439-
func (s *workflowSuite) expectTimerMetricsRecorded(uc UsecaseConfig, shardContext *historyi.MockShardContext) {
439+
func (s *workflowSuite) expectRetryMetricsRecorded(uc UsecaseConfig, shardContext *historyi.MockShardContext) {
440440
timer := metrics.NewMockTimerIface(s.controller)
441+
counter := metrics.NewMockCounterIface(s.controller)
441442
tags := []metrics.Tag{
442443
metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope),
443444
metrics.WorkflowTypeTag(uc.wfType.Name),
@@ -446,12 +447,39 @@ func (s *workflowSuite) expectTimerMetricsRecorded(uc UsecaseConfig, shardContex
446447
metrics.UnsafeTaskQueueTag(uc.taskQueueId),
447448
}
448449

449-
timer.EXPECT().Record(
450-
gomock.Any(),
451-
)
452450
metricsHandler := metrics.NewMockHandler(s.controller)
453451
metricsHandler.EXPECT().WithTags(tags).Return(metricsHandler)
452+
453+
timer.EXPECT().Record(gomock.Any()).Times(2) // ActivityE2ELatency and ActivityAttemptDuration
454+
metricsHandler.EXPECT().Timer(metrics.ActivityE2ELatency.Name()).Return(timer)
455+
metricsHandler.EXPECT().Timer(metrics.ActivityAttemptDuration.Name()).Return(timer)
456+
// ActivityE2EDuration is NOT recorded for retries
457+
counter.EXPECT().Record(int64(1))
458+
metricsHandler.EXPECT().Counter(metrics.ActivityRetryCount.Name()).Return(counter)
459+
460+
shardContext.EXPECT().GetMetricsHandler().Return(metricsHandler).AnyTimes()
461+
}
462+
463+
func (s *workflowSuite) expectTerminalFailureMetricsRecorded(uc UsecaseConfig, shardContext *historyi.MockShardContext) {
464+
timer := metrics.NewMockTimerIface(s.controller)
465+
counter := metrics.NewMockCounterIface(s.controller)
466+
tags := []metrics.Tag{
467+
metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope),
468+
metrics.WorkflowTypeTag(uc.wfType.Name),
469+
metrics.ActivityTypeTag(uc.activityType),
470+
metrics.NamespaceTag(uc.namespaceName.String()),
471+
metrics.UnsafeTaskQueueTag(uc.taskQueueId),
472+
}
473+
474+
metricsHandler := metrics.NewMockHandler(s.controller)
475+
metricsHandler.EXPECT().WithTags(tags).Return(metricsHandler)
476+
477+
timer.EXPECT().Record(gomock.Any()).Times(3) // ActivityE2ELatency, ActivityAttemptDuration, and ActivityE2EDuration
454478
metricsHandler.EXPECT().Timer(metrics.ActivityE2ELatency.Name()).Return(timer)
479+
metricsHandler.EXPECT().Timer(metrics.ActivityAttemptDuration.Name()).Return(timer)
480+
metricsHandler.EXPECT().Timer(metrics.ActivityE2EDuration.Name()).Return(timer) // Recorded for terminal failures
481+
counter.EXPECT().Record(int64(1))
482+
metricsHandler.EXPECT().Counter(metrics.ActivityFailedCount.Name()).Return(counter)
455483

456484
shardContext.EXPECT().GetMetricsHandler().Return(metricsHandler).AnyTimes()
457485
}

service/history/timer_queue_active_task_executor.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,24 @@ func (t *timerQueueActiveTaskExecutor) processSingleActivityTimeoutTask(
296296
return result, nil
297297
}
298298

299+
var state workflow.ActivityState
300+
if retryState == enumspb.RETRY_STATE_IN_PROGRESS {
301+
state = workflow.StateRetryScheduled
302+
} else {
303+
state = workflow.StateTerminalFailure
304+
}
305+
306+
workflow.RecordActivityCompletionMetrics(
307+
t.shardContext,
308+
namespace.Name(mutableState.GetNamespaceEntry().Name()),
309+
ai.TaskQueue,
310+
workflow.ActivityCompletionMetrics{
311+
State: state,
312+
AttemptStartedTime: timestamp.TimeValue(ai.StartedTime),
313+
FirstScheduledTime: timestamp.TimeValue(ai.FirstScheduledTime),
314+
},
315+
)
316+
299317
if retryState == enumspb.RETRY_STATE_IN_PROGRESS {
300318
// TODO uncommment once RETRY_STATE_PAUSED is supported
301319
// || retryState == enumspb.RETRY_STATE_PAUSED {
@@ -319,6 +337,7 @@ func (t *timerQueueActiveTaskExecutor) processSingleActivityTimeoutTask(
319337
mutableState.GetEffectiveVersioningBehavior(),
320338
ai.Attempt,
321339
)
340+
322341
if _, err = mutableState.AddActivityTaskTimedOutEvent(
323342
ai.ScheduledEventId,
324343
ai.StartedEventId,

service/history/workflow/metrics.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package workflow
22

33
import (
4+
"time"
5+
46
enumspb "go.temporal.io/api/enums/v1"
57
enumsspb "go.temporal.io/server/api/enums/v1"
68
"go.temporal.io/server/common/metrics"
79
"go.temporal.io/server/common/namespace"
810
"go.temporal.io/server/common/persistence"
911
"go.temporal.io/server/common/tqid"
1012
"go.temporal.io/server/service/history/configs"
13+
historyi "go.temporal.io/server/service/history/interfaces"
1114
)
1215

1316
func emitWorkflowHistoryStats(
@@ -131,3 +134,61 @@ func GetPerTaskQueueFamilyScope(
131134
tags...,
132135
)
133136
}
137+
138+
type ActivityState int
139+
140+
const (
141+
StateUnknown ActivityState = iota
142+
StateSucceeded
143+
StateTerminalFailure // failed and will NOT be retried
144+
StateRetryScheduled // failed and WILL be retried (has a next scheduled time)
145+
)
146+
147+
type ActivityCompletionMetrics struct {
148+
// State determines whether the activity succeeded, and whether it is/will be retried
149+
State ActivityState
150+
// AttemptStartedTime is the start time of the current attempt
151+
AttemptStartedTime time.Time
152+
// FirstScheduledTime is the scheduled time of the first attempt
153+
FirstScheduledTime time.Time
154+
}
155+
156+
func RecordActivityCompletionMetrics(
157+
shard historyi.ShardContext,
158+
namespace namespace.Name,
159+
taskQueue string,
160+
metricsState ActivityCompletionMetrics,
161+
tags ...metrics.Tag,
162+
) {
163+
metricsHandler := GetPerTaskQueueFamilyScope(
164+
shard.GetMetricsHandler(),
165+
namespace,
166+
taskQueue,
167+
shard.GetConfig(),
168+
tags...,
169+
)
170+
171+
if !metricsState.AttemptStartedTime.IsZero() {
172+
latency := time.Since(metricsState.AttemptStartedTime)
173+
// ActivityE2ELatency is deprecated due to its inaccurate naming. It captures the attempt duration instead of an end-to-end duration as its name suggests. For now record both metrics
174+
metrics.ActivityE2ELatency.With(metricsHandler).Record(latency)
175+
metrics.ActivityAttemptDuration.With(metricsHandler).Record(latency)
176+
}
177+
178+
// Record true end-to-end duration only for terminal states (includes retries and backoffs)
179+
if metricsState.State != StateRetryScheduled && !metricsState.FirstScheduledTime.IsZero() {
180+
e2eDuration := time.Since(metricsState.FirstScheduledTime)
181+
metrics.ActivityE2EDuration.With(metricsHandler).Record(e2eDuration)
182+
}
183+
184+
switch metricsState.State {
185+
case StateTerminalFailure:
186+
metrics.ActivityFailedCount.With(metricsHandler).Record(1)
187+
case StateRetryScheduled:
188+
metrics.ActivityRetryCount.With(metricsHandler).Record(1)
189+
case StateSucceeded:
190+
metrics.ActivitySucceededCount.With(metricsHandler).Record(1)
191+
case StateUnknown:
192+
// Do nothing
193+
}
194+
}

0 commit comments

Comments
 (0)