Skip to content

Commit 5b4dcb9

Browse files
Shivs11ShahabT
andauthored
Versioning Metrics: pt1 (#7822)
## What changed? - adding helpful metrics for queries so that we can understand customer impact Metrics added in this PR are: (Directly related to entity workflows) - WorkerDeploymentCreated - WorkerDeploymentVersionCreated - WorkerDeploymentVersionCreatedManagedByController - WorkerDeploymentVersionVisibilityQueryCount (Not directly related to entity workflows) - WorkflowResetCount - WorkflowQuerySuccessCount - WorkflowQueryTimeoutCount - WorkflowQueryFailureCount - WorkflowTasksCompleted ## Why? - vis ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) ## Potential risks - None --------- Co-authored-by: ShahabT <[email protected]>
1 parent 36fd39b commit 5b4dcb9

File tree

11 files changed

+180
-6
lines changed

11 files changed

+180
-6
lines changed

common/metrics/metric_defs.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ const (
321321
HistoryRecordActivityTaskHeartbeatScope = "RecordActivityTaskHeartbeat"
322322
// HistoryRespondWorkflowTaskCompletedScope tracks RespondWorkflowTaskCompleted API calls received by service
323323
HistoryRespondWorkflowTaskCompletedScope = "RespondWorkflowTaskCompleted"
324+
HistoryRespondWorkflowTaskFailedScope = "RespondWorkflowTaskFailed"
324325
// HistoryRespondActivityTaskCompletedScope tracks RespondActivityTaskCompleted API calls received by service
325326
HistoryRespondActivityTaskCompletedScope = "RespondActivityTaskCompleted"
326327
// HistoryRespondActivityTaskFailedScope tracks RespondActivityTaskFailed API calls received by service
@@ -357,6 +358,7 @@ const (
357358
HistoryReapplyEventsScope = "ReapplyEvents"
358359
// HistoryQueryWorkflowScope tracks QueryWorkflow API calls received by service
359360
HistoryQueryWorkflowScope = "QueryWorkflow"
361+
HistoryResetWorkflowScope = "HistoryResetWorkflow"
360362
// HistoryProcessDeleteHistoryEventScope tracks ProcessDeleteHistoryEvent processing calls
361363
HistoryProcessDeleteHistoryEventScope = "ProcessDeleteHistoryEvent"
362364
// HistoryDeleteWorkflowExecutionScope tracks DeleteWorkflowExecutions API calls
@@ -1181,8 +1183,18 @@ var (
11811183
)
11821184

11831185
// Worker Versioning
1184-
WorkerDeploymentVersioningOverrideCounter = NewCounterDef("worker_deployment_versioning_override_count")
1185-
StartDeploymentTransitionCounter = NewCounterDef("start_deployment_transition_count")
1186+
WorkerDeploymentCreated = NewCounterDef("worker_deployment_created")
1187+
WorkerDeploymentVersionCreated = NewCounterDef("worker_deployment_version_created")
1188+
WorkerDeploymentVersionCreatedManagedByController = NewCounterDef("worker_deployment_version_created_managed_by_controller")
1189+
WorkerDeploymentVersionVisibilityQueryCount = NewCounterDef("worker_deployment_version_visibility_query_count")
1190+
WorkerDeploymentVersioningOverrideCounter = NewCounterDef("worker_deployment_versioning_override_count")
1191+
StartDeploymentTransitionCounter = NewCounterDef("start_deployment_transition_count")
1192+
1193+
WorkflowResetCount = NewCounterDef("workflow_reset_count")
1194+
WorkflowQuerySuccessCount = NewCounterDef("workflow_query_success_count")
1195+
WorkflowQueryFailureCount = NewCounterDef("workflow_query_failure_count")
1196+
WorkflowQueryTimeoutCount = NewCounterDef("workflow_query_timeout_count")
1197+
WorkflowTasksCompleted = NewCounterDef("workflow_tasks_completed")
11861198

11871199
// Force replication
11881200
EncounterZombieWorkflowCount = NewCounterDef("encounter_zombie_workflow_count")

common/metrics/tags.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ const (
5858
trueValue = "true"
5959
errorPrefix = "*"
6060

61+
queryTypeStackTrace = "__stack_trace"
62+
queryTypeOpenSessions = "__open_sessions"
63+
queryTypeWorkflowMetadata = "__temporal_workflow_metadata"
64+
queryTypeUserDefined = "__user_defined"
65+
6166
newRun = "new"
6267
existingRun = "existing"
6368
childRun = "child"
@@ -241,6 +246,14 @@ func FailureTag(value string) Tag {
241246
return &tagImpl{key: FailureTagName, value: value}
242247
}
243248

249+
func FirstAttemptTag(attempt int32) Tag {
250+
value := falseValue
251+
if attempt == 1 {
252+
value = trueValue
253+
}
254+
return &tagImpl{key: isFirstAttempt, value: value}
255+
}
256+
244257
func FailureSourceTag(value string) Tag {
245258
if len(value) == 0 {
246259
value = unknownValue
@@ -402,6 +415,22 @@ func DestinationTag(value string) Tag {
402415
}
403416
}
404417

418+
func VersioningBehaviorTag(behavior enumspb.VersioningBehavior) Tag {
419+
return &tagImpl{versioningBehavior, behavior.String()}
420+
}
421+
422+
func WorkflowStatusTag(status string) Tag {
423+
return &tagImpl{key: workflowStatus, value: status}
424+
}
425+
426+
func QueryTypeTag(queryType string) Tag {
427+
if queryType == queryTypeStackTrace || queryType == queryTypeOpenSessions || queryType == queryTypeWorkflowMetadata {
428+
return &tagImpl{key: queryType, value: queryType}
429+
}
430+
// group all user defined queries into a single tag value
431+
return &tagImpl{key: queryType, value: queryTypeUserDefined}
432+
}
433+
405434
func VersioningBehaviorBeforeOverrideTag(behavior enumspb.VersioningBehavior) Tag {
406435
return &tagImpl{key: behaviorBefore, value: behavior.String()}
407436
}

service/history/api/queryworkflow/api.go

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ func Invoke(
154154
return queryDirectlyThroughMatching(
155155
ctx,
156156
msResp,
157+
nsEntry,
157158
request.GetNamespaceId(),
158159
req,
159160
shardContext,
@@ -178,6 +179,12 @@ func Invoke(
178179
}
179180
queryID, completionCh := queryReg.BufferQuery(req.GetQuery())
180181
defer queryReg.RemoveQuery(queryID)
182+
183+
msResp, err := api.MutableStateToGetResponse(mutableState)
184+
if err != nil {
185+
return nil, err
186+
}
187+
181188
workflowLease.GetReleaseFn()(nil) // release the lock - no access to mutable state beyond this point!
182189
select {
183190
case <-completionCh:
@@ -191,13 +198,28 @@ func Invoke(
191198
result := completionState.Result
192199
switch result.GetResultType() {
193200
case enumspb.QUERY_RESULT_TYPE_ANSWERED:
201+
emitWorkflowQueryMetrics(
202+
scope,
203+
nsEntry,
204+
msResp,
205+
req.GetQuery().GetQueryType(),
206+
nil,
207+
)
194208
return &historyservice.QueryWorkflowResponse{
195209
Response: &workflowservice.QueryWorkflowResponse{
196210
QueryResult: result.GetAnswer(),
197211
},
198212
}, nil
199213
case enumspb.QUERY_RESULT_TYPE_FAILED:
200-
return nil, serviceerror.NewQueryFailedWithFailure(result.GetErrorMessage(), result.GetFailure())
214+
err := serviceerror.NewQueryFailedWithFailure(result.GetErrorMessage(), result.GetFailure())
215+
emitWorkflowQueryMetrics(
216+
scope,
217+
nsEntry,
218+
msResp,
219+
req.GetQuery().GetQueryType(),
220+
err,
221+
)
222+
return nil, err
201223
default:
202224
metrics.QueryRegistryInvalidStateCount.With(scope).Record(1)
203225
return nil, consts.ErrQueryEnteredInvalidState
@@ -211,6 +233,7 @@ func Invoke(
211233
return queryDirectlyThroughMatching(
212234
ctx,
213235
msResp,
236+
nsEntry,
214237
request.GetNamespaceId(),
215238
req,
216239
shardContext,
@@ -221,12 +244,27 @@ func Invoke(
221244
priority,
222245
)
223246
case workflow.QueryCompletionTypeFailed:
224-
return nil, completionState.Err
247+
err = completionState.Err
248+
emitWorkflowQueryMetrics(
249+
scope,
250+
nsEntry,
251+
msResp,
252+
req.GetQuery().GetQueryType(),
253+
err,
254+
)
255+
return nil, err
225256
default:
226257
metrics.QueryRegistryInvalidStateCount.With(scope).Record(1)
227258
return nil, consts.ErrQueryEnteredInvalidState
228259
}
229260
case <-ctx.Done():
261+
emitWorkflowQueryMetrics(
262+
scope,
263+
nsEntry,
264+
msResp,
265+
req.GetQuery().GetQueryType(),
266+
ctx.Err(),
267+
)
230268
metrics.ConsistentQueryTimeoutCount.With(scope).Record(1)
231269
return nil, ctx.Err()
232270
}
@@ -259,6 +297,7 @@ func queryWillTimeoutsBeforeFirstWorkflowTaskStart(
259297
func queryDirectlyThroughMatching(
260298
ctx context.Context,
261299
msResp *historyservice.GetMutableStateResponse,
300+
nsEntry *namespace.Namespace,
262301
namespaceID string,
263302
queryRequest *workflowservice.QueryWorkflowRequest,
264303
shard historyi.ShardContext,
@@ -267,11 +306,18 @@ func queryDirectlyThroughMatching(
267306
matchingClient matchingservice.MatchingServiceClient,
268307
metricsHandler metrics.Handler,
269308
priority *commonpb.Priority,
270-
) (*historyservice.QueryWorkflowResponse, error) {
309+
) (resp *historyservice.QueryWorkflowResponse, retError error) {
271310

272311
startTime := time.Now().UTC()
273312
defer func() {
274313
metrics.DirectQueryDispatchLatency.With(metricsHandler).Record(time.Since(startTime))
314+
emitWorkflowQueryMetrics(
315+
metricsHandler,
316+
nsEntry,
317+
msResp,
318+
queryRequest.GetQuery().GetQueryType(),
319+
retError,
320+
)
275321
}()
276322

277323
directive := worker_versioning.MakeDirectiveForWorkflowTask(
@@ -355,3 +401,27 @@ func queryDirectlyThroughMatching(
355401
QueryRejected: matchingResp.GetQueryRejected(),
356402
}}, err
357403
}
404+
405+
func emitWorkflowQueryMetrics(
406+
metricsHandler metrics.Handler,
407+
nsEntry *namespace.Namespace,
408+
msResp *historyservice.GetMutableStateResponse,
409+
queryType string,
410+
err error,
411+
) {
412+
commonTags := []metrics.Tag{
413+
metrics.OperationTag(metrics.HistoryQueryWorkflowScope),
414+
metrics.NamespaceTag(nsEntry.Name().String()),
415+
metrics.VersioningBehaviorTag(workflow.GetEffectiveVersioningBehavior(msResp.GetVersioningInfo())),
416+
metrics.WorkflowStatusTag(msResp.GetWorkflowStatus().String()),
417+
metrics.QueryTypeTag(queryType),
418+
}
419+
420+
if err == nil {
421+
metrics.WorkflowQuerySuccessCount.With(metricsHandler).Record(1, commonTags...)
422+
} else if common.IsContextDeadlineExceededErr(err) {
423+
metrics.WorkflowQueryTimeoutCount.With(metricsHandler).Record(1, commonTags...)
424+
} else {
425+
metrics.WorkflowQueryFailureCount.With(metricsHandler).Record(1, commonTags...)
426+
}
427+
}

service/history/api/resetworkflow/api.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"go.temporal.io/server/common/definition"
1313
"go.temporal.io/server/common/locks"
1414
"go.temporal.io/server/common/log/tag"
15+
"go.temporal.io/server/common/metrics"
1516
"go.temporal.io/server/common/namespace"
1617
"go.temporal.io/server/common/persistence/versionhistory"
1718
"go.temporal.io/server/common/worker_versioning"
@@ -130,6 +131,15 @@ func Invoke(
130131
if err != nil {
131132
return nil, err
132133
}
134+
135+
metrics.WorkflowResetCount.With(
136+
shardContext.GetMetricsHandler().WithTags(
137+
metrics.NamespaceTag(namespaceEntry.Name().String()),
138+
metrics.OperationTag(metrics.HistoryResetWorkflowScope),
139+
metrics.VersioningBehaviorTag(baseMutableState.GetEffectiveVersioningBehavior()),
140+
),
141+
).Record(1)
142+
133143
allowResetWithPendingChildren := shardContext.GetConfig().AllowResetWithPendingChildren(namespaceEntry.Name().String())
134144
if err := ndc.NewWorkflowResetter(
135145
shardContext,

service/history/api/respondworkflowtaskcompleted/api.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,12 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
431431

432432
metrics.FailedWorkflowTasksCounter.With(handler.metricsHandler).Record(
433433
1,
434-
metrics.OperationTag(metrics.HistoryRespondWorkflowTaskCompletedScope))
434+
metrics.OperationTag(metrics.HistoryRespondWorkflowTaskCompletedScope),
435+
metrics.NamespaceTag(namespaceEntry.Name().String()),
436+
metrics.VersioningBehaviorTag(ms.GetEffectiveVersioningBehavior()),
437+
metrics.FailureTag(wtFailedCause.failedCause.String()),
438+
metrics.FirstAttemptTag(currentWorkflowTask.Attempt),
439+
)
435440
handler.logger.Info("Failing the workflow task.",
436441
tag.Value(wtFailedCause.Message()),
437442
tag.WorkflowID(token.GetWorkflowId()),

service/history/api/respondworkflowtaskfailed/api.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"go.temporal.io/server/api/historyservice/v1"
88
"go.temporal.io/server/common"
99
"go.temporal.io/server/common/definition"
10+
"go.temporal.io/server/common/metrics"
1011
"go.temporal.io/server/common/namespace"
1112
"go.temporal.io/server/common/tasktoken"
1213
"go.temporal.io/server/service/history/api"
@@ -41,6 +42,11 @@ func Invoke(
4142
token.RunId,
4243
),
4344
func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) {
45+
namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId()))
46+
if err != nil {
47+
return nil, err
48+
}
49+
4450
mutableState := workflowLease.GetMutableState()
4551
if !mutableState.IsWorkflowExecutionRunning() {
4652
return nil, consts.ErrWorkflowCompleted
@@ -73,6 +79,15 @@ func Invoke(
7379
return nil, err
7480
}
7581

82+
metrics.FailedWorkflowTasksCounter.With(shardContext.GetMetricsHandler()).Record(
83+
1,
84+
metrics.OperationTag(metrics.HistoryRespondWorkflowTaskFailedScope),
85+
metrics.NamespaceTag(namespaceEntry.Name().String()),
86+
metrics.VersioningBehaviorTag(mutableState.GetEffectiveVersioningBehavior()),
87+
metrics.FailureTag(request.GetCause().String()),
88+
metrics.FirstAttemptTag(workflowTask.Attempt),
89+
)
90+
7691
// TODO (alex-update): if it was speculative WT that failed, and there is nothing but pending updates,
7792
// new WT also should be create as speculative (or not?). Currently, it will be recreated as normal WT.
7893
return &api.UpdateWorkflowAction{

service/history/timer_queue_active_task_executor.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,8 @@ func (t *timerQueueActiveTaskExecutor) processSingleActivityTimeoutTask(
312312
namespace.ID(mutableState.GetExecutionInfo().NamespaceId),
313313
metrics.TimerActiveTaskActivityTimeoutScope,
314314
timerSequenceID.TimerType,
315+
mutableState.GetEffectiveVersioningBehavior(),
316+
ai.Attempt,
315317
)
316318
if _, err = mutableState.AddActivityTaskTimedOutEvent(
317319
ai.ScheduledEventId,
@@ -380,6 +382,8 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowTaskTimeoutTask(
380382
namespace.ID(mutableState.GetExecutionInfo().NamespaceId),
381383
operationMetricsTag,
382384
enumspb.TIMEOUT_TYPE_START_TO_CLOSE,
385+
mutableState.GetEffectiveVersioningBehavior(),
386+
workflowTask.Attempt,
383387
)
384388
if _, err := mutableState.AddWorkflowTaskTimedOutEvent(
385389
workflowTask,
@@ -398,6 +402,8 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowTaskTimeoutTask(
398402
namespace.ID(mutableState.GetExecutionInfo().NamespaceId),
399403
operationMetricsTag,
400404
enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START,
405+
mutableState.GetEffectiveVersioningBehavior(),
406+
workflowTask.Attempt,
401407
)
402408
_, err := mutableState.AddWorkflowTaskScheduleToStartTimeoutEvent(workflowTask)
403409
if err != nil {
@@ -856,6 +862,8 @@ func (t *timerQueueActiveTaskExecutor) emitTimeoutMetricScopeWithNamespaceTag(
856862
namespaceID namespace.ID,
857863
operation string,
858864
timerType enumspb.TimeoutType,
865+
effectiveVersioningBehavior enumspb.VersioningBehavior,
866+
taskAttempt int32,
859867
) {
860868
namespaceEntry, err := t.registry.GetNamespaceByID(namespaceID)
861869
if err != nil {
@@ -864,6 +872,8 @@ func (t *timerQueueActiveTaskExecutor) emitTimeoutMetricScopeWithNamespaceTag(
864872
metricsScope := t.metricsHandler.WithTags(
865873
metrics.OperationTag(operation),
866874
metrics.NamespaceTag(namespaceEntry.Name().String()),
875+
metrics.VersioningBehaviorTag(effectiveVersioningBehavior),
876+
metrics.FirstAttemptTag(taskAttempt),
867877
)
868878
switch timerType {
869879
case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START:

service/history/timer_queue_active_task_executor_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2233,6 +2233,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessSingleActivityTimeoutTask
22332233
ms.EXPECT().GetExecutionInfo().Return(info).AnyTimes()
22342234
ms.EXPECT().AddActivityTaskTimedOutEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
22352235
}
2236+
ms.EXPECT().GetEffectiveVersioningBehavior().Return(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED).AnyTimes()
22362237

22372238
result, err := s.timerQueueActiveTaskExecutor.processSingleActivityTimeoutTask(
22382239
ms, tc.timerSequenceID, tc.ai)

service/history/workflow/workflow_task_state_machine.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,13 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent(
745745
if err != nil {
746746
return nil, err
747747
}
748+
749+
metrics.WorkflowTasksCompleted.With(m.metricsHandler).Record(1,
750+
metrics.NamespaceTag(m.ms.GetNamespaceEntry().Name().String()),
751+
metrics.VersioningBehaviorTag(vb),
752+
metrics.FirstAttemptTag(workflowTask.Attempt),
753+
)
754+
748755
return event, nil
749756
}
750757

0 commit comments

Comments
 (0)