Skip to content

Commit 36fd39b

Browse files
authored
Versioning Metrics pt2: DeploymentTransition + VersioningOverride (#7825)
## What changed?
- Added metrics for DeploymentTransition and VersioningOverride.

## Why?
- Needed for versioning-0.32.

## How did you test it?
- [ ] built
- [ ] run locally and tested manually
- [ ] covered by existing tests
- [ ] added new unit test(s)
- [ ] added new functional test(s)

## Potential risks
- I am fairly new to the versioning side of this code, so I may have instrumented the wrong places; hopefully not, but please review the emission points carefully.
1 parent 52793d9 commit 36fd39b

File tree

4 files changed

+128
-8
lines changed

4 files changed

+128
-8
lines changed

common/metrics/metric_defs.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,6 +1180,10 @@ var (
11801180
WithDescription("The number of schedule actions that failed to start"),
11811181
)
11821182

1183+
// Worker Versioning
1184+
WorkerDeploymentVersioningOverrideCounter = NewCounterDef("worker_deployment_versioning_override_count")
1185+
StartDeploymentTransitionCounter = NewCounterDef("start_deployment_transition_count")
1186+
11831187
// Force replication
11841188
EncounterZombieWorkflowCount = NewCounterDef("encounter_zombie_workflow_count")
11851189
EncounterNotFoundWorkflowCount = NewCounterDef("encounter_not_found_workflow_count")

common/metrics/tags.go

Lines changed: 72 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"strings"
77

88
enumspb "go.temporal.io/api/enums/v1"
9+
historypb "go.temporal.io/api/history/v1"
910
enumsspb "go.temporal.io/server/api/enums/v1"
1011
"go.temporal.io/server/common/locks"
1112
"go.temporal.io/server/common/primitives"
@@ -40,13 +41,30 @@ const (
4041
// See server.api.enums.v1.ReplicationTaskType
4142
replicationTaskType = "replicationTaskType"
4243
replicationTaskPriority = "replicationTaskPriority"
43-
44-
namespaceAllValue = "all"
45-
unknownValue = "_unknown_"
46-
totalMetricSuffix = "_total"
47-
tagExcludedValue = "_tag_excluded_"
48-
49-
errorPrefix = "*"
44+
versioningBehavior = "versioning_behavior"
45+
isFirstAttempt = "first-attempt"
46+
workflowStatus = "workflow_status"
47+
behaviorBefore = "behavior_before"
48+
behaviorAfter = "behavior_after"
49+
runInitiator = "run_initiator"
50+
fromUnversioned = "from_unversioned"
51+
toUnversioned = "to_unversioned"
52+
queryType = "query_type"
53+
namespaceAllValue = "all"
54+
unknownValue = "_unknown_"
55+
totalMetricSuffix = "_total"
56+
tagExcludedValue = "_tag_excluded_"
57+
falseValue = "false"
58+
trueValue = "true"
59+
errorPrefix = "*"
60+
61+
newRun = "new"
62+
existingRun = "existing"
63+
childRun = "child"
64+
canRun = "can"
65+
retryRun = "retry"
66+
cronRun = "cron"
67+
unknownRun = "unknown"
5068
)
5169

5270
// Tag is an interface to define metrics tags
@@ -383,3 +401,50 @@ func DestinationTag(value string) Tag {
383401
value: value,
384402
}
385403
}
404+
405+
func VersioningBehaviorBeforeOverrideTag(behavior enumspb.VersioningBehavior) Tag {
406+
return &tagImpl{key: behaviorBefore, value: behavior.String()}
407+
}
408+
409+
func VersioningBehaviorAfterOverrideTag(behavior enumspb.VersioningBehavior) Tag {
410+
return &tagImpl{key: behaviorAfter, value: behavior.String()}
411+
}
412+
413+
// RunInitiatorTag creates a tag indicating how a workflow run was initiated.
414+
// It handles both new workflow runs and continuations from previous runs.
415+
// When attributes is nil (e.g. during AddWorkflowExecutionOptionsUpdatedEvent),
416+
// it returns a tag indicating an existing run.
417+
func RunInitiatorTag(prevRunID string, attributes *historypb.WorkflowExecutionStartedEventAttributes) Tag {
418+
if attributes == nil {
419+
return &tagImpl{key: runInitiator, value: existingRun}
420+
} else if attributes.GetParentWorkflowExecution() != nil {
421+
return &tagImpl{key: runInitiator, value: childRun}
422+
}
423+
424+
switch attributes.GetInitiator() {
425+
case enumspb.CONTINUE_AS_NEW_INITIATOR_UNSPECIFIED:
426+
return &tagImpl{key: runInitiator, value: newRun}
427+
case enumspb.CONTINUE_AS_NEW_INITIATOR_WORKFLOW:
428+
return &tagImpl{key: runInitiator, value: canRun}
429+
case enumspb.CONTINUE_AS_NEW_INITIATOR_RETRY:
430+
return &tagImpl{key: runInitiator, value: retryRun}
431+
case enumspb.CONTINUE_AS_NEW_INITIATOR_CRON_SCHEDULE:
432+
return &tagImpl{key: runInitiator, value: cronRun}
433+
default:
434+
return &tagImpl{key: runInitiator, value: unknownRun}
435+
}
436+
}
437+
438+
func FromUnversionedTag(version string) Tag {
439+
if version == "_unversioned_" {
440+
return &tagImpl{key: fromUnversioned, value: trueValue}
441+
}
442+
return &tagImpl{key: fromUnversioned, value: falseValue}
443+
}
444+
445+
func ToUnversionedTag(version string) Tag {
446+
if version == "_unversioned_" {
447+
return &tagImpl{key: toUnversioned, value: trueValue}
448+
}
449+
return &tagImpl{key: toUnversioned, value: falseValue}
450+
}

