Skip to content

Commit 2ab8987

Browse files
committed
address comments and lints
1 parent cd386e2 commit 2ab8987

File tree

2 files changed

+14
-22
lines changed

2 files changed

+14
-22
lines changed

service/worker/workerdeployment/version_activities.go

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ import (
1414
taskqueuepb "go.temporal.io/api/taskqueue/v1"
1515
"go.temporal.io/api/workflowservice/v1"
1616
"go.temporal.io/sdk/activity"
17+
"go.temporal.io/sdk/log"
1718
"go.temporal.io/sdk/temporal"
1819
deploymentspb "go.temporal.io/server/api/deployment/v1"
1920
"go.temporal.io/server/api/matchingservice/v1"
20-
"go.temporal.io/server/common/log/tag"
2121
"go.temporal.io/server/common/metrics"
2222
"go.temporal.io/server/common/namespace"
2323
"go.temporal.io/server/common/worker_versioning"
@@ -53,16 +53,7 @@ func (a *VersionActivities) SyncDeploymentVersionUserData(
5353
input *deploymentspb.SyncDeploymentVersionUserDataRequest,
5454
) (*deploymentspb.SyncDeploymentVersionUserDataResponse, error) {
5555
logger := activity.GetLogger(ctx)
56-
scheduledTime := activity.GetInfo(ctx).ScheduledTime
57-
58-
if scheduledTime.Add(SlowPropagationDelay).Before(time.Now()) {
59-
a.Logger.Warn("Slow propagation detected, attempting to sync user data",
60-
tag.WorkflowNamespace(a.namespace.Name().String()),
61-
tag.Deployment(input.DeploymentName),
62-
tag.BuildId(input.GetVersion().GetBuildId()),
63-
)
64-
a.MetricsHandler.Counter(metrics.SlowVersioningDataPropagationCounter.Name()).Record(1)
65-
}
56+
defer a.checkSlowPropagation(ctx, logger)
6657

6758
errs := make(chan error)
6859

@@ -127,17 +118,17 @@ func (a *VersionActivities) SyncDeploymentVersionUserData(
127118
return &deploymentspb.SyncDeploymentVersionUserDataResponse{TaskQueueMaxVersions: maxVersionByName}, nil
128119
}
129120

130-
func (a *VersionActivities) CheckWorkerDeploymentUserDataPropagation(ctx context.Context, input *deploymentspb.CheckWorkerDeploymentUserDataPropagationRequest) error {
131-
scheduledTime := activity.GetInfo(ctx).ScheduledTime
132-
133-
if scheduledTime.Add(SlowPropagationDelay).Before(time.Now()) {
134-
a.Logger.Warn("Slow propagation detected, awaiting task queue partition propagation",
135-
tag.WorkflowNamespace(a.namespace.Name().String()),
136-
)
121+
func (a *VersionActivities) checkSlowPropagation(ctx context.Context, logger log.Logger) {
122+
firstAttemptScheduledTime := activity.GetInfo(ctx).ScheduledTime
123+
if firstAttemptScheduledTime.Add(SlowPropagationDelay).Before(time.Now()) {
124+
logger.Warn("Slow propagation detected", "duration", time.Since(firstAttemptScheduledTime))
137125
a.MetricsHandler.Counter(metrics.SlowVersioningDataPropagationCounter.Name()).Record(1)
138126
}
127+
}
139128

129+
func (a *VersionActivities) CheckWorkerDeploymentUserDataPropagation(ctx context.Context, input *deploymentspb.CheckWorkerDeploymentUserDataPropagationRequest) error {
140130
logger := activity.GetLogger(ctx)
131+
defer a.checkSlowPropagation(ctx, logger)
141132

142133
errs := make(chan error)
143134

service/worker/workerdeployment/version_workflow.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ func (d *VersionWorkflowRunner) deleteVersionFromTaskQueuesAsync(ctx workflow.Co
377377
// Not counting the possible wait for previous propagations in this propagation latency.
378378
startTime := workflow.Now(ctx)
379379
defer func() {
380-
d.metrics.Timer(metrics.VersioningDataPropagationLatency.Name()).Record(time.Since(startTime))
380+
d.metrics.Timer(metrics.VersioningDataPropagationLatency.Name()).Record(workflow.Now(ctx).Sub(startTime))
381381
}()
382382
d.deleteVersionFromTaskQueues(ctx, workflow.WithActivityOptions(ctx, propagationActivityOptions))
383383
d.asyncPropagationsInProgress--
@@ -965,7 +965,7 @@ func (d *VersionWorkflowRunner) syncTaskQueuesAsync(
965965
) error {
966966
startTime := workflow.Now(ctx)
967967
defer func() {
968-
d.metrics.Timer(metrics.VersioningDataPropagationLatency.Name()).Record(time.Since(startTime))
968+
d.metrics.Timer(metrics.VersioningDataPropagationLatency.Name()).Record(workflow.Now(ctx).Sub(startTime))
969969
}()
970970

971971
state := d.GetVersionState()
@@ -1056,8 +1056,9 @@ func (d *VersionWorkflowRunner) executeAndTrackAsyncPropagation(
10561056

10571057
if routingConfig != nil {
10581058
if workflow.GetVersion(ctx, "no-propagation-sync-summary", workflow.DefaultVersion, 0) == workflow.DefaultVersion {
1059-
// TODO: clean this unnecessary sync up.
1060-
// No summary changes happen in async propagation that the deployment workflow
1059+
// TODO: clean this unnecessary sync.
1060+
// No summary changes need to happen after async propagation because the deployment
1061+
// workflow has already got the latest summary from update response.
10611062
d.syncSummary(ctx)
10621063
}
10631064
// Signal deployment workflow that routing config propagation completed

0 commit comments

Comments
 (0)