Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,8 @@ var (
WorkerDeploymentVersionVisibilityQueryCount = NewCounterDef("worker_deployment_version_visibility_query_count")
WorkerDeploymentVersioningOverrideCounter = NewCounterDef("worker_deployment_versioning_override_count")
StartDeploymentTransitionCounter = NewCounterDef("start_deployment_transition_count")
VersioningDataPropagationLatency = NewTimerDef("versioning_data_propagation_latency")
SlowVersioningDataPropagationCounter = NewCounterDef("slow_versioning_data_propagation")
Comment on lines +1284 to +1285
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious: do we wanna also have a counter to track the data propagation for those users using sync workflows?

The reason why I say this is because they could make things easier to debug/understand when they have an operator's hat on

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you agree to the above point I made, we need to rename the variables just fyi

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but we're not gonna support sync wfs for long. once we verify async then we'd make that the default and eventually cleanup async path.


WorkflowResetCount = NewCounterDef("workflow_reset_count")
WorkflowQuerySuccessCount = NewCounterDef("workflow_query_success_count")
Expand Down
28 changes: 13 additions & 15 deletions service/worker/workerdeployment/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,18 @@ import (
deploymentspb "go.temporal.io/server/api/deployment/v1"
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/resource"
)

type (
Activities struct {
namespace *namespace.Namespace
deploymentClient Client
matchingClient resource.MatchingClient
activityDeps
namespace *namespace.Namespace
}
)

func (a *Activities) SyncWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.SyncVersionStateActivityArgs) (*deploymentspb.SyncVersionStateActivityResult, error) {
identity := "worker-deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
res, err := a.deploymentClient.SyncVersionWorkflowFromWorkerDeployment(
res, err := a.WorkerDeploymentClient.SyncVersionWorkflowFromWorkerDeployment(
ctx,
a.namespace,
args.DeploymentName,
Expand All @@ -48,7 +46,7 @@ func (a *Activities) SyncUnversionedRamp(
) (*deploymentspb.SyncDeploymentVersionUserDataResponse, error) {
logger := activity.GetLogger(ctx)
// Get all the task queues in the current version and put them into SyncUserData format
currVersionInfo, _, err := a.deploymentClient.DescribeVersion(ctx, a.namespace, input.CurrentVersion, false)
currVersionInfo, _, err := a.WorkerDeploymentClient.DescribeVersion(ctx, a.namespace, input.CurrentVersion, false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is changing from deploymentClient to WorkerDeploymentClient a design change only? I looked around and I think we were repeating the declaration of this earlier on but I just wanna be sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just the name that was there in activityDeps. now that we embed that we don't need to have deploymentClient.

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -76,7 +74,7 @@ func (a *Activities) SyncUnversionedRamp(
logger.Info("syncing unversioned ramp to task queue userdata", "taskQueue", syncData.Name, "types", syncData.Types)
var res *matchingservice.SyncDeploymentUserDataResponse
var err error
res, err = a.matchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
res, err = a.MatchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
NamespaceId: a.namespace.ID().String(),
TaskQueue: syncData.Name,
TaskQueueTypes: syncData.Types,
Expand Down Expand Up @@ -107,7 +105,7 @@ func (a *Activities) CheckUnversionedRampUserDataPropagation(ctx context.Context
for n, v := range input.TaskQueueMaxVersions {
go func(name string, version int64) {
logger.Info("waiting for unversioned ramp userdata propagation", "taskQueue", name, "version", version)
_, err := a.matchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{
_, err := a.MatchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{
NamespaceId: a.namespace.ID().String(),
TaskQueue: name,
Version: version,
Expand All @@ -126,7 +124,7 @@ func (a *Activities) CheckUnversionedRampUserDataPropagation(ctx context.Context
}

func (a *Activities) IsVersionMissingTaskQueues(ctx context.Context, args *deploymentspb.IsVersionMissingTaskQueuesArgs) (*deploymentspb.IsVersionMissingTaskQueuesResult, error) {
res, err := a.deploymentClient.IsVersionMissingTaskQueues(
res, err := a.WorkerDeploymentClient.IsVersionMissingTaskQueues(
ctx,
a.namespace,
args.PrevCurrentVersion,
Expand All @@ -142,7 +140,7 @@ func (a *Activities) IsVersionMissingTaskQueues(ctx context.Context, args *deplo

func (a *Activities) DeleteWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.DeleteVersionActivityArgs) error {
identity := "worker-deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
err := a.deploymentClient.DeleteVersionFromWorkerDeployment(
err := a.WorkerDeploymentClient.DeleteVersionFromWorkerDeployment(
ctx,
a.namespace,
args.DeploymentName,
Expand All @@ -160,7 +158,7 @@ func (a *Activities) DeleteWorkerDeploymentVersion(ctx context.Context, args *de

func (a *Activities) RegisterWorkerInVersion(ctx context.Context, args *deploymentspb.RegisterWorkerInVersionArgs) error {
identity := "worker-deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
err := a.deploymentClient.RegisterWorkerInVersion(
err := a.WorkerDeploymentClient.RegisterWorkerInVersion(
ctx,
a.namespace,
args,
Expand All @@ -173,7 +171,7 @@ func (a *Activities) RegisterWorkerInVersion(ctx context.Context, args *deployme
}

func (a *Activities) DescribeVersionFromWorkerDeployment(ctx context.Context, args *deploymentspb.DescribeVersionFromWorkerDeploymentActivityArgs) (*deploymentspb.DescribeVersionFromWorkerDeploymentActivityResult, error) {
res, _, err := a.deploymentClient.DescribeVersion(ctx, a.namespace, args.Version, false)
res, _, err := a.WorkerDeploymentClient.DescribeVersion(ctx, a.namespace, args.Version, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -201,7 +199,7 @@ func (a *Activities) SyncDeploymentVersionUserDataFromWorkerDeployment(
var err error

if input.ForgetVersion {
res, err = a.matchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
res, err = a.MatchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
NamespaceId: a.namespace.ID().String(),
DeploymentName: input.GetDeploymentName(),
TaskQueue: syncData.Name,
Expand All @@ -211,7 +209,7 @@ func (a *Activities) SyncDeploymentVersionUserDataFromWorkerDeployment(
},
})
} else {
res, err = a.matchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
res, err = a.MatchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
NamespaceId: a.namespace.ID().String(),
DeploymentName: input.GetDeploymentName(),
TaskQueue: syncData.Name,
Expand Down Expand Up @@ -250,5 +248,5 @@ func (a *Activities) StartWorkerDeploymentVersionWorkflow(
logger := activity.GetLogger(ctx)
logger.Info("starting worker deployment version workflow", "deploymentName", input.DeploymentName, "buildID", input.BuildId)
identity := "deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
return a.deploymentClient.StartWorkerDeploymentVersion(ctx, a.namespace, input.DeploymentName, input.BuildId, identity, input.RequestId)
return a.WorkerDeploymentClient.StartWorkerDeploymentVersion(ctx, a.namespace, input.DeploymentName, input.BuildId, identity, input.RequestId)
}
10 changes: 4 additions & 6 deletions service/worker/workerdeployment/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,14 @@ func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Na
registry.RegisterWorkflowWithOptions(deploymentWorkflow, workflow.RegisterOptions{Name: WorkerDeploymentWorkflowType})

versionActivities := &VersionActivities{
namespace: ns,
deploymentClient: s.activityDeps.WorkerDeploymentClient,
matchingClient: s.activityDeps.MatchingClient,
activityDeps: s.activityDeps,
namespace: ns,
}
registry.RegisterActivity(versionActivities)

activities := &Activities{
namespace: ns,
deploymentClient: s.activityDeps.WorkerDeploymentClient,
matchingClient: s.activityDeps.MatchingClient,
activityDeps: s.activityDeps,
namespace: ns,
}
registry.RegisterActivity(activities)
return nil
Expand Down
31 changes: 22 additions & 9 deletions service/worker/workerdeployment/version_activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,30 @@ import (
"errors"
"fmt"
"sync"
"time"

deploymentpb "go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
deploymentspb "go.temporal.io/server/api/deployment/v1"
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/worker_versioning"
"google.golang.org/protobuf/types/known/timestamppb"
)

const SlowPropagationDelay = 10 * time.Second

type (
VersionActivities struct {
namespace *namespace.Namespace
deploymentClient Client
matchingClient resource.MatchingClient
activityDeps
namespace *namespace.Namespace
}
)

Expand All @@ -37,7 +40,7 @@ func (a *VersionActivities) StartWorkerDeploymentWorkflow(
logger := activity.GetLogger(ctx)
logger.Info("starting worker-deployment workflow", "deploymentName", input.DeploymentName)
identity := "deployment-version workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
err := a.deploymentClient.StartWorkerDeployment(ctx, a.namespace, input.DeploymentName, identity, input.RequestId)
err := a.WorkerDeploymentClient.StartWorkerDeployment(ctx, a.namespace, input.DeploymentName, identity, input.RequestId)
var precond *serviceerror.FailedPrecondition
if errors.As(err, &precond) {
return temporal.NewNonRetryableApplicationError("failed to create deployment", errTooManyDeployments, err)
Expand All @@ -50,6 +53,7 @@ func (a *VersionActivities) SyncDeploymentVersionUserData(
input *deploymentspb.SyncDeploymentVersionUserDataRequest,
) (*deploymentspb.SyncDeploymentVersionUserDataResponse, error) {
logger := activity.GetLogger(ctx)
defer a.checkSlowPropagation(ctx, logger)

errs := make(chan error)

Expand Down Expand Up @@ -88,7 +92,7 @@ func (a *VersionActivities) SyncDeploymentVersionUserData(
req.UpsertVersionsData[input.GetVersion().GetBuildId()] = vd
}

res, err = a.matchingClient.SyncDeploymentUserData(ctx, req)
res, err = a.MatchingClient.SyncDeploymentUserData(ctx, req)

if err != nil {
logger.Error("syncing task queue userdata", "taskQueue", syncData.Name, "types", syncData.Types, "error", err)
Expand All @@ -114,15 +118,24 @@ func (a *VersionActivities) SyncDeploymentVersionUserData(
return &deploymentspb.SyncDeploymentVersionUserDataResponse{TaskQueueMaxVersions: maxVersionByName}, nil
}

func (a *VersionActivities) checkSlowPropagation(ctx context.Context, logger log.Logger) {
firstAttemptScheduledTime := activity.GetInfo(ctx).ScheduledTime
if firstAttemptScheduledTime.Add(SlowPropagationDelay).Before(time.Now()) {
logger.Warn("Slow propagation detected", "duration", time.Since(firstAttemptScheduledTime))
a.MetricsHandler.Counter(metrics.SlowVersioningDataPropagationCounter.Name()).Record(1)
}
}

func (a *VersionActivities) CheckWorkerDeploymentUserDataPropagation(ctx context.Context, input *deploymentspb.CheckWorkerDeploymentUserDataPropagationRequest) error {
logger := activity.GetLogger(ctx)
defer a.checkSlowPropagation(ctx, logger)

errs := make(chan error)

for n, v := range input.TaskQueueMaxVersions {
go func(name string, version int64) {
logger.Info("waiting for userdata propagation", "taskQueue", name, "version", version)
_, err := a.matchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{
_, err := a.MatchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{
NamespaceId: a.namespace.ID().String(),
TaskQueue: name,
Version: version,
Expand All @@ -145,7 +158,7 @@ func (a *VersionActivities) CheckWorkerDeploymentUserDataPropagation(ctx context
func (a *VersionActivities) CheckIfTaskQueuesHavePollers(ctx context.Context, args *deploymentspb.CheckTaskQueuesHavePollersActivityArgs) (bool, error) {
versionStr := worker_versioning.ExternalWorkerDeploymentVersionToString(worker_versioning.ExternalWorkerDeploymentVersionFromVersion(args.WorkerDeploymentVersion))
for tqName, tqTypes := range args.TaskQueuesAndTypes {
res, err := a.matchingClient.DescribeTaskQueue(ctx, &matchingservice.DescribeTaskQueueRequest{
res, err := a.MatchingClient.DescribeTaskQueue(ctx, &matchingservice.DescribeTaskQueueRequest{
NamespaceId: a.namespace.ID().String(),
DescRequest: &workflowservice.DescribeTaskQueueRequest{
Namespace: a.namespace.Name().String(),
Expand Down Expand Up @@ -176,7 +189,7 @@ func (a *VersionActivities) CheckIfTaskQueuesHavePollers(ctx context.Context, ar

func (a *VersionActivities) GetVersionDrainageStatus(ctx context.Context, version *deploymentspb.WorkerDeploymentVersion) (*deploymentpb.VersionDrainageInfo, error) {
logger := activity.GetLogger(ctx)
response, err := a.deploymentClient.GetVersionDrainageStatus(ctx, a.namespace, worker_versioning.WorkerDeploymentVersionToStringV31(version))
response, err := a.WorkerDeploymentClient.GetVersionDrainageStatus(ctx, a.namespace, worker_versioning.WorkerDeploymentVersionToStringV31(version))
if err != nil {
logger.Error("error counting workflows for drainage status", "error", err)
return nil, err
Expand Down
18 changes: 17 additions & 1 deletion service/worker/workerdeployment/version_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,17 @@ func (d *VersionWorkflowRunner) deleteVersionFromTaskQueuesAsync(ctx workflow.Co
workflow.Await(ctx, func() bool { return d.asyncPropagationsInProgress == 1 }) // delete itself is counted as one
d.cancelPropagations = false // need to unset this in case the version is revived

// Not counting the possible wait for previous propagations in this propagation latency.
startTime := workflow.Now(ctx)
defer func() {
d.metrics.Timer(metrics.VersioningDataPropagationLatency.Name()).Record(workflow.Now(ctx).Sub(startTime))
}()
d.deleteVersionFromTaskQueues(ctx, workflow.WithActivityOptions(ctx, propagationActivityOptions))
d.asyncPropagationsInProgress--
}

func (d *VersionWorkflowRunner) deleteVersionFromTaskQueues(ctx workflow.Context, activityCtx workflow.Context) error {

state := d.GetVersionState()

// sync version removal to task queues
Expand Down Expand Up @@ -957,6 +963,11 @@ func (d *VersionWorkflowRunner) syncTaskQueuesAsync(
routingConfig *deploymentpb.RoutingConfig,
newStatus enumspb.WorkerDeploymentVersionStatus,
) error {
startTime := workflow.Now(ctx)
defer func() {
d.metrics.Timer(metrics.VersioningDataPropagationLatency.Name()).Record(workflow.Now(ctx).Sub(startTime))
}()

state := d.GetVersionState()

// Build WorkerDeploymentVersionData for this version from current state
Expand Down Expand Up @@ -1044,7 +1055,12 @@ func (d *VersionWorkflowRunner) executeAndTrackAsyncPropagation(
}

if routingConfig != nil {
d.syncSummary(ctx)
if workflow.GetVersion(ctx, "no-propagation-sync-summary", workflow.DefaultVersion, 0) == workflow.DefaultVersion {
// TODO: clean this unnecessary sync.
// No summary changes need to happen after async propagation because the deployment
// workflow has already got the latest summary from update response.
d.syncSummary(ctx)
}
// Signal deployment workflow that routing config propagation completed
d.signalPropagationComplete(ctx, routingConfig.GetRevisionNumber())
}
Expand Down
Loading