Skip to content

Commit 5301f33

Browse files
committed
Add Workflow Duration metrics
1 parent 469526e commit 5301f33

File tree

10 files changed

+174
-24
lines changed

10 files changed

+174
-24
lines changed

common/metrics/metric_defs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -926,6 +926,7 @@ var (
926926
WorkflowTimeoutCount = NewCounterDef("workflow_timeout")
927927
WorkflowTerminateCount = NewCounterDef("workflow_terminate")
928928
WorkflowContinuedAsNewCount = NewCounterDef("workflow_continued_as_new")
929+
WorkflowDuration = NewTimerDef("workflow_duration")
929930
ReplicationStreamPanic = NewCounterDef("replication_stream_panic")
930931
ReplicationStreamError = NewCounterDef("replication_stream_error")
931932
ReplicationServiceError = NewCounterDef("replication_service_error")

service/history/ndc/workflow_resetter.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ func (r *workflowResetterImpl) persistToDB(
369369
workflow.MutableStateFailoverVersion(resetWorkflow.GetMutableState()),
370370
resetWorkflowSnapshot,
371371
resetWorkflowEventsSeq,
372+
currentWorkflow.GetMutableState().IsWorkflow(),
372373
); err != nil {
373374
return err
374375
}
@@ -409,6 +410,7 @@ func (r *workflowResetterImpl) persistToDB(
409410
&currentRunVerson,
410411
currentWorkflowMutation,
411412
currentWorkflowEventsSeq,
413+
currentWorkflow.GetMutableState().IsWorkflow(),
412414
); err != nil {
413415
return err
414416
}

service/history/ndc/workflow_resetter_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentTerminated() {
137137
}).AnyTimes()
138138

