diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 3a1169dfd33..963d11ae4a0 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1281,6 +1281,8 @@ var ( WorkerDeploymentVersionVisibilityQueryCount = NewCounterDef("worker_deployment_version_visibility_query_count") WorkerDeploymentVersioningOverrideCounter = NewCounterDef("worker_deployment_versioning_override_count") StartDeploymentTransitionCounter = NewCounterDef("start_deployment_transition_count") + VersioningDataPropagationLatency = NewTimerDef("versioning_data_propagation_latency") + SlowVersioningDataPropagationCounter = NewCounterDef("slow_versioning_data_propagation") WorkflowResetCount = NewCounterDef("workflow_reset_count") WorkflowQuerySuccessCount = NewCounterDef("workflow_query_success_count") diff --git a/service/worker/workerdeployment/activities.go b/service/worker/workerdeployment/activities.go index 19b4cfff9d1..f48a50b3235 100644 --- a/service/worker/workerdeployment/activities.go +++ b/service/worker/workerdeployment/activities.go @@ -10,20 +10,18 @@ import ( deploymentspb "go.temporal.io/server/api/deployment/v1" "go.temporal.io/server/api/matchingservice/v1" "go.temporal.io/server/common/namespace" - "go.temporal.io/server/common/resource" ) type ( Activities struct { - namespace *namespace.Namespace - deploymentClient Client - matchingClient resource.MatchingClient + activityDeps + namespace *namespace.Namespace } ) func (a *Activities) SyncWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.SyncVersionStateActivityArgs) (*deploymentspb.SyncVersionStateActivityResult, error) { identity := "worker-deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID - res, err := a.deploymentClient.SyncVersionWorkflowFromWorkerDeployment( + res, err := a.WorkerDeploymentClient.SyncVersionWorkflowFromWorkerDeployment( ctx, a.namespace, args.DeploymentName, @@ -48,7 +46,7 @@ func (a *Activities) SyncUnversionedRamp( ) (*deploymentspb.SyncDeploymentVersionUserDataResponse, error) { logger := activity.GetLogger(ctx) // Get all the task queues in the current version and put them into SyncUserData format - currVersionInfo, _, err := a.deploymentClient.DescribeVersion(ctx, a.namespace, input.CurrentVersion, false) + currVersionInfo, _, err := a.WorkerDeploymentClient.DescribeVersion(ctx, a.namespace, input.CurrentVersion, false) if err != nil { return nil, err } @@ -76,7 +74,7 @@ func (a *Activities) SyncUnversionedRamp( logger.Info("syncing unversioned ramp to task queue userdata", "taskQueue", syncData.Name, "types", syncData.Types) var res *matchingservice.SyncDeploymentUserDataResponse var err error - res, err = a.matchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{ + res, err = a.MatchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{ NamespaceId: a.namespace.ID().String(), TaskQueue: syncData.Name, TaskQueueTypes: syncData.Types, @@ -107,7 +105,7 @@ func (a *Activities) CheckUnversionedRampUserDataPropagation(ctx context.Context for n, v := range input.TaskQueueMaxVersions { go func(name string, version int64) { logger.Info("waiting for unversioned ramp userdata propagation", "taskQueue", name, "version", version) - _, err := a.matchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{ + _, err := a.MatchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{ NamespaceId: a.namespace.ID().String(), TaskQueue: name, Version: version, @@ -126,7 +124,7 @@ func (a *Activities) CheckUnversionedRampUserDataPropagation(ctx context.Context } func (a *Activities) IsVersionMissingTaskQueues(ctx context.Context, args *deploymentspb.IsVersionMissingTaskQueuesArgs) (*deploymentspb.IsVersionMissingTaskQueuesResult, error) { - res, err := a.deploymentClient.IsVersionMissingTaskQueues( + res, err := a.WorkerDeploymentClient.IsVersionMissingTaskQueues( ctx, a.namespace, args.PrevCurrentVersion, @@ -142,7 +140,7 @@ func (a *Activities) IsVersionMissingTaskQueues(ctx context.Context, args *deplo func (a *Activities) DeleteWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.DeleteVersionActivityArgs) error { identity := "worker-deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID - err := a.deploymentClient.DeleteVersionFromWorkerDeployment( + err := a.WorkerDeploymentClient.DeleteVersionFromWorkerDeployment( ctx, a.namespace, args.DeploymentName, @@ -160,7 +158,7 @@ func (a *Activities) DeleteWorkerDeploymentVersion(ctx context.Context, args *de func (a *Activities) RegisterWorkerInVersion(ctx context.Context, args *deploymentspb.RegisterWorkerInVersionArgs) error { identity := "worker-deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID - err := a.deploymentClient.RegisterWorkerInVersion( + err := a.WorkerDeploymentClient.RegisterWorkerInVersion( ctx, a.namespace, args, @@ -173,7 +171,7 @@ func (a *Activities) RegisterWorkerInVersion(ctx context.Context, args *deployme } func (a *Activities) DescribeVersionFromWorkerDeployment(ctx context.Context, args *deploymentspb.DescribeVersionFromWorkerDeploymentActivityArgs) (*deploymentspb.DescribeVersionFromWorkerDeploymentActivityResult, error) { - res, _, err := a.deploymentClient.DescribeVersion(ctx, a.namespace, args.Version, false) + res, _, err := a.WorkerDeploymentClient.DescribeVersion(ctx, a.namespace, args.Version, false) if err != nil { return nil, err } @@ -201,7 +199,7 @@ func (a *Activities) SyncDeploymentVersionUserDataFromWorkerDeployment( var err error if input.ForgetVersion { - res, err = a.matchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{ + res, err = a.MatchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{ NamespaceId: a.namespace.ID().String(), DeploymentName: input.GetDeploymentName(), TaskQueue: syncData.Name, @@ -211,7 +209,7 @@ func (a *Activities) SyncDeploymentVersionUserDataFromWorkerDeployment( }, }) } else { - res, err = a.matchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{ + res, err = a.MatchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{ NamespaceId: a.namespace.ID().String(), DeploymentName: input.GetDeploymentName(), TaskQueue: syncData.Name, @@ -250,5 +248,5 @@ func (a *Activities) StartWorkerDeploymentVersionWorkflow( logger := activity.GetLogger(ctx) logger.Info("starting worker deployment version workflow", "deploymentName", input.DeploymentName, "buildID", input.BuildId) identity := "deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID - return a.deploymentClient.StartWorkerDeploymentVersion(ctx, a.namespace, input.DeploymentName, input.BuildId, identity, input.RequestId) + return a.WorkerDeploymentClient.StartWorkerDeploymentVersion(ctx, a.namespace, input.DeploymentName, input.BuildId, identity, input.RequestId) } diff --git a/service/worker/workerdeployment/fx.go b/service/worker/workerdeployment/fx.go index d44f0bceaca..5095371a886 100644 --- a/service/worker/workerdeployment/fx.go +++ b/service/worker/workerdeployment/fx.go @@ -119,16 +119,14 @@ func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Na registry.RegisterWorkflowWithOptions(deploymentWorkflow, workflow.RegisterOptions{Name: WorkerDeploymentWorkflowType}) versionActivities := &VersionActivities{ - namespace: ns, - deploymentClient: s.activityDeps.WorkerDeploymentClient, - matchingClient: s.activityDeps.MatchingClient, + activityDeps: s.activityDeps, + namespace: ns, } registry.RegisterActivity(versionActivities) activities := &Activities{ - namespace: ns, - deploymentClient: s.activityDeps.WorkerDeploymentClient, - matchingClient: s.activityDeps.MatchingClient, + activityDeps: s.activityDeps, + namespace: ns, } registry.RegisterActivity(activities) return nil diff --git a/service/worker/workerdeployment/version_activities.go b/service/worker/workerdeployment/version_activities.go index 275ec79bab5..af44ec547e1 100644 --- a/service/worker/workerdeployment/version_activities.go +++ b/service/worker/workerdeployment/version_activities.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "sync" + "time" deploymentpb "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" @@ -13,20 +14,22 @@ import ( taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" deploymentspb "go.temporal.io/server/api/deployment/v1" "go.temporal.io/server/api/matchingservice/v1" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" - "go.temporal.io/server/common/resource" "go.temporal.io/server/common/worker_versioning" "google.golang.org/protobuf/types/known/timestamppb" ) +const SlowPropagationDelay = 10 * time.Second + type ( VersionActivities struct { - namespace *namespace.Namespace - deploymentClient Client - matchingClient resource.MatchingClient + activityDeps + namespace *namespace.Namespace } ) @@ -37,7 +40,7 @@ func (a *VersionActivities) StartWorkerDeploymentWorkflow( logger := activity.GetLogger(ctx) logger.Info("starting worker-deployment workflow", "deploymentName", input.DeploymentName) identity := "deployment-version workflow " + activity.GetInfo(ctx).WorkflowExecution.ID - err := a.deploymentClient.StartWorkerDeployment(ctx, a.namespace, input.DeploymentName, identity, input.RequestId) + err := a.WorkerDeploymentClient.StartWorkerDeployment(ctx, a.namespace, input.DeploymentName, identity, input.RequestId) var precond *serviceerror.FailedPrecondition if errors.As(err, &precond) { return temporal.NewNonRetryableApplicationError("failed to create deployment", errTooManyDeployments, err) @@ -50,6 +53,7 @@ func (a *VersionActivities) SyncDeploymentVersionUserData( input *deploymentspb.SyncDeploymentVersionUserDataRequest, ) (*deploymentspb.SyncDeploymentVersionUserDataResponse, error) { logger := activity.GetLogger(ctx) + defer a.checkSlowPropagation(ctx, logger) errs := make(chan error) @@ -88,7 +92,7 @@ func (a *VersionActivities) SyncDeploymentVersionUserData( req.UpsertVersionsData[input.GetVersion().GetBuildId()] = vd } - res, err = a.matchingClient.SyncDeploymentUserData(ctx, req) + res, err = a.MatchingClient.SyncDeploymentUserData(ctx, req) if err != nil { logger.Error("syncing task queue userdata", "taskQueue", syncData.Name, "types", syncData.Types, "error", err) @@ -114,15 +118,24 @@ func (a *VersionActivities) SyncDeploymentVersionUserData( return &deploymentspb.SyncDeploymentVersionUserDataResponse{TaskQueueMaxVersions: maxVersionByName}, nil } +func (a *VersionActivities) checkSlowPropagation(ctx context.Context, logger log.Logger) { + firstAttemptScheduledTime := activity.GetInfo(ctx).ScheduledTime + if firstAttemptScheduledTime.Add(SlowPropagationDelay).Before(time.Now()) { + logger.Warn("Slow propagation detected", "duration", time.Since(firstAttemptScheduledTime)) + a.MetricsHandler.Counter(metrics.SlowVersioningDataPropagationCounter.Name()).Record(1) + } +} + func (a *VersionActivities) CheckWorkerDeploymentUserDataPropagation(ctx context.Context, input *deploymentspb.CheckWorkerDeploymentUserDataPropagationRequest) error { logger := activity.GetLogger(ctx) + defer a.checkSlowPropagation(ctx, logger) errs := make(chan error) for n, v := range input.TaskQueueMaxVersions { go func(name string, version int64) { logger.Info("waiting for userdata propagation", "taskQueue", name, "version", version) - _, err := a.matchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{ + _, err := a.MatchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{ NamespaceId: a.namespace.ID().String(), TaskQueue: name, Version: version, @@ -145,7 +158,7 @@ func (a *VersionActivities) CheckWorkerDeploymentUserDataPropagation(ctx context func (a *VersionActivities) CheckIfTaskQueuesHavePollers(ctx context.Context, args *deploymentspb.CheckTaskQueuesHavePollersActivityArgs) (bool, error) { versionStr := worker_versioning.ExternalWorkerDeploymentVersionToString(worker_versioning.ExternalWorkerDeploymentVersionFromVersion(args.WorkerDeploymentVersion)) for tqName, tqTypes := range args.TaskQueuesAndTypes { - res, err := a.matchingClient.DescribeTaskQueue(ctx, &matchingservice.DescribeTaskQueueRequest{ + res, err := a.MatchingClient.DescribeTaskQueue(ctx, &matchingservice.DescribeTaskQueueRequest{ NamespaceId: a.namespace.ID().String(), DescRequest: &workflowservice.DescribeTaskQueueRequest{ Namespace: a.namespace.Name().String(), @@ -176,7 +189,7 @@ func (a *VersionActivities) CheckIfTaskQueuesHavePollers(ctx context.Context, ar func (a *VersionActivities) GetVersionDrainageStatus(ctx context.Context, version *deploymentspb.WorkerDeploymentVersion) (*deploymentpb.VersionDrainageInfo, error) { logger := activity.GetLogger(ctx) - response, err := a.deploymentClient.GetVersionDrainageStatus(ctx, a.namespace, worker_versioning.WorkerDeploymentVersionToStringV31(version)) + response, err := a.WorkerDeploymentClient.GetVersionDrainageStatus(ctx, a.namespace, worker_versioning.WorkerDeploymentVersionToStringV31(version)) if err != nil { logger.Error("error counting workflows for drainage status", "error", err) return nil, err diff --git a/service/worker/workerdeployment/version_workflow.go b/service/worker/workerdeployment/version_workflow.go index 263d3beb092..8b39e912bea 100644 --- a/service/worker/workerdeployment/version_workflow.go +++ b/service/worker/workerdeployment/version_workflow.go @@ -374,11 +374,17 @@ func (d *VersionWorkflowRunner) deleteVersionFromTaskQueuesAsync(ctx workflow.Co workflow.Await(ctx, func() bool { return d.asyncPropagationsInProgress == 1 }) // delete itself is counted as one d.cancelPropagations = false // need to unset this in case the version is revived + // Not counting the possible wait for previous propagations in this propagation latency. + startTime := workflow.Now(ctx) + defer func() { + d.metrics.Timer(metrics.VersioningDataPropagationLatency.Name()).Record(workflow.Now(ctx).Sub(startTime)) + }() d.deleteVersionFromTaskQueues(ctx, workflow.WithActivityOptions(ctx, propagationActivityOptions)) d.asyncPropagationsInProgress-- } func (d *VersionWorkflowRunner) deleteVersionFromTaskQueues(ctx workflow.Context, activityCtx workflow.Context) error { + state := d.GetVersionState() // sync version removal to task queues @@ -957,6 +963,11 @@ func (d *VersionWorkflowRunner) syncTaskQueuesAsync( routingConfig *deploymentpb.RoutingConfig, newStatus enumspb.WorkerDeploymentVersionStatus, ) error { + startTime := workflow.Now(ctx) + defer func() { + d.metrics.Timer(metrics.VersioningDataPropagationLatency.Name()).Record(workflow.Now(ctx).Sub(startTime)) + }() + state := d.GetVersionState() // Build WorkerDeploymentVersionData for this version from current state @@ -1044,7 +1055,12 @@ func (d *VersionWorkflowRunner) executeAndTrackAsyncPropagation( } if routingConfig != nil { - d.syncSummary(ctx) + if workflow.GetVersion(ctx, "no-propagation-sync-summary", workflow.DefaultVersion, 0) == workflow.DefaultVersion { + // TODO: clean this unnecessary sync. + // No summary changes need to happen after async propagation because the deployment + // workflow has already got the latest summary from update response. + d.syncSummary(ctx) + } // Signal deployment workflow that routing config propagation completed d.signalPropagationComplete(ctx, routingConfig.GetRevisionNumber()) }