Skip to content

Commit 298d5b6

Browse files
simvladlina-temporal
authored andcommitted
Produce workflow_duration metric on completion (#8185)
## What changed? This PR produces the metric workflow_duration, when the workflow execution completes. ## Why? Currently there is no metric that captures the duration of the workflow execution. It's also valuable to have the duration broken down by task queue, namespace, workflow type, which this PR enables ## How did you test it? - [X] built - [X] run locally and tested manually - [X] covered by existing tests - [X] added new unit test(s) - [ ] added new functional test(s)
1 parent 8adb850 commit 298d5b6

File tree

10 files changed

+144
-24
lines changed

10 files changed

+144
-24
lines changed

common/metrics/metric_defs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -926,6 +926,7 @@ var (
926926
WorkflowTimeoutCount = NewCounterDef("workflow_timeout")
927927
WorkflowTerminateCount = NewCounterDef("workflow_terminate")
928928
WorkflowContinuedAsNewCount = NewCounterDef("workflow_continued_as_new")
929+
WorkflowDuration = NewTimerDef("workflow_duration")
929930
ReplicationStreamPanic = NewCounterDef("replication_stream_panic")
930931
ReplicationStreamError = NewCounterDef("replication_stream_error")
931932
ReplicationServiceError = NewCounterDef("replication_service_error")

service/history/ndc/workflow_resetter.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ func (r *workflowResetterImpl) persistToDB(
369369
workflow.MutableStateFailoverVersion(resetWorkflow.GetMutableState()),
370370
resetWorkflowSnapshot,
371371
resetWorkflowEventsSeq,
372+
currentWorkflow.GetMutableState().IsWorkflow(),
372373
); err != nil {
373374
return err
374375
}
@@ -409,6 +410,7 @@ func (r *workflowResetterImpl) persistToDB(
409410
&currentRunVerson,
410411
currentWorkflowMutation,
411412
currentWorkflowEventsSeq,
413+
currentWorkflow.GetMutableState().IsWorkflow(),
412414
); err != nil {
413415
return err
414416
}

service/history/ndc/workflow_resetter_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentTerminated() {
137137
}).AnyTimes()
138138

139139
currentMutableState.EXPECT().GetCurrentVersion().Return(int64(0)).AnyTimes()
140+
currentMutableState.EXPECT().IsWorkflow().Return(true).AnyTimes()
140141
currentNewEventsSize := int64(3444)
141142
currentMutation := &persistence.WorkflowMutation{
142143
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
@@ -201,6 +202,7 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentTerminated() {
201202
util.Ptr(int64(0)),
202203
resetSnapshot,
203204
resetEventsSeq,
205+
true, // isWorkflow
204206
).Return(currentNewEventsSize, resetNewEventsSize, nil)
205207

206208
err := s.workflowResetter.persistToDB(context.Background(), currentWorkflow, currentWorkflow, currentMutation, currentEventsSeq, resetWorkflow)
@@ -226,6 +228,7 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentNotTerminated() {
226228
currentMutation := &persistence.WorkflowMutation{}
227229
currentEventsSeq := []*persistence.WorkflowEvents{{}}
228230
currentMutableState.EXPECT().GetCurrentVersion().Return(int64(0)).AnyTimes()
231+
currentMutableState.EXPECT().IsWorkflow().Return(true).AnyTimes()
229232
currentMutableState.EXPECT().CloseTransactionAsMutation(historyi.TransactionPolicyActive).Return(currentMutation, currentEventsSeq, nil)
230233

231234
resetWorkflow := NewMockWorkflow(s.controller)
@@ -263,6 +266,7 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentNotTerminated() {
263266
util.Ptr(int64(0)),
264267
resetSnapshot,
265268
resetEventsSeq,
269+
true, // isWorkflow
266270
).Return(int64(0), int64(0), nil)
267271

268272
err := s.workflowResetter.persistToDB(context.Background(), currentWorkflow, currentWorkflow, nil, nil, resetWorkflow)

service/history/workflow/context.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ func (c *ContextImpl) CreateWorkflowExecution(
246246
shardContext,
247247
newMutableState.GetCurrentVersion(),
248248
createRequest,
249+
newMutableState.IsWorkflow(),
249250
)
250251
if err != nil {
251252
return err
@@ -363,6 +364,7 @@ func (c *ContextImpl) ConflictResolveWorkflowExecution(
363364
MutableStateFailoverVersion(currentMutableState),
364365
currentWorkflow,
365366
currentWorkflowEventsSeq,
367+
resetMutableState.IsWorkflow(),
366368
); err != nil {
367369
return err
368370
}
@@ -576,6 +578,7 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew(
576578
MutableStateFailoverVersion(newMutableState),
577579
newWorkflow,
578580
newWorkflowEventsSeq,
581+
c.MutableState.IsWorkflow(),
579582
); err != nil {
580583
return err
581584
}

service/history/workflow/metrics.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,19 +77,22 @@ func emitMutableStateStatus(
7777
func emitWorkflowCompletionStats(
7878
metricsHandler metrics.Handler,
7979
namespace namespace.Name,
80-
namespaceState string,
81-
taskQueue string,
82-
workflowTypeName string,
83-
status enumspb.WorkflowExecutionStatus,
80+
completion completionMetric,
8481
config *configs.Config,
8582
) {
86-
handler := GetPerTaskQueueFamilyScope(metricsHandler, namespace, taskQueue, config,
83+
// Only emit metrics for Workflows, not other Chasm archetypes
84+
if !completion.isWorkflow {
85+
return
86+
}
87+
88+
handler := GetPerTaskQueueFamilyScope(metricsHandler, namespace, completion.taskQueue, config,
8789
metrics.OperationTag(metrics.WorkflowCompletionStatsScope),
88-
metrics.NamespaceStateTag(namespaceState),
89-
metrics.WorkflowTypeTag(workflowTypeName),
90+
metrics.NamespaceStateTag(completion.namespaceState),
91+
metrics.WorkflowTypeTag(completion.workflowTypeName),
9092
)
9193

92-
switch status {
94+
closed := true
95+
switch completion.status {
9396
case enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED:
9497
metrics.WorkflowSuccessCount.With(handler).Record(1)
9598
case enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED:
@@ -102,6 +105,15 @@ func emitWorkflowCompletionStats(
102105
metrics.WorkflowTerminateCount.With(handler).Record(1)
103106
case enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW:
104107
metrics.WorkflowContinuedAsNewCount.With(handler).Record(1)
108+
case enumspb.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING:
109+
closed = false
110+
}
111+
if closed && completion.startTime != nil && completion.closeTime != nil {
112+
startTime := completion.startTime.AsTime()
113+
closeTime := completion.closeTime.AsTime()
114+
if closeTime.After(startTime) {
115+
metrics.WorkflowDuration.With(handler).Record(closeTime.Sub(startTime))
116+
}
105117
}
106118
}
107119

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package workflow
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
enumspb "go.temporal.io/api/enums/v1"
9+
"go.temporal.io/server/common/dynamicconfig"
10+
"go.temporal.io/server/common/log"
11+
"go.temporal.io/server/common/metrics"
12+
"go.temporal.io/server/common/metrics/metricstest"
13+
"go.temporal.io/server/common/namespace"
14+
"go.temporal.io/server/service/history/configs"
15+
"google.golang.org/protobuf/types/known/timestamppb"
16+
)
17+
18+
func TestEmitWorkflowCompletionStats_WorkflowDuration(t *testing.T) {
19+
logger := log.NewTestLogger()
20+
testHandler, _ := metricstest.NewHandler(logger, metrics.ClientConfig{})
21+
testNamespace := namespace.Name("test-namespace")
22+
config := &configs.Config{
23+
BreakdownMetricsByTaskQueue: dynamicconfig.GetBoolPropertyFnFilteredByTaskQueue(true),
24+
}
25+
26+
completionMetric := completionMetric{
27+
initialized: true,
28+
isWorkflow: true,
29+
taskQueue: "test-task-queue",
30+
namespaceState: "active",
31+
workflowTypeName: "test-workflow",
32+
status: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
33+
startTime: timestamppb.New(time.Unix(100, 0)),
34+
closeTime: timestamppb.New(time.Unix(130, 0)),
35+
}
36+
37+
emitWorkflowCompletionStats(testHandler, testNamespace, completionMetric, config)
38+
39+
snapshot, err := testHandler.Snapshot()
40+
require.NoError(t, err)
41+
buckets, err := snapshot.Histogram("workflow_duration_milliseconds",
42+
43+
metrics.StringTag("namespace", "test-namespace"),
44+
metrics.StringTag("namespace_state", "active"),
45+
metrics.StringTag("workflowType", "test-workflow"),
46+
metrics.StringTag("operation", "CompletionStats"),
47+
metrics.StringTag("taskqueue", "test-task-queue"),
48+
metrics.StringTag("otel_scope_name", "temporal"),
49+
metrics.StringTag("otel_scope_version", ""),
50+
)
51+
require.NoError(t, err)
52+
require.NotEmpty(t, buckets)
53+
}
54+
55+
func TestEmitWorkflowCompletionStats_SkipNonWorkflow(t *testing.T) {
56+
logger := log.NewTestLogger()
57+
testHandler, _ := metricstest.NewHandler(logger, metrics.ClientConfig{})
58+
testNamespace := namespace.Name("test-namespace")
59+
completionMetric := completionMetric{isWorkflow: false}
60+
emitWorkflowCompletionStats(testHandler, testNamespace, completionMetric, nil)
61+
snapshot, err := testHandler.Snapshot()
62+
require.NoError(t, err)
63+
_, err = snapshot.Histogram("workflow_duration_milliseconds")
64+
require.Error(t, err)
65+
}

service/history/workflow/transaction.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type (
1515
newWorkflowFailoverVersion int64,
1616
newWorkflowSnapshot *persistence.WorkflowSnapshot,
1717
newWorkflowEventsSeq []*persistence.WorkflowEvents,
18+
isWorkflow bool,
1819
) (int64, error)
1920

2021
ConflictResolveWorkflowExecution(
@@ -29,6 +30,7 @@ type (
2930
currentWorkflowFailoverVersion *int64,
3031
currentWorkflowMutation *persistence.WorkflowMutation,
3132
currentWorkflowEventsSeq []*persistence.WorkflowEvents,
33+
isWorkflow bool,
3234
) (int64, int64, int64, error)
3335

3436
UpdateWorkflowExecution(
@@ -40,6 +42,7 @@ type (
4042
newWorkflowFailoverVersion *int64,
4143
newWorkflowSnapshot *persistence.WorkflowSnapshot,
4244
newWorkflowEventsSeq []*persistence.WorkflowEvents,
45+
isWorkflow bool,
4346
) (int64, int64, error)
4447

4548
SetWorkflowExecution(

0 commit comments

Comments
 (0)