Skip to content

Commit 2160a7b

Browse files
committed
Fix max deployment count check to not block new versions
1 parent 7764cbd commit 2160a7b

File tree

3 files changed

+38
-16
lines changed

3 files changed

+38
-16
lines changed

service/matching/physical_task_queue_manager.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,10 +566,13 @@ func (c *physicalTaskQueueManagerImpl) ensureRegisteredInDeploymentVersion(
566566
}
567567
var errMaxTaskQueuesInVersion workerdeployment.ErrMaxTaskQueuesInVersion
568568
var errMaxVersionsInDeployment workerdeployment.ErrMaxVersionsInDeployment
569+
var errMaxDeploymentsInNamespace workerdeployment.ErrMaxDeploymentsInNamespace
569570
if errors.As(err, &errMaxTaskQueuesInVersion) {
570571
err = errMaxTaskQueuesInVersion
571572
} else if errors.As(err, &errMaxVersionsInDeployment) {
572573
err = errMaxVersionsInDeployment
574+
} else if errors.As(err, &errMaxDeploymentsInNamespace) {
575+
err = errMaxDeploymentsInNamespace
573576
} else {
574577
// Do not surface low level error to user
575578
c.logger.Error("error while registering version", tag.Error(err))

service/worker/workerdeployment/client.go

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ type Client interface {
179179

180180
type ErrMaxTaskQueuesInVersion struct{ error }
181181
type ErrMaxVersionsInDeployment struct{ error }
182+
type ErrMaxDeploymentsInNamespace struct{ error }
182183
type ErrRegister struct{ error }
183184

184185
// ClientImpl implements Client
@@ -791,15 +792,6 @@ func (d *ClientImpl) StartWorkerDeployment(
791792
//revive:disable-next-line:defer
792793
defer d.record("StartWorkerDeployment", &retErr, namespaceEntry.Name(), deploymentName, identity)()
793794

794-
// TODO (Carly): either max page size or default page size is 1000, so if there are > 1000 this would not catch it
795-
deps, _, err := d.ListWorkerDeployments(ctx, namespaceEntry, 0, nil)
796-
if err != nil {
797-
return err
798-
}
799-
if len(deps) >= d.maxDeployments(namespaceEntry.Name().String()) {
800-
return serviceerror.NewFailedPrecondition("maximum deployments in namespace, delete manually to continue deploying.")
801-
}
802-
803795
workflowID := worker_versioning.GenerateDeploymentWorkflowID(deploymentName)
804796

805797
input, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.WorkerDeploymentWorkflowArgs{
@@ -1020,13 +1012,20 @@ func (d *ClientImpl) updateWithStartWorkerDeployment(
10201012

10211013
workflowID := worker_versioning.GenerateDeploymentWorkflowID(deploymentName)
10221014

1023-
// TODO (Carly): either max page size or default page size is 1000, so if there are > 1000 this would not catch it
1024-
deps, _, err := d.ListWorkerDeployments(ctx, namespaceEntry, 0, nil)
1015+
_, _, err = d.DescribeWorkerDeployment(ctx, namespaceEntry, deploymentName)
10251016
if err != nil {
1026-
return nil, err
1027-
}
1028-
if len(deps) >= d.maxDeployments(namespaceEntry.Name().String()) {
1029-
return nil, serviceerror.NewFailedPrecondition("maximum deployments in namespace, delete manually to continue deploying.")
1017+
var notFound *serviceerror.NotFound
1018+
if errors.As(err, &notFound) {
1019+
// New deployment, make sure we're not exceeding the limit
1020+
count, err := d.countWorkerDeployments(ctx, namespaceEntry)
1021+
if err != nil {
1022+
return nil, err
1023+
}
1024+
limit := d.maxDeployments(namespaceEntry.Name().String())
1025+
if count >= int64(limit) {
1026+
return nil, ErrMaxDeploymentsInNamespace{error: errors.New(fmt.Sprintf("reached maximum deployments in namespace (%d)", limit))}
1027+
}
1028+
}
10301029
}
10311030

10321031
input, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.WorkerDeploymentWorkflowArgs{
@@ -1054,6 +1053,26 @@ func (d *ClientImpl) updateWithStartWorkerDeployment(
10541053
)
10551054
}
10561055

1056+
func (d *ClientImpl) countWorkerDeployments(
1057+
ctx context.Context,
1058+
namespaceEntry *namespace.Namespace,
1059+
) (count int64, retError error) {
1060+
query := WorkerDeploymentVisibilityBaseListQuery
1061+
1062+
persistenceResp, err := d.visibilityManager.CountWorkflowExecutions(
1063+
ctx,
1064+
&manager.CountWorkflowExecutionsRequest{
1065+
NamespaceID: namespaceEntry.ID(),
1066+
Namespace: namespaceEntry.Name(),
1067+
Query: query,
1068+
},
1069+
)
1070+
if err != nil {
1071+
return 0, err
1072+
}
1073+
return persistenceResp.Count, nil
1074+
}
1075+
10571076
func (d *ClientImpl) updateWithStartWorkerDeploymentVersion(
10581077
ctx context.Context,
10591078
namespaceEntry *namespace.Namespace,

tests/worker_deployment_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (s *WorkerDeploymentSuite) TestNamespaceDeploymentsLimit() {
258258
s.ensureCreateVersionInDeployment(tv)
259259

260260
// pollers of the second deployment version should be rejected
261-
s.pollFromDeploymentExpectFail(ctx, tv.WithDeploymentSeriesNumber(2), "Namespace deployments limit reached")
261+
s.pollFromDeploymentExpectFail(ctx, tv.WithDeploymentSeriesNumber(2), "reached maximum deployments in namespace (1)")
262262
}
263263

264264
func (s *WorkerDeploymentSuite) TestDescribeWorkerDeployment_TwoVersions_Sorted() {

0 commit comments

Comments
 (0)