common/worker_versioning/worker_versioning.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,17 @@ func OverrideIsPinned(override *workflowpb.VersioningOverride) bool {
375375
override.GetPinned().GetBehavior() == workflowpb.VersioningOverride_PINNED_OVERRIDE_BEHAVIOR_PINNED
376376
}
377377

378+
func ExtractVersioningBehaviorFromOverride(override *workflowpb.VersioningOverride) enumspb.VersioningBehavior {
379+
if override.GetAutoUpgrade() {
380+
return enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE
381+
} else if override.GetPinned() != nil {
382+
return enumspb.VERSIONING_BEHAVIOR_PINNED
383+
}
384+
385+
//nolint:staticcheck // SA1019: worker versioning v0.31
386+
return override.GetBehavior()
387+
}
388+
378389
func ValidateVersioningOverride(override *workflowpb.VersioningOverride) error {
379390
if override == nil {
380391
return nil

service/history/workflow/mutable_state_impl.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2463,6 +2463,18 @@ func (ms *MutableStateImpl) AddWorkflowExecutionStartedEventWithOptions(
24632463
); err != nil {
24642464
return nil, err
24652465
}
2466+
2467+
// Versioning Override set on StartWorkflowExecutionRequest
2468+
if startRequest.GetStartRequest().GetVersioningOverride() != nil {
2469+
metrics.WorkerDeploymentVersioningOverrideCounter.With(
2470+
ms.metricsHandler.WithTags(
2471+
metrics.NamespaceTag(ms.namespaceEntry.Name().String()),
2472+
metrics.VersioningBehaviorBeforeOverrideTag(enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED),
2473+
metrics.VersioningBehaviorAfterOverrideTag(worker_versioning.ExtractVersioningBehaviorFromOverride(startRequest.GetStartRequest().GetVersioningOverride())),
2474+
metrics.RunInitiatorTag(prevRunID, event.GetWorkflowExecutionStartedEventAttributes()),
2475+
),
2476+
).Record(1)
2477+
}
24662478
return event, nil
24672479
}
24682480

@@ -4804,9 +4816,25 @@ func (ms *MutableStateImpl) AddWorkflowExecutionOptionsUpdatedEvent(
48044816
attachCompletionCallbacks,
48054817
links,
48064818
)
4819+
prevEffectiveVersioningBehavior := ms.GetEffectiveVersioningBehavior()
4820+
prevEffectiveDeployment := ms.GetEffectiveDeployment()
4821+
48074822
if err := ms.ApplyWorkflowExecutionOptionsUpdatedEvent(event); err != nil {
48084823
return nil, err
48094824
}
4825+
4826+
if !proto.Equal(ms.GetEffectiveDeployment(), prevEffectiveDeployment) ||
4827+
ms.GetEffectiveVersioningBehavior() != prevEffectiveVersioningBehavior {
4828+
metrics.WorkerDeploymentVersioningOverrideCounter.With(
4829+
ms.metricsHandler.WithTags(
4830+
metrics.NamespaceTag(ms.namespaceEntry.Name().String()),
4831+
metrics.VersioningBehaviorBeforeOverrideTag(prevEffectiveVersioningBehavior),
4832+
metrics.VersioningBehaviorAfterOverrideTag(ms.GetEffectiveVersioningBehavior()),
4833+
metrics.RunInitiatorTag("", event.GetWorkflowExecutionStartedEventAttributes()),
4834+
),
4835+
).Record(1)
4836+
}
4837+
48104838
return event, nil
48114839
}
48124840

@@ -4833,6 +4861,7 @@ func (ms *MutableStateImpl) ApplyWorkflowExecutionOptionsUpdatedEvent(event *his
48334861
return err
48344862
}
48354863
}
4864+
48364865
return nil
48374866
}
48384867

@@ -8087,7 +8116,8 @@ func (ms *MutableStateImpl) StartDeploymentTransition(deployment *deploymentpb.D
80878116
ms.GetExecutionInfo().VersioningInfo = versioningInfo
80888117
}
80898118

8090-
if ms.GetEffectiveDeployment().Equal(deployment) {
8119+
preTransitionEffectiveDeployment := ms.GetEffectiveDeployment()
8120+
if preTransitionEffectiveDeployment.Equal(deployment) {
80918121
return serviceerror.NewInternal("start transition should receive a version different from effective version")
80928122
}
80938123

@@ -8125,6 +8155,16 @@ func (ms *MutableStateImpl) StartDeploymentTransition(deployment *deploymentpb.D
81258155
ms.logInfo("start transition did not reschedule pending speculative task")
81268156
}
81278157
}
8158+
8159+
// DeploymentTransition has taken place, so we increment the DeploymentTransition metric
8160+
metrics.StartDeploymentTransitionCounter.With(
8161+
ms.metricsHandler.WithTags(
8162+
metrics.NamespaceTag(ms.namespaceEntry.Name().String()),
8163+
metrics.FromUnversionedTag(worker_versioning.ExternalWorkerDeploymentVersionToString(worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(preTransitionEffectiveDeployment))),
8164+
metrics.ToUnversionedTag(worker_versioning.ExternalWorkerDeploymentVersionToString(worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(deployment))),
8165+
),
8166+
).Record(1)
8167+
81288168
return nil
81298169
}
81308170

0 commit comments

Comments
 (0)