Skip to content

Commit d673193

Browse files
authored
Fix max deployment count check to not block new versions (#7841)
## What changed? The check that enforces the per-namespace limit on the number of Worker Deployments could also prevent adding new versions to existing deployments when the namespace had exactly as many deployments as the limit allows (reaching the limit but not exceeding it). The limit is now checked only when the deployment does not already exist. ## Why? Fixing unexpected behavior. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [x] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) ## Potential risks None.
1 parent 6784724 commit d673193

File tree

3 files changed

+64
-14
lines changed

3 files changed

+64
-14
lines changed

service/matching/physical_task_queue_manager.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,10 +566,13 @@ func (c *physicalTaskQueueManagerImpl) ensureRegisteredInDeploymentVersion(
566566
}
567567
var errMaxTaskQueuesInVersion workerdeployment.ErrMaxTaskQueuesInVersion
568568
var errMaxVersionsInDeployment workerdeployment.ErrMaxVersionsInDeployment
569+
var errMaxDeploymentsInNamespace workerdeployment.ErrMaxDeploymentsInNamespace
569570
if errors.As(err, &errMaxTaskQueuesInVersion) {
570571
err = errMaxTaskQueuesInVersion
571572
} else if errors.As(err, &errMaxVersionsInDeployment) {
572573
err = errMaxVersionsInDeployment
574+
} else if errors.As(err, &errMaxDeploymentsInNamespace) {
575+
err = errMaxDeploymentsInNamespace
573576
} else {
574577
// Do not surface low level error to user
575578
c.logger.Error("error while registering version", tag.Error(err))

service/worker/workerdeployment/client.go

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ type Client interface {
179179

180180
type ErrMaxTaskQueuesInVersion struct{ error }
181181
type ErrMaxVersionsInDeployment struct{ error }
182+
// ErrMaxDeploymentsInNamespace wraps the error returned by
// updateWithStartWorkerDeployment when a new deployment would be created while
// the namespace's deployment count is already at or above the configured limit.
// Callers detect it with errors.As.
type ErrMaxDeploymentsInNamespace struct{ error }
182183
type ErrRegister struct{ error }
183184

184185
// ClientImpl implements Client
@@ -422,6 +423,34 @@ func (d *ClientImpl) DescribeWorkerDeployment(
422423
return dInfo, queryResponse.GetState().GetConflictToken(), nil
423424
}
424425

426+
func (d *ClientImpl) workerDeploymentExists(
427+
ctx context.Context,
428+
namespaceEntry *namespace.Namespace,
429+
deploymentName string,
430+
) (bool, error) {
431+
deploymentWorkflowID := worker_versioning.GenerateDeploymentWorkflowID(deploymentName)
432+
433+
res, err := d.historyClient.DescribeWorkflowExecution(ctx, &historyservice.DescribeWorkflowExecutionRequest{
434+
NamespaceId: namespaceEntry.ID().String(),
435+
Request: &workflowservice.DescribeWorkflowExecutionRequest{
436+
Namespace: namespaceEntry.Name().String(),
437+
Execution: &commonpb.WorkflowExecution{
438+
WorkflowId: deploymentWorkflowID,
439+
},
440+
},
441+
})
442+
if err != nil {
443+
var notFound *serviceerror.NotFound
444+
if errors.As(err, &notFound) {
445+
return false, nil
446+
}
447+
return false, err
448+
}
449+
450+
// Deployment exists only if the entity wf is running
451+
return res.GetWorkflowExecutionInfo().GetStatus() == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, nil
452+
}
453+
425454
func (d *ClientImpl) ListWorkerDeployments(
426455
ctx context.Context,
427456
namespaceEntry *namespace.Namespace,
@@ -791,15 +820,6 @@ func (d *ClientImpl) StartWorkerDeployment(
791820
//revive:disable-next-line:defer
792821
defer d.record("StartWorkerDeployment", &retErr, namespaceEntry.Name(), deploymentName, identity)()
793822

794-
// TODO (Carly): either max page size or default page size is 1000, so if there are > 1000 this would not catch it
795-
deps, _, err := d.ListWorkerDeployments(ctx, namespaceEntry, 0, nil)
796-
if err != nil {
797-
return err
798-
}
799-
if len(deps) >= d.maxDeployments(namespaceEntry.Name().String()) {
800-
return serviceerror.NewFailedPrecondition("maximum deployments in namespace, delete manually to continue deploying.")
801-
}
802-
803823
workflowID := worker_versioning.GenerateDeploymentWorkflowID(deploymentName)
804824

805825
input, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.WorkerDeploymentWorkflowArgs{
@@ -1020,13 +1040,20 @@ func (d *ClientImpl) updateWithStartWorkerDeployment(
10201040

10211041
workflowID := worker_versioning.GenerateDeploymentWorkflowID(deploymentName)
10221042

1023-
// TODO (Carly): either max page size or default page size is 1000, so if there are > 1000 this would not catch it
1024-
deps, _, err := d.ListWorkerDeployments(ctx, namespaceEntry, 0, nil)
1043+
exists, err := d.workerDeploymentExists(ctx, namespaceEntry, deploymentName)
10251044
if err != nil {
10261045
return nil, err
10271046
}
1028-
if len(deps) >= d.maxDeployments(namespaceEntry.Name().String()) {
1029-
return nil, serviceerror.NewFailedPrecondition("maximum deployments in namespace, delete manually to continue deploying.")
1047+
if !exists {
1048+
// New deployment, make sure we're not exceeding the limit
1049+
count, err := d.countWorkerDeployments(ctx, namespaceEntry)
1050+
if err != nil {
1051+
return nil, err
1052+
}
1053+
limit := d.maxDeployments(namespaceEntry.Name().String())
1054+
if count >= int64(limit) {
1055+
return nil, ErrMaxDeploymentsInNamespace{error: errors.New(fmt.Sprintf("reached maximum deployments in namespace (%d)", limit))}
1056+
}
10301057
}
10311058

10321059
input, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.WorkerDeploymentWorkflowArgs{
@@ -1054,6 +1081,26 @@ func (d *ClientImpl) updateWithStartWorkerDeployment(
10541081
)
10551082
}
10561083

1084+
func (d *ClientImpl) countWorkerDeployments(
1085+
ctx context.Context,
1086+
namespaceEntry *namespace.Namespace,
1087+
) (count int64, retError error) {
1088+
query := WorkerDeploymentVisibilityBaseListQuery
1089+
1090+
persistenceResp, err := d.visibilityManager.CountWorkflowExecutions(
1091+
ctx,
1092+
&manager.CountWorkflowExecutionsRequest{
1093+
NamespaceID: namespaceEntry.ID(),
1094+
Namespace: namespaceEntry.Name(),
1095+
Query: query,
1096+
},
1097+
)
1098+
if err != nil {
1099+
return 0, err
1100+
}
1101+
return persistenceResp.Count, nil
1102+
}
1103+
10571104
func (d *ClientImpl) updateWithStartWorkerDeploymentVersion(
10581105
ctx context.Context,
10591106
namespaceEntry *namespace.Namespace,

tests/worker_deployment_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (s *WorkerDeploymentSuite) TestNamespaceDeploymentsLimit() {
258258
s.ensureCreateVersionInDeployment(tv)
259259

260260
// pollers of the second deployment version should be rejected
261-
s.pollFromDeploymentExpectFail(ctx, tv.WithDeploymentSeriesNumber(2), "Namespace deployments limit reached")
261+
s.pollFromDeploymentExpectFail(ctx, tv.WithDeploymentSeriesNumber(2), "reached maximum deployments in namespace (1)")
262262
}
263263

264264
func (s *WorkerDeploymentSuite) TestDescribeWorkerDeployment_TwoVersions_Sorted() {

0 commit comments

Comments
 (0)