Skip to content

Commit f620ac4

Browse files
committed
Produce workflow_duration metric on completion
1 parent da90f62 commit f620ac4

File tree

4 files changed

+88
-12
lines changed

4 files changed

+88
-12
lines changed

common/metrics/metric_defs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -926,6 +926,7 @@ var (
926926
WorkflowTimeoutCount = NewCounterDef("workflow_timeout")
927927
WorkflowTerminateCount = NewCounterDef("workflow_terminate")
928928
WorkflowContinuedAsNewCount = NewCounterDef("workflow_continued_as_new")
929+
WorkflowDuration = NewTimerDef("workflow_duration")
929930
ReplicationStreamPanic = NewCounterDef("replication_stream_panic")
930931
ReplicationStreamError = NewCounterDef("replication_stream_error")
931932
ReplicationServiceError = NewCounterDef("replication_service_error")

service/history/workflow/metrics.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,19 +77,26 @@ func emitMutableStateStatus(
7777
func emitWorkflowCompletionStats(
7878
metricsHandler metrics.Handler,
7979
namespace namespace.Name,
80-
namespaceState string,
81-
taskQueue string,
82-
workflowTypeName string,
83-
status enumspb.WorkflowExecutionStatus,
80+
completion completionMetric,
8481
config *configs.Config,
8582
) {
86-
handler := GetPerTaskQueueFamilyScope(metricsHandler, namespace, taskQueue, config,
83+
handler := GetPerTaskQueueFamilyScope(metricsHandler, namespace, completion.taskQueue, config,
8784
metrics.OperationTag(metrics.WorkflowCompletionStatsScope),
88-
metrics.NamespaceStateTag(namespaceState),
89-
metrics.WorkflowTypeTag(workflowTypeName),
85+
metrics.NamespaceStateTag(completion.namespaceState),
86+
metrics.WorkflowTypeTag(completion.workflowTypeName),
9087
)
9188

92-
switch status {
89+
if completion.state == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED &&
90+
completion.executionTime != nil && completion.closeTime != nil {
91+
92+
executionTime := completion.executionTime.AsTime()
93+
closeTime := completion.closeTime.AsTime()
94+
if closeTime.After(executionTime) {
95+
metrics.WorkflowDuration.With(handler).Record(closeTime.Sub(executionTime))
96+
}
97+
}
98+
99+
switch completion.status {
93100
case enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED:
94101
metrics.WorkflowSuccessCount.With(handler).Record(1)
95102
case enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED:
@@ -102,6 +109,8 @@ func emitWorkflowCompletionStats(
102109
metrics.WorkflowTerminateCount.With(handler).Record(1)
103110
case enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW:
104111
metrics.WorkflowContinuedAsNewCount.With(handler).Record(1)
112+
case enumspb.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING:
113+
// skip, the workflow is not completed
105114
}
106115
}
107116

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package workflow
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
enumspb "go.temporal.io/api/enums/v1"
9+
enumsspb "go.temporal.io/server/api/enums/v1"
10+
"go.temporal.io/server/common/dynamicconfig"
11+
"go.temporal.io/server/common/log"
12+
"go.temporal.io/server/common/metrics"
13+
"go.temporal.io/server/common/metrics/metricstest"
14+
"go.temporal.io/server/common/namespace"
15+
"go.temporal.io/server/service/history/configs"
16+
"google.golang.org/protobuf/types/known/timestamppb"
17+
)
18+
19+
func TestEmitWorkflowCompletionStats_WorkflowDuration(t *testing.T) {
20+
logger := log.NewTestLogger()
21+
testHandler, err := metricstest.NewHandler(logger, metrics.ClientConfig{})
22+
require.NoError(t, err)
23+
testNamespace := namespace.Name("test-namespace")
24+
config := &configs.Config{
25+
BreakdownMetricsByTaskQueue: dynamicconfig.GetBoolPropertyFnFilteredByTaskQueue(true),
26+
}
27+
28+
completionMetric := completionMetric{
29+
initialized: true,
30+
taskQueue: "test-task-queue",
31+
namespaceState: "active",
32+
workflowTypeName: "test-workflow",
33+
status: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
34+
state: enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
35+
executionTime: timestamppb.New(time.Unix(100, 0)),
36+
closeTime: timestamppb.New(time.Unix(130, 0)),
37+
}
38+
39+
emitWorkflowCompletionStats(testHandler, testNamespace, completionMetric, config)
40+
41+
snapshot, err := testHandler.Snapshot()
42+
require.NoError(t, err)
43+
buckets, err := snapshot.Histogram("workflow_duration_milliseconds",
44+
45+
metrics.StringTag("namespace", "test-namespace"),
46+
metrics.StringTag("namespace_state", "active"),
47+
metrics.StringTag("workflowType", "test-workflow"),
48+
metrics.StringTag("operation", "CompletionStats"),
49+
metrics.StringTag("taskqueue", "test-task-queue"),
50+
metrics.StringTag("otel_scope_name", "temporal"),
51+
metrics.StringTag("otel_scope_version", ""),
52+
)
53+
require.NoError(t, err)
54+
require.NotEmpty(t, buckets)
55+
}

service/history/workflow/transaction_impl.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
commonpb "go.temporal.io/api/common/v1"
77
enumspb "go.temporal.io/api/enums/v1"
88
"go.temporal.io/api/serviceerror"
9+
enumsspb "go.temporal.io/server/api/enums/v1"
910
"go.temporal.io/server/common"
1011
"go.temporal.io/server/common/log"
1112
"go.temporal.io/server/common/log/tag"
@@ -14,6 +15,7 @@ import (
1415
"go.temporal.io/server/common/persistence"
1516
"go.temporal.io/server/service/history/events"
1617
historyi "go.temporal.io/server/service/history/interfaces"
18+
"google.golang.org/protobuf/types/known/timestamppb"
1719
)
1820

1921
type (
@@ -23,6 +25,9 @@ type (
2325
namespaceState string
2426
workflowTypeName string
2527
status enumspb.WorkflowExecutionStatus
28+
state enumsspb.WorkflowExecutionState
29+
executionTime *timestamppb.Timestamp
30+
closeTime *timestamppb.Timestamp
2631
}
2732
TransactionImpl struct {
2833
shard historyi.ShardContext
@@ -680,12 +685,16 @@ func snapshotToCompletionMetric(
680685
if workflowSnapshot == nil {
681686
return completionMetric{initialized: false}
682687
}
688+
683689
return completionMetric{
684690
initialized: true,
685691
taskQueue: workflowSnapshot.ExecutionInfo.TaskQueue,
686692
namespaceState: namespaceState,
687693
workflowTypeName: workflowSnapshot.ExecutionInfo.WorkflowTypeName,
688694
status: workflowSnapshot.ExecutionState.Status,
695+
state: workflowSnapshot.ExecutionState.State,
696+
executionTime: workflowSnapshot.ExecutionInfo.ExecutionTime,
697+
closeTime: workflowSnapshot.ExecutionInfo.CloseTime,
689698
}
690699
}
691700

@@ -696,12 +705,16 @@ func mutationToCompletionMetric(
696705
if workflowMutation == nil {
697706
return completionMetric{initialized: false}
698707
}
708+
699709
return completionMetric{
700710
initialized: true,
701711
taskQueue: workflowMutation.ExecutionInfo.TaskQueue,
702712
namespaceState: namespaceState,
703713
workflowTypeName: workflowMutation.ExecutionInfo.WorkflowTypeName,
704714
status: workflowMutation.ExecutionState.Status,
715+
state: workflowMutation.ExecutionState.State,
716+
executionTime: workflowMutation.ExecutionInfo.ExecutionTime,
717+
closeTime: workflowMutation.ExecutionInfo.CloseTime,
705718
}
706719
}
707720

@@ -717,13 +730,11 @@ func emitCompletionMetrics(
717730
if !completionMetric.initialized {
718731
continue
719732
}
733+
720734
emitWorkflowCompletionStats(
721735
metricsHandler,
722736
namespaceName,
723-
completionMetric.namespaceState,
724-
completionMetric.taskQueue,
725-
completionMetric.workflowTypeName,
726-
completionMetric.status,
737+
completionMetric,
727738
shardContext.GetConfig(),
728739
)
729740
}

0 commit comments

Comments
 (0)