Skip to content

Commit cd386e2

Browse files
committed
Add propagation latency metrics
1 parent 28bf8ca commit cd386e2

File tree

5 files changed

+61
-30
lines changed

5 files changed

+61
-30
lines changed

common/metrics/metric_defs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1281,6 +1281,8 @@ var (
12811281
WorkerDeploymentVersionVisibilityQueryCount = NewCounterDef("worker_deployment_version_visibility_query_count")
12821282
WorkerDeploymentVersioningOverrideCounter = NewCounterDef("worker_deployment_versioning_override_count")
12831283
StartDeploymentTransitionCounter = NewCounterDef("start_deployment_transition_count")
1284+
VersioningDataPropagationLatency = NewTimerDef("versioning_data_propagation_latency")
1285+
SlowVersioningDataPropagationCounter = NewCounterDef("slow_versioning_data_propagation")
12841286

12851287
WorkflowResetCount = NewCounterDef("workflow_reset_count")
12861288
WorkflowQuerySuccessCount = NewCounterDef("workflow_query_success_count")

service/worker/workerdeployment/activities.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,18 @@ import (
1010
deploymentspb "go.temporal.io/server/api/deployment/v1"
1111
"go.temporal.io/server/api/matchingservice/v1"
1212
"go.temporal.io/server/common/namespace"
13-
"go.temporal.io/server/common/resource"
1413
)
1514

1615
type (
1716
Activities struct {
18-
namespace *namespace.Namespace
19-
deploymentClient Client
20-
matchingClient resource.MatchingClient
17+
activityDeps
18+
namespace *namespace.Namespace
2119
}
2220
)
2321

2422
func (a *Activities) SyncWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.SyncVersionStateActivityArgs) (*deploymentspb.SyncVersionStateActivityResult, error) {
2523
identity := "worker-deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
26-
res, err := a.deploymentClient.SyncVersionWorkflowFromWorkerDeployment(
24+
res, err := a.WorkerDeploymentClient.SyncVersionWorkflowFromWorkerDeployment(
2725
ctx,
2826
a.namespace,
2927
args.DeploymentName,
@@ -48,7 +46,7 @@ func (a *Activities) SyncUnversionedRamp(
4846
) (*deploymentspb.SyncDeploymentVersionUserDataResponse, error) {
4947
logger := activity.GetLogger(ctx)
5048
// Get all the task queues in the current version and put them into SyncUserData format
51-
currVersionInfo, _, err := a.deploymentClient.DescribeVersion(ctx, a.namespace, input.CurrentVersion, false)
49+
currVersionInfo, _, err := a.WorkerDeploymentClient.DescribeVersion(ctx, a.namespace, input.CurrentVersion, false)
5250
if err != nil {
5351
return nil, err
5452
}
@@ -76,7 +74,7 @@ func (a *Activities) SyncUnversionedRamp(
7674
logger.Info("syncing unversioned ramp to task queue userdata", "taskQueue", syncData.Name, "types", syncData.Types)
7775
var res *matchingservice.SyncDeploymentUserDataResponse
7876
var err error
79-
res, err = a.matchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
77+
res, err = a.MatchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
8078
NamespaceId: a.namespace.ID().String(),
8179
TaskQueue: syncData.Name,
8280
TaskQueueTypes: syncData.Types,
@@ -107,7 +105,7 @@ func (a *Activities) CheckUnversionedRampUserDataPropagation(ctx context.Context
107105
for n, v := range input.TaskQueueMaxVersions {
108106
go func(name string, version int64) {
109107
logger.Info("waiting for unversioned ramp userdata propagation", "taskQueue", name, "version", version)
110-
_, err := a.matchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{
108+
_, err := a.MatchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{
111109
NamespaceId: a.namespace.ID().String(),
112110
TaskQueue: name,
113111
Version: version,
@@ -126,7 +124,7 @@ func (a *Activities) CheckUnversionedRampUserDataPropagation(ctx context.Context
126124
}
127125

128126
func (a *Activities) IsVersionMissingTaskQueues(ctx context.Context, args *deploymentspb.IsVersionMissingTaskQueuesArgs) (*deploymentspb.IsVersionMissingTaskQueuesResult, error) {
129-
res, err := a.deploymentClient.IsVersionMissingTaskQueues(
127+
res, err := a.WorkerDeploymentClient.IsVersionMissingTaskQueues(
130128
ctx,
131129
a.namespace,
132130
args.PrevCurrentVersion,
@@ -142,7 +140,7 @@ func (a *Activities) IsVersionMissingTaskQueues(ctx context.Context, args *deplo
142140

143141
func (a *Activities) DeleteWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.DeleteVersionActivityArgs) error {
144142
identity := "worker-deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
145-
err := a.deploymentClient.DeleteVersionFromWorkerDeployment(
143+
err := a.WorkerDeploymentClient.DeleteVersionFromWorkerDeployment(
146144
ctx,
147145
a.namespace,
148146
args.DeploymentName,
@@ -160,7 +158,7 @@ func (a *Activities) DeleteWorkerDeploymentVersion(ctx context.Context, args *de
160158

161159
func (a *Activities) RegisterWorkerInVersion(ctx context.Context, args *deploymentspb.RegisterWorkerInVersionArgs) error {
162160
identity := "worker-deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
163-
err := a.deploymentClient.RegisterWorkerInVersion(
161+
err := a.WorkerDeploymentClient.RegisterWorkerInVersion(
164162
ctx,
165163
a.namespace,
166164
args,
@@ -173,7 +171,7 @@ func (a *Activities) RegisterWorkerInVersion(ctx context.Context, args *deployme
173171
}
174172

175173
func (a *Activities) DescribeVersionFromWorkerDeployment(ctx context.Context, args *deploymentspb.DescribeVersionFromWorkerDeploymentActivityArgs) (*deploymentspb.DescribeVersionFromWorkerDeploymentActivityResult, error) {
176-
res, _, err := a.deploymentClient.DescribeVersion(ctx, a.namespace, args.Version, false)
174+
res, _, err := a.WorkerDeploymentClient.DescribeVersion(ctx, a.namespace, args.Version, false)
177175
if err != nil {
178176
return nil, err
179177
}
@@ -201,7 +199,7 @@ func (a *Activities) SyncDeploymentVersionUserDataFromWorkerDeployment(
201199
var err error
202200

203201
if input.ForgetVersion {
204-
res, err = a.matchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
202+
res, err = a.MatchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
205203
NamespaceId: a.namespace.ID().String(),
206204
DeploymentName: input.GetDeploymentName(),
207205
TaskQueue: syncData.Name,
@@ -211,7 +209,7 @@ func (a *Activities) SyncDeploymentVersionUserDataFromWorkerDeployment(
211209
},
212210
})
213211
} else {
214-
res, err = a.matchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
212+
res, err = a.MatchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
215213
NamespaceId: a.namespace.ID().String(),
216214
DeploymentName: input.GetDeploymentName(),
217215
TaskQueue: syncData.Name,
@@ -250,5 +248,5 @@ func (a *Activities) StartWorkerDeploymentVersionWorkflow(
250248
logger := activity.GetLogger(ctx)
251249
logger.Info("starting worker deployment version workflow", "deploymentName", input.DeploymentName, "buildID", input.BuildId)
252250
identity := "deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
253-
return a.deploymentClient.StartWorkerDeploymentVersion(ctx, a.namespace, input.DeploymentName, input.BuildId, identity, input.RequestId)
251+
return a.WorkerDeploymentClient.StartWorkerDeploymentVersion(ctx, a.namespace, input.DeploymentName, input.BuildId, identity, input.RequestId)
254252
}

service/worker/workerdeployment/fx.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,16 +119,14 @@ func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Na
119119
registry.RegisterWorkflowWithOptions(deploymentWorkflow, workflow.RegisterOptions{Name: WorkerDeploymentWorkflowType})
120120

121121
versionActivities := &VersionActivities{
122-
namespace: ns,
123-
deploymentClient: s.activityDeps.WorkerDeploymentClient,
124-
matchingClient: s.activityDeps.MatchingClient,
122+
activityDeps: s.activityDeps,
123+
namespace: ns,
125124
}
126125
registry.RegisterActivity(versionActivities)
127126

128127
activities := &Activities{
129-
namespace: ns,
130-
deploymentClient: s.activityDeps.WorkerDeploymentClient,
131-
matchingClient: s.activityDeps.MatchingClient,
128+
activityDeps: s.activityDeps,
129+
namespace: ns,
132130
}
133131
registry.RegisterActivity(activities)
134132
return nil

service/worker/workerdeployment/version_activities.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"sync"
9+
"time"
910

1011
deploymentpb "go.temporal.io/api/deployment/v1"
1112
enumspb "go.temporal.io/api/enums/v1"
@@ -16,17 +17,19 @@ import (
1617
"go.temporal.io/sdk/temporal"
1718
deploymentspb "go.temporal.io/server/api/deployment/v1"
1819
"go.temporal.io/server/api/matchingservice/v1"
20+
"go.temporal.io/server/common/log/tag"
21+
"go.temporal.io/server/common/metrics"
1922
"go.temporal.io/server/common/namespace"
20-
"go.temporal.io/server/common/resource"
2123
"go.temporal.io/server/common/worker_versioning"
2224
"google.golang.org/protobuf/types/known/timestamppb"
2325
)
2426

27+
const SlowPropagationDelay = 10 * time.Second
28+
2529
type (
2630
VersionActivities struct {
27-
namespace *namespace.Namespace
28-
deploymentClient Client
29-
matchingClient resource.MatchingClient
31+
activityDeps
32+
namespace *namespace.Namespace
3033
}
3134
)
3235

@@ -37,7 +40,7 @@ func (a *VersionActivities) StartWorkerDeploymentWorkflow(
3740
logger := activity.GetLogger(ctx)
3841
logger.Info("starting worker-deployment workflow", "deploymentName", input.DeploymentName)
3942
identity := "deployment-version workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
40-
err := a.deploymentClient.StartWorkerDeployment(ctx, a.namespace, input.DeploymentName, identity, input.RequestId)
43+
err := a.WorkerDeploymentClient.StartWorkerDeployment(ctx, a.namespace, input.DeploymentName, identity, input.RequestId)
4144
var precond *serviceerror.FailedPrecondition
4245
if errors.As(err, &precond) {
4346
return temporal.NewNonRetryableApplicationError("failed to create deployment", errTooManyDeployments, err)
@@ -50,6 +53,16 @@ func (a *VersionActivities) SyncDeploymentVersionUserData(
5053
input *deploymentspb.SyncDeploymentVersionUserDataRequest,
5154
) (*deploymentspb.SyncDeploymentVersionUserDataResponse, error) {
5255
logger := activity.GetLogger(ctx)
56+
scheduledTime := activity.GetInfo(ctx).ScheduledTime
57+
58+
if scheduledTime.Add(SlowPropagationDelay).Before(time.Now()) {
59+
a.Logger.Warn("Slow propagation detected, attempting to sync user data",
60+
tag.WorkflowNamespace(a.namespace.Name().String()),
61+
tag.Deployment(input.DeploymentName),
62+
tag.BuildId(input.GetVersion().GetBuildId()),
63+
)
64+
a.MetricsHandler.Counter(metrics.SlowVersioningDataPropagationCounter.Name()).Record(1)
65+
}
5366

5467
errs := make(chan error)
5568

@@ -88,7 +101,7 @@ func (a *VersionActivities) SyncDeploymentVersionUserData(
88101
req.UpsertVersionsData[input.GetVersion().GetBuildId()] = vd
89102
}
90103

91-
res, err = a.matchingClient.SyncDeploymentUserData(ctx, req)
104+
res, err = a.MatchingClient.SyncDeploymentUserData(ctx, req)
92105

93106
if err != nil {
94107
logger.Error("syncing task queue userdata", "taskQueue", syncData.Name, "types", syncData.Types, "error", err)
@@ -115,14 +128,23 @@ func (a *VersionActivities) SyncDeploymentVersionUserData(
115128
}
116129

117130
func (a *VersionActivities) CheckWorkerDeploymentUserDataPropagation(ctx context.Context, input *deploymentspb.CheckWorkerDeploymentUserDataPropagationRequest) error {
131+
scheduledTime := activity.GetInfo(ctx).ScheduledTime
132+
133+
if scheduledTime.Add(SlowPropagationDelay).Before(time.Now()) {
134+
a.Logger.Warn("Slow propagation detected, awaiting task queue partition propagation",
135+
tag.WorkflowNamespace(a.namespace.Name().String()),
136+
)
137+
a.MetricsHandler.Counter(metrics.SlowVersioningDataPropagationCounter.Name()).Record(1)
138+
}
139+
118140
logger := activity.GetLogger(ctx)
119141

120142
errs := make(chan error)
121143

122144
for n, v := range input.TaskQueueMaxVersions {
123145
go func(name string, version int64) {
124146
logger.Info("waiting for userdata propagation", "taskQueue", name, "version", version)
125-
_, err := a.matchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{
147+
_, err := a.MatchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{
126148
NamespaceId: a.namespace.ID().String(),
127149
TaskQueue: name,
128150
Version: version,
@@ -145,7 +167,7 @@ func (a *VersionActivities) CheckWorkerDeploymentUserDataPropagation(ctx context
145167
func (a *VersionActivities) CheckIfTaskQueuesHavePollers(ctx context.Context, args *deploymentspb.CheckTaskQueuesHavePollersActivityArgs) (bool, error) {
146168
versionStr := worker_versioning.ExternalWorkerDeploymentVersionToString(worker_versioning.ExternalWorkerDeploymentVersionFromVersion(args.WorkerDeploymentVersion))
147169
for tqName, tqTypes := range args.TaskQueuesAndTypes {
148-
res, err := a.matchingClient.DescribeTaskQueue(ctx, &matchingservice.DescribeTaskQueueRequest{
170+
res, err := a.MatchingClient.DescribeTaskQueue(ctx, &matchingservice.DescribeTaskQueueRequest{
149171
NamespaceId: a.namespace.ID().String(),
150172
DescRequest: &workflowservice.DescribeTaskQueueRequest{
151173
Namespace: a.namespace.Name().String(),
@@ -176,7 +198,7 @@ func (a *VersionActivities) CheckIfTaskQueuesHavePollers(ctx context.Context, ar
176198

177199
func (a *VersionActivities) GetVersionDrainageStatus(ctx context.Context, version *deploymentspb.WorkerDeploymentVersion) (*deploymentpb.VersionDrainageInfo, error) {
178200
logger := activity.GetLogger(ctx)
179-
response, err := a.deploymentClient.GetVersionDrainageStatus(ctx, a.namespace, worker_versioning.WorkerDeploymentVersionToStringV31(version))
201+
response, err := a.WorkerDeploymentClient.GetVersionDrainageStatus(ctx, a.namespace, worker_versioning.WorkerDeploymentVersionToStringV31(version))
180202
if err != nil {
181203
logger.Error("error counting workflows for drainage status", "error", err)
182204
return nil, err

service/worker/workerdeployment/version_workflow.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,11 +374,17 @@ func (d *VersionWorkflowRunner) deleteVersionFromTaskQueuesAsync(ctx workflow.Co
374374
workflow.Await(ctx, func() bool { return d.asyncPropagationsInProgress == 1 }) // delete itself is counted as one
375375
d.cancelPropagations = false // need to unset this in case the version is revived
376376

377+
// Not counting the possible wait for previous propagations in this propagation latency.
378+
startTime := workflow.Now(ctx)
379+
defer func() {
380+
d.metrics.Timer(metrics.VersioningDataPropagationLatency.Name()).Record(time.Since(startTime))
381+
}()
377382
d.deleteVersionFromTaskQueues(ctx, workflow.WithActivityOptions(ctx, propagationActivityOptions))
378383
d.asyncPropagationsInProgress--
379384
}
380385

381386
func (d *VersionWorkflowRunner) deleteVersionFromTaskQueues(ctx workflow.Context, activityCtx workflow.Context) error {
387+
382388
state := d.GetVersionState()
383389

384390
// sync version removal to task queues
@@ -957,6 +963,11 @@ func (d *VersionWorkflowRunner) syncTaskQueuesAsync(
957963
routingConfig *deploymentpb.RoutingConfig,
958964
newStatus enumspb.WorkerDeploymentVersionStatus,
959965
) error {
966+
startTime := workflow.Now(ctx)
967+
defer func() {
968+
d.metrics.Timer(metrics.VersioningDataPropagationLatency.Name()).Record(time.Since(startTime))
969+
}()
970+
960971
state := d.GetVersionState()
961972

962973
// Build WorkerDeploymentVersionData for this version from current state

0 commit comments

Comments
 (0)