Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,8 @@ var (
WorkerDeploymentVersionVisibilityQueryCount = NewCounterDef("worker_deployment_version_visibility_query_count")
WorkerDeploymentVersioningOverrideCounter = NewCounterDef("worker_deployment_versioning_override_count")
StartDeploymentTransitionCounter = NewCounterDef("start_deployment_transition_count")
VersioningDataPropagationLatency = NewTimerDef("versioning_data_propagation_latency")
SlowVersioningDataPropagationCounter = NewCounterDef("slow_versioning_data_propagation")
Comment on lines +1284 to +1285
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious: do we wanna also have a counter to track the data propagation for those users using sync workflows?

The reason why I say this is because they could make things easier to debug/understand when they have an operator's hat on

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you agree to the above point I made, we need to rename the variables just fyi

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but we're not gonna support sync wfs for long. once we verify async then we'd make that the default and eventually cleanup async path.


WorkflowResetCount = NewCounterDef("workflow_reset_count")
WorkflowQuerySuccessCount = NewCounterDef("workflow_query_success_count")
Expand Down
28 changes: 13 additions & 15 deletions service/worker/workerdeployment/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,18 @@ import (
deploymentspb "go.temporal.io/server/api/deployment/v1"
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/resource"
)

type (
Activities struct {
namespace *namespace.Namespace
deploymentClient Client
matchingClient resource.MatchingClient
activityDeps
namespace *namespace.Namespace
}
)

func (a *Activities) SyncWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.SyncVersionStateActivityArgs) (*deploymentspb.SyncVersionStateActivityResult, error) {
identity := "worker-deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
res, err := a.deploymentClient.SyncVersionWorkflowFromWorkerDeployment(
res, err := a.WorkerDeploymentClient.SyncVersionWorkflowFromWorkerDeployment(
ctx,
a.namespace,
args.DeploymentName,
Expand All @@ -48,7 +46,7 @@ func (a *Activities) SyncUnversionedRamp(
) (*deploymentspb.SyncDeploymentVersionUserDataResponse, error) {
logger := activity.GetLogger(ctx)
// Get all the task queues in the current version and put them into SyncUserData format
currVersionInfo, _, err := a.deploymentClient.DescribeVersion(ctx, a.namespace, input.CurrentVersion, false)
currVersionInfo, _, err := a.WorkerDeploymentClient.DescribeVersion(ctx, a.namespace, input.CurrentVersion, false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is changing from deploymentClient to WorkerDeploymentClient a design change only? I looked around and I think we were repeating the declaration of this earlier on but I just wanna be sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just the name that was there in activityDeps. now that we embed that we don't need to have deploymentClient.

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -76,7 +74,7 @@ func (a *Activities) SyncUnversionedRamp(
logger.Info("syncing unversioned ramp to task queue userdata", "taskQueue", syncData.Name, "types", syncData.Types)
var res *matchingservice.SyncDeploymentUserDataResponse
var err error
res, err = a.matchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
res, err = a.MatchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
NamespaceId: a.namespace.ID().String(),
TaskQueue: syncData.Name,
TaskQueueTypes: syncData.Types,
Expand Down Expand Up @@ -107,7 +105,7 @@ func (a *Activities) CheckUnversionedRampUserDataPropagation(ctx context.Context
for n, v := range input.TaskQueueMaxVersions {
go func(name string, version int64) {
logger.Info("waiting for unversioned ramp userdata propagation", "taskQueue", name, "version", version)
_, err := a.matchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{
_, err := a.MatchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{
NamespaceId: a.namespace.ID().String(),
TaskQueue: name,
Version: version,
Expand All @@ -126,7 +124,7 @@ func (a *Activities) CheckUnversionedRampUserDataPropagation(ctx context.Context
}

func (a *Activities) IsVersionMissingTaskQueues(ctx context.Context, args *deploymentspb.IsVersionMissingTaskQueuesArgs) (*deploymentspb.IsVersionMissingTaskQueuesResult, error) {
res, err := a.deploymentClient.IsVersionMissingTaskQueues(
res, err := a.WorkerDeploymentClient.IsVersionMissingTaskQueues(
ctx,
a.namespace,
args.PrevCurrentVersion,
Expand All @@ -142,7 +140,7 @@ func (a *Activities) IsVersionMissingTaskQueues(ctx context.Context, args *deplo

func (a *Activities) DeleteWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.DeleteVersionActivityArgs) error {
identity := "worker-deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
err := a.deploymentClient.DeleteVersionFromWorkerDeployment(
err := a.WorkerDeploymentClient.DeleteVersionFromWorkerDeployment(
ctx,
a.namespace,
args.DeploymentName,
Expand All @@ -160,7 +158,7 @@ func (a *Activities) DeleteWorkerDeploymentVersion(ctx context.Context, args *de

func (a *Activities) RegisterWorkerInVersion(ctx context.Context, args *deploymentspb.RegisterWorkerInVersionArgs) error {
identity := "worker-deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
err := a.deploymentClient.RegisterWorkerInVersion(
err := a.WorkerDeploymentClient.RegisterWorkerInVersion(
ctx,
a.namespace,
args,
Expand All @@ -173,7 +171,7 @@ func (a *Activities) RegisterWorkerInVersion(ctx context.Context, args *deployme
}

func (a *Activities) DescribeVersionFromWorkerDeployment(ctx context.Context, args *deploymentspb.DescribeVersionFromWorkerDeploymentActivityArgs) (*deploymentspb.DescribeVersionFromWorkerDeploymentActivityResult, error) {
res, _, err := a.deploymentClient.DescribeVersion(ctx, a.namespace, args.Version, false)
res, _, err := a.WorkerDeploymentClient.DescribeVersion(ctx, a.namespace, args.Version, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -201,7 +199,7 @@ func (a *Activities) SyncDeploymentVersionUserDataFromWorkerDeployment(
var err error

if input.ForgetVersion {
res, err = a.matchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
res, err = a.MatchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
NamespaceId: a.namespace.ID().String(),
DeploymentName: input.GetDeploymentName(),
TaskQueue: syncData.Name,
Expand All @@ -211,7 +209,7 @@ func (a *Activities) SyncDeploymentVersionUserDataFromWorkerDeployment(
},
})
} else {
res, err = a.matchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
res, err = a.MatchingClient.SyncDeploymentUserData(ctx, &matchingservice.SyncDeploymentUserDataRequest{
NamespaceId: a.namespace.ID().String(),
DeploymentName: input.GetDeploymentName(),
TaskQueue: syncData.Name,
Expand Down Expand Up @@ -250,5 +248,5 @@ func (a *Activities) StartWorkerDeploymentVersionWorkflow(
logger := activity.GetLogger(ctx)
logger.Info("starting worker deployment version workflow", "deploymentName", input.DeploymentName, "buildID", input.BuildId)
identity := "deployment workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
return a.deploymentClient.StartWorkerDeploymentVersion(ctx, a.namespace, input.DeploymentName, input.BuildId, identity, input.RequestId)
return a.WorkerDeploymentClient.StartWorkerDeploymentVersion(ctx, a.namespace, input.DeploymentName, input.BuildId, identity, input.RequestId)
}
10 changes: 4 additions & 6 deletions service/worker/workerdeployment/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,14 @@ func (s *workerComponent) Register(registry sdkworker.Registry, ns *namespace.Na
registry.RegisterWorkflowWithOptions(deploymentWorkflow, workflow.RegisterOptions{Name: WorkerDeploymentWorkflowType})

versionActivities := &VersionActivities{
namespace: ns,
deploymentClient: s.activityDeps.WorkerDeploymentClient,
matchingClient: s.activityDeps.MatchingClient,
activityDeps: s.activityDeps,
namespace: ns,
}
registry.RegisterActivity(versionActivities)

activities := &Activities{
namespace: ns,
deploymentClient: s.activityDeps.WorkerDeploymentClient,
matchingClient: s.activityDeps.MatchingClient,
activityDeps: s.activityDeps,
namespace: ns,
}
registry.RegisterActivity(activities)
return nil
Expand Down
40 changes: 31 additions & 9 deletions service/worker/workerdeployment/version_activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"sync"
"time"

deploymentpb "go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
Expand All @@ -16,17 +17,19 @@ import (
"go.temporal.io/sdk/temporal"
deploymentspb "go.temporal.io/server/api/deployment/v1"
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/worker_versioning"
"google.golang.org/protobuf/types/known/timestamppb"
)

const SlowPropagationDelay = 10 * time.Second

type (
VersionActivities struct {
namespace *namespace.Namespace
deploymentClient Client
matchingClient resource.MatchingClient
activityDeps
namespace *namespace.Namespace
}
)

Expand All @@ -37,7 +40,7 @@ func (a *VersionActivities) StartWorkerDeploymentWorkflow(
logger := activity.GetLogger(ctx)
logger.Info("starting worker-deployment workflow", "deploymentName", input.DeploymentName)
identity := "deployment-version workflow " + activity.GetInfo(ctx).WorkflowExecution.ID
err := a.deploymentClient.StartWorkerDeployment(ctx, a.namespace, input.DeploymentName, identity, input.RequestId)
err := a.WorkerDeploymentClient.StartWorkerDeployment(ctx, a.namespace, input.DeploymentName, identity, input.RequestId)
var precond *serviceerror.FailedPrecondition
if errors.As(err, &precond) {
return temporal.NewNonRetryableApplicationError("failed to create deployment", errTooManyDeployments, err)
Expand All @@ -50,6 +53,16 @@ func (a *VersionActivities) SyncDeploymentVersionUserData(
input *deploymentspb.SyncDeploymentVersionUserDataRequest,
) (*deploymentspb.SyncDeploymentVersionUserDataResponse, error) {
logger := activity.GetLogger(ctx)
scheduledTime := activity.GetInfo(ctx).ScheduledTime

if scheduledTime.Add(SlowPropagationDelay).Before(time.Now()) {
a.Logger.Warn("Slow propagation detected, attempting to sync user data",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be using logger, which is defined above, here? Wonder if these two are different fundamentally

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the logger in SDK adds activity context stuff like wf and activity names which is a good thing, changing to that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I see what you were trying to find here, but I took a minute/two to understand that this was actually finding out how long it took for the already scheduled activity to start.

Not sure if the name of SlowVersioningDataPropagationCounter makes sense if my above understanding is right. Something like ActivitySchedulingDelayCounter could make sense but shall leave the naming up to ya!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this scheduledTime is the first attempt schedule time, not the current attempt scheduling time. so basically if the activity has to retry for long it should fire.

tag.WorkflowNamespace(a.namespace.Name().String()),
tag.Deployment(input.DeploymentName),
tag.BuildId(input.GetVersion().GetBuildId()),
)
a.MetricsHandler.Counter(metrics.SlowVersioningDataPropagationCounter.Name()).Record(1)
}

errs := make(chan error)

Expand Down Expand Up @@ -88,7 +101,7 @@ func (a *VersionActivities) SyncDeploymentVersionUserData(
req.UpsertVersionsData[input.GetVersion().GetBuildId()] = vd
}

res, err = a.matchingClient.SyncDeploymentUserData(ctx, req)
res, err = a.MatchingClient.SyncDeploymentUserData(ctx, req)

if err != nil {
logger.Error("syncing task queue userdata", "taskQueue", syncData.Name, "types", syncData.Types, "error", err)
Expand All @@ -115,14 +128,23 @@ func (a *VersionActivities) SyncDeploymentVersionUserData(
}

func (a *VersionActivities) CheckWorkerDeploymentUserDataPropagation(ctx context.Context, input *deploymentspb.CheckWorkerDeploymentUserDataPropagationRequest) error {
scheduledTime := activity.GetInfo(ctx).ScheduledTime

if scheduledTime.Add(SlowPropagationDelay).Before(time.Now()) {
a.Logger.Warn("Slow propagation detected, awaiting task queue partition propagation",
tag.WorkflowNamespace(a.namespace.Name().String()),
)
a.MetricsHandler.Counter(metrics.SlowVersioningDataPropagationCounter.Name()).Record(1)
}

logger := activity.GetLogger(ctx)

errs := make(chan error)

for n, v := range input.TaskQueueMaxVersions {
go func(name string, version int64) {
logger.Info("waiting for userdata propagation", "taskQueue", name, "version", version)
_, err := a.matchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{
_, err := a.MatchingClient.CheckTaskQueueUserDataPropagation(ctx, &matchingservice.CheckTaskQueueUserDataPropagationRequest{
NamespaceId: a.namespace.ID().String(),
TaskQueue: name,
Version: version,
Expand All @@ -145,7 +167,7 @@ func (a *VersionActivities) CheckWorkerDeploymentUserDataPropagation(ctx context
func (a *VersionActivities) CheckIfTaskQueuesHavePollers(ctx context.Context, args *deploymentspb.CheckTaskQueuesHavePollersActivityArgs) (bool, error) {
versionStr := worker_versioning.ExternalWorkerDeploymentVersionToString(worker_versioning.ExternalWorkerDeploymentVersionFromVersion(args.WorkerDeploymentVersion))
for tqName, tqTypes := range args.TaskQueuesAndTypes {
res, err := a.matchingClient.DescribeTaskQueue(ctx, &matchingservice.DescribeTaskQueueRequest{
res, err := a.MatchingClient.DescribeTaskQueue(ctx, &matchingservice.DescribeTaskQueueRequest{
NamespaceId: a.namespace.ID().String(),
DescRequest: &workflowservice.DescribeTaskQueueRequest{
Namespace: a.namespace.Name().String(),
Expand Down Expand Up @@ -176,7 +198,7 @@ func (a *VersionActivities) CheckIfTaskQueuesHavePollers(ctx context.Context, ar

func (a *VersionActivities) GetVersionDrainageStatus(ctx context.Context, version *deploymentspb.WorkerDeploymentVersion) (*deploymentpb.VersionDrainageInfo, error) {
logger := activity.GetLogger(ctx)
response, err := a.deploymentClient.GetVersionDrainageStatus(ctx, a.namespace, worker_versioning.WorkerDeploymentVersionToStringV31(version))
response, err := a.WorkerDeploymentClient.GetVersionDrainageStatus(ctx, a.namespace, worker_versioning.WorkerDeploymentVersionToStringV31(version))
if err != nil {
logger.Error("error counting workflows for drainage status", "error", err)
return nil, err
Expand Down
17 changes: 16 additions & 1 deletion service/worker/workerdeployment/version_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,18 +367,24 @@
}

//nolint:revive,errcheck // In async mode the activities retry indefinitely so this function should not return error
func (d *VersionWorkflowRunner) deleteVersionFromTaskQueuesAsync(ctx workflow.Context) {

Check failure on line 370 in service/worker/workerdeployment/version_workflow.go

View workflow job for this annotation

GitHub Actions / lint-workflows

(*go.temporal.io/server/service/worker/workerdeployment.VersionWorkflowRunner).deleteVersionFromTaskQueuesAsync is non-deterministic, reason: calls non-deterministic function time.Since
// If there are propagations in progress, we ask them to cancel and wait for them to do so.
// The reason is that the ongoing upsert propagation may overwrite the delete that we want to send here, unintentionally undoing it.
d.cancelPropagations = true
workflow.Await(ctx, func() bool { return d.asyncPropagationsInProgress == 1 }) // delete itself is counted as one
d.cancelPropagations = false // need to unset this in case the version is revived

// Not counting the possible wait for previous propagations in this propagation latency.
startTime := workflow.Now(ctx)
defer func() {
d.metrics.Timer(metrics.VersioningDataPropagationLatency.Name()).Record(time.Since(startTime))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is using something like time.Since NDE prone?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For metrics purposes yes, but the linter is not happy about it so I need to change it.

}()
d.deleteVersionFromTaskQueues(ctx, workflow.WithActivityOptions(ctx, propagationActivityOptions))
d.asyncPropagationsInProgress--
}

func (d *VersionWorkflowRunner) deleteVersionFromTaskQueues(ctx workflow.Context, activityCtx workflow.Context) error {

state := d.GetVersionState()

// sync version removal to task queues
Expand Down Expand Up @@ -564,7 +570,7 @@
}

//nolint:staticcheck // SA1019
func (d *VersionWorkflowRunner) handleSyncState(ctx workflow.Context, args *deploymentspb.SyncVersionStateUpdateArgs) (*deploymentspb.SyncVersionStateResponse, error) {

Check failure on line 573 in service/worker/workerdeployment/version_workflow.go

View workflow job for this annotation

GitHub Actions / lint-workflows

(*go.temporal.io/server/service/worker/workerdeployment.VersionWorkflowRunner).handleSyncState is non-deterministic, reason: calls non-deterministic function (*go.temporal.io/server/service/worker/workerdeployment.VersionWorkflowRunner).syncTaskQueuesAsync
// use lock to enforce only one update at a time
err := d.lock.Lock(ctx)
if err != nil {
Expand Down Expand Up @@ -731,7 +737,7 @@
}
}

func (d *VersionWorkflowRunner) refreshDrainageInfo(ctx workflow.Context) {

Check failure on line 740 in service/worker/workerdeployment/version_workflow.go

View workflow job for this annotation

GitHub Actions / lint-workflows

(*go.temporal.io/server/service/worker/workerdeployment.VersionWorkflowRunner).refreshDrainageInfo is non-deterministic, reason: calls non-deterministic function (*go.temporal.io/server/service/worker/workerdeployment.VersionWorkflowRunner).updateVersionStatusAfterDrainageStatusChange
if d.VersionState.GetDrainageInfo().GetStatus() != enumspb.VERSION_DRAINAGE_STATUS_DRAINING {
return // only refresh when status is draining
}
Expand Down Expand Up @@ -835,7 +841,7 @@
return enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_UNSPECIFIED
}

func (d *VersionWorkflowRunner) updateVersionStatusAfterDrainageStatusChange(ctx workflow.Context, newStatus enumspb.VersionDrainageStatus) {

Check failure on line 844 in service/worker/workerdeployment/version_workflow.go

View workflow job for this annotation

GitHub Actions / lint-workflows

(*go.temporal.io/server/service/worker/workerdeployment.VersionWorkflowRunner).updateVersionStatusAfterDrainageStatusChange is non-deterministic, reason: calls non-deterministic function (*go.temporal.io/server/service/worker/workerdeployment.VersionWorkflowRunner).syncTaskQueuesAsync
if newStatus == enumspb.VERSION_DRAINAGE_STATUS_DRAINED {
d.VersionState.Status = enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINED
} else if newStatus == enumspb.VERSION_DRAINAGE_STATUS_DRAINING {
Expand Down Expand Up @@ -952,11 +958,16 @@
}

// syncTaskQueuesAsync performs async propagation of routing config
func (d *VersionWorkflowRunner) syncTaskQueuesAsync(

Check failure on line 961 in service/worker/workerdeployment/version_workflow.go

View workflow job for this annotation

GitHub Actions / lint-workflows

(*go.temporal.io/server/service/worker/workerdeployment.VersionWorkflowRunner).syncTaskQueuesAsync is non-deterministic, reason: calls non-deterministic function time.Since
ctx workflow.Context,
routingConfig *deploymentpb.RoutingConfig,
newStatus enumspb.WorkerDeploymentVersionStatus,
) error {
startTime := workflow.Now(ctx)
defer func() {
d.metrics.Timer(metrics.VersioningDataPropagationLatency.Name()).Record(time.Since(startTime))
}()

state := d.GetVersionState()

// Build WorkerDeploymentVersionData for this version from current state
Expand Down Expand Up @@ -1044,7 +1055,11 @@
}

if routingConfig != nil {
d.syncSummary(ctx)
if workflow.GetVersion(ctx, "no-propagation-sync-summary", workflow.DefaultVersion, 0) == workflow.DefaultVersion {
// TODO: clean this unnecessary sync up.
// No summary changes happen in async propagation that the deployment workflow
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unfinished comment i believe

d.syncSummary(ctx)
}
// Signal deployment workflow that routing config propagation completed
d.signalPropagationComplete(ctx, routingConfig.GetRevisionNumber())
}
Expand Down
Loading