139139
currentMutableState.EXPECT().GetCurrentVersion().Return(int64(0)).AnyTimes()
140+
currentMutableState.EXPECT().IsWorkflow().Return(true).AnyTimes()
140141
currentNewEventsSize := int64(3444)
141142
currentMutation := &persistence.WorkflowMutation{
142143
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
@@ -201,6 +202,7 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentTerminated() {
201202
util.Ptr(int64(0)),
202203
resetSnapshot,
203204
resetEventsSeq,
205+
true, // isWorkflow
204206
).Return(currentNewEventsSize, resetNewEventsSize, nil)
205207

206208
err := s.workflowResetter.persistToDB(context.Background(), currentWorkflow, currentWorkflow, currentMutation, currentEventsSeq, resetWorkflow)
@@ -226,6 +228,7 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentNotTerminated() {
226228
currentMutation := &persistence.WorkflowMutation{}
227229
currentEventsSeq := []*persistence.WorkflowEvents{{}}
228230
currentMutableState.EXPECT().GetCurrentVersion().Return(int64(0)).AnyTimes()
231+
currentMutableState.EXPECT().IsWorkflow().Return(true).AnyTimes()
229232
currentMutableState.EXPECT().CloseTransactionAsMutation(historyi.TransactionPolicyActive).Return(currentMutation, currentEventsSeq, nil)
230233

231234
resetWorkflow := NewMockWorkflow(s.controller)
@@ -263,6 +266,7 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentNotTerminated() {
263266
util.Ptr(int64(0)),
264267
resetSnapshot,
265268
resetEventsSeq,
269+
true, // isWorkflow
266270
).Return(int64(0), int64(0), nil)
267271

268272
err := s.workflowResetter.persistToDB(context.Background(), currentWorkflow, currentWorkflow, nil, nil, resetWorkflow)

service/history/workflow/context.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ func (c *ContextImpl) CreateWorkflowExecution(
246246
shardContext,
247247
newMutableState.GetCurrentVersion(),
248248
createRequest,
249+
newMutableState.IsWorkflow(),
249250
)
250251
if err != nil {
251252
return err
@@ -363,6 +364,7 @@ func (c *ContextImpl) ConflictResolveWorkflowExecution(
363364
MutableStateFailoverVersion(currentMutableState),
364365
currentWorkflow,
365366
currentWorkflowEventsSeq,
367+
resetMutableState.IsWorkflow(),
366368
); err != nil {
367369
return err
368370
}
@@ -576,6 +578,7 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew(
576578
MutableStateFailoverVersion(newMutableState),
577579
newWorkflow,
578580
newWorkflowEventsSeq,
581+
c.MutableState.IsWorkflow(),
579582
); err != nil {
580583
return err
581584
}

service/history/workflow/metrics.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,19 +77,22 @@ func emitMutableStateStatus(
7777
func emitWorkflowCompletionStats(
7878
metricsHandler metrics.Handler,
7979
namespace namespace.Name,
80-
namespaceState string,
81-
taskQueue string,
82-
workflowTypeName string,
83-
status enumspb.WorkflowExecutionStatus,
80+
completion completionMetric,
8481
config *configs.Config,
8582
) {
86-
handler := GetPerTaskQueueFamilyScope(metricsHandler, namespace, taskQueue, config,
83+
// Only emit metrics for Workflows, not other Chasm archetypes
84+
if !completion.isWorkflow {
85+
return
86+
}
87+
88+
handler := GetPerTaskQueueFamilyScope(metricsHandler, namespace, completion.taskQueue, config,
8789
metrics.OperationTag(metrics.WorkflowCompletionStatsScope),
88-
metrics.NamespaceStateTag(namespaceState),
89-
metrics.WorkflowTypeTag(workflowTypeName),
90+
metrics.NamespaceStateTag(completion.namespaceState),
91+
metrics.WorkflowTypeTag(completion.workflowTypeName),
9092
)
9193

92-
switch status {
94+
closed := true
95+
switch completion.status {
9396
case enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED:
9497
metrics.WorkflowSuccessCount.With(handler).Record(1)
9598
case enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED:
@@ -102,6 +105,15 @@ func emitWorkflowCompletionStats(
102105
metrics.WorkflowTerminateCount.With(handler).Record(1)
103106
case enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW:
104107
metrics.WorkflowContinuedAsNewCount.With(handler).Record(1)
108+
case enumspb.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING:
109+
closed = false
110+
}
111+
if closed && completion.startTime != nil && completion.closeTime != nil {
112+
startTime := completion.startTime.AsTime()
113+
closeTime := completion.closeTime.AsTime()
114+
if closeTime.After(startTime) {
115+
metrics.WorkflowDuration.With(handler).Record(closeTime.Sub(startTime))
116+
}
105117
}
106118
}
107119

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package workflow
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
enumspb "go.temporal.io/api/enums/v1"
9+
"go.temporal.io/server/common/dynamicconfig"
10+
"go.temporal.io/server/common/log"
11+
"go.temporal.io/server/common/metrics"
12+
"go.temporal.io/server/common/metrics/metricstest"
13+
"go.temporal.io/server/common/namespace"
14+
"go.temporal.io/server/service/history/configs"
15+
"google.golang.org/protobuf/types/known/timestamppb"
16+
)
17+
18+
func TestEmitWorkflowCompletionStats_WorkflowDuration(t *testing.T) {
19+
logger := log.NewTestLogger()
20+
testHandler, err := metricstest.NewHandler(logger, metrics.ClientConfig{})
21+
require.NoError(t, err)
22+
testNamespace := namespace.Name("test-namespace")
23+
config := &configs.Config{
24+
BreakdownMetricsByTaskQueue: dynamicconfig.GetBoolPropertyFnFilteredByTaskQueue(true),
25+
}
26+
27+
completionMetric := completionMetric{
28+
initialized: true,
29+
isWorkflow: true,
30+
taskQueue: "test-task-queue",
31+
namespaceState: "active",
32+
workflowTypeName: "test-workflow",
33+
status: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
34+
startTime: timestamppb.New(time.Unix(100, 0)),
35+
closeTime: timestamppb.New(time.Unix(130, 0)),
36+
}
37+
38+
emitWorkflowCompletionStats(testHandler, testNamespace, completionMetric, config)
39+
40+
snapshot, err := testHandler.Snapshot()
41+
require.NoError(t, err)
42+
buckets, err := snapshot.Histogram("workflow_duration_milliseconds",
43+
44+
metrics.StringTag("namespace", "test-namespace"),
45+
metrics.StringTag("namespace_state", "active"),
46+
metrics.StringTag("workflowType", "test-workflow"),
47+
metrics.StringTag("operation", "CompletionStats"),
48+
metrics.StringTag("taskqueue", "test-task-queue"),
49+
metrics.StringTag("otel_scope_name", "temporal"),
50+
metrics.StringTag("otel_scope_version", ""),
51+
)
52+
require.NoError(t, err)
53+
require.NotEmpty(t, buckets)
54+
}
55+
56+
func TestEmitWorkflowCompletionStats_SkipNonWorkflow(t *testing.T) {
57+
logger := log.NewTestLogger()
58+
testHandler, err := metricstest.NewHandler(logger, metrics.ClientConfig{})
59+
require.NoError(t, err)
60+
testNamespace := namespace.Name("test-namespace")
61+
config := &configs.Config{
62+
BreakdownMetricsByTaskQueue: dynamicconfig.GetBoolPropertyFnFilteredByTaskQueue(true),
63+
}
64+
65+
// Test with isWorkflow: false (should not emit metrics)
66+
completionMetric := completionMetric{
67+
initialized: true,
68+
isWorkflow: false,
69+
taskQueue: "test-task-queue",
70+
namespaceState: "active",
71+
workflowTypeName: "test-workflow",
72+
status: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
73+
startTime: timestamppb.New(time.Unix(100, 0)),
74+
closeTime: timestamppb.New(time.Unix(130, 0)),
75+
}
76+
77+
emitWorkflowCompletionStats(testHandler, testNamespace, completionMetric, config)
78+
79+
snapshot, err := testHandler.Snapshot()
80+
require.NoError(t, err)
81+
82+
// When isWorkflow is false, no metrics should be emitted at all
83+
// So the metric shouldn't even exist in the snapshot
84+
buckets, err := snapshot.Histogram("workflow_duration_milliseconds",
85+
metrics.StringTag("namespace", "test-namespace"),
86+
metrics.StringTag("namespace_state", "active"),
87+
metrics.StringTag("workflowType", "test-workflow"),
88+
metrics.StringTag("operation", "CompletionStats"),
89+
metrics.StringTag("taskqueue", "test-task-queue"),
90+
metrics.StringTag("otel_scope_name", "temporal"),
91+
metrics.StringTag("otel_scope_version", ""),
92+
)
93+
require.Error(t, err) // Should error because metric doesn't exist
94+
require.Nil(t, buckets)
95+
}

service/history/workflow/transaction.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type (
1515
newWorkflowFailoverVersion int64,
1616
newWorkflowSnapshot *persistence.WorkflowSnapshot,
1717
newWorkflowEventsSeq []*persistence.WorkflowEvents,
18+
isWorkflow bool,
1819
) (int64, error)
1920

2021
ConflictResolveWorkflowExecution(
@@ -29,6 +30,7 @@ type (
2930
currentWorkflowFailoverVersion *int64,
3031
currentWorkflowMutation *persistence.WorkflowMutation,
3132
currentWorkflowEventsSeq []*persistence.WorkflowEvents,
33+
isWorkflow bool,
3234
) (int64, int64, int64, error)
3335

3436
UpdateWorkflowExecution(
@@ -40,6 +42,7 @@ type (
4042
newWorkflowFailoverVersion *int64,
4143
newWorkflowSnapshot *persistence.WorkflowSnapshot,
4244
newWorkflowEventsSeq []*persistence.WorkflowEvents,
45+
isWorkflow bool,
4346
) (int64, int64, error)
4447

4548
SetWorkflowExecution(

0 commit comments

Comments
 (0)