Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions service/matching/physical_task_queue_manager.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding new versions to existing deployment

I did a quick pass and i'm going to do one more soon, but the changes seem to be related to Listing workflow executions; how is/was this preventing adding new versions to deployment?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was wrong - ignore the above comment - reviewing again

Original file line number Diff line number Diff line change
Expand Up @@ -566,10 +566,13 @@ func (c *physicalTaskQueueManagerImpl) ensureRegisteredInDeploymentVersion(
}
var errMaxTaskQueuesInVersion workerdeployment.ErrMaxTaskQueuesInVersion
var errMaxVersionsInDeployment workerdeployment.ErrMaxVersionsInDeployment
var errMaxDeploymentsInNamespace workerdeployment.ErrMaxDeploymentsInNamespace
if errors.As(err, &errMaxTaskQueuesInVersion) {
err = errMaxTaskQueuesInVersion
} else if errors.As(err, &errMaxVersionsInDeployment) {
err = errMaxVersionsInDeployment
} else if errors.As(err, &errMaxDeploymentsInNamespace) {
err = errMaxDeploymentsInNamespace
} else {
// Do not surface low level error to user
c.logger.Error("error while registering version", tag.Error(err))
Expand Down
73 changes: 60 additions & 13 deletions service/worker/workerdeployment/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ type Client interface {

type ErrMaxTaskQueuesInVersion struct{ error }
type ErrMaxVersionsInDeployment struct{ error }
type ErrMaxDeploymentsInNamespace struct{ error }
type ErrRegister struct{ error }

// ClientImpl implements Client
Expand Down Expand Up @@ -422,6 +423,34 @@ func (d *ClientImpl) DescribeWorkerDeployment(
return dInfo, queryResponse.GetState().GetConflictToken(), nil
}

func (d *ClientImpl) workerDeploymentExists(
ctx context.Context,
namespaceEntry *namespace.Namespace,
deploymentName string,
) (bool, error) {
deploymentWorkflowID := worker_versioning.GenerateDeploymentWorkflowID(deploymentName)

res, err := d.historyClient.DescribeWorkflowExecution(ctx, &historyservice.DescribeWorkflowExecutionRequest{
NamespaceId: namespaceEntry.ID().String(),
Request: &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: namespaceEntry.Name().String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: deploymentWorkflowID,
},
},
})
if err != nil {
var notFound *serviceerror.NotFound
if errors.As(err, &notFound) {
return false, nil
}
return false, err
}

// Deployment exists only if the entity wf is running
return res.GetWorkflowExecutionInfo().GetStatus() == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, nil
}

func (d *ClientImpl) ListWorkerDeployments(
ctx context.Context,
namespaceEntry *namespace.Namespace,
Expand Down Expand Up @@ -791,15 +820,6 @@ func (d *ClientImpl) StartWorkerDeployment(
//revive:disable-next-line:defer
defer d.record("StartWorkerDeployment", &retErr, namespaceEntry.Name(), deploymentName, identity)()

// TODO (Carly): either max page size or default page size is 1000, so if there are > 1000 this would not catch it
deps, _, err := d.ListWorkerDeployments(ctx, namespaceEntry, 0, nil)
if err != nil {
return err
}
if len(deps) >= d.maxDeployments(namespaceEntry.Name().String()) {
return serviceerror.NewFailedPrecondition("maximum deployments in namespace, delete manually to continue deploying.")
}

workflowID := worker_versioning.GenerateDeploymentWorkflowID(deploymentName)

input, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.WorkerDeploymentWorkflowArgs{
Expand Down Expand Up @@ -1020,13 +1040,20 @@ func (d *ClientImpl) updateWithStartWorkerDeployment(

workflowID := worker_versioning.GenerateDeploymentWorkflowID(deploymentName)

// TODO (Carly): either max page size or default page size is 1000, so if there are > 1000 this would not catch it
deps, _, err := d.ListWorkerDeployments(ctx, namespaceEntry, 0, nil)
exists, err := d.workerDeploymentExists(ctx, namespaceEntry, deploymentName)
if err != nil {
return nil, err
}
if len(deps) >= d.maxDeployments(namespaceEntry.Name().String()) {
return nil, serviceerror.NewFailedPrecondition("maximum deployments in namespace, delete manually to continue deploying.")
if !exists {
// New deployment, make sure we're not exceeding the limit
count, err := d.countWorkerDeployments(ctx, namespaceEntry)
if err != nil {
return nil, err
}
limit := d.maxDeployments(namespaceEntry.Name().String())
if count >= int64(limit) {
return nil, ErrMaxDeploymentsInNamespace{error: errors.New(fmt.Sprintf("reached maximum deployments in namespace (%d)", limit))}
}
}

input, err := sdk.PreferProtoDataConverter.ToPayloads(&deploymentspb.WorkerDeploymentWorkflowArgs{
Expand Down Expand Up @@ -1054,6 +1081,26 @@ func (d *ClientImpl) updateWithStartWorkerDeployment(
)
}

func (d *ClientImpl) countWorkerDeployments(
ctx context.Context,
namespaceEntry *namespace.Namespace,
) (count int64, retError error) {
query := WorkerDeploymentVisibilityBaseListQuery

persistenceResp, err := d.visibilityManager.CountWorkflowExecutions(
ctx,
&manager.CountWorkflowExecutionsRequest{
NamespaceID: namespaceEntry.ID(),
Namespace: namespaceEntry.Name(),
Query: query,
},
)
if err != nil {
return 0, err
}
return persistenceResp.Count, nil
}

func (d *ClientImpl) updateWithStartWorkerDeploymentVersion(
ctx context.Context,
namespaceEntry *namespace.Namespace,
Expand Down
2 changes: 1 addition & 1 deletion tests/worker_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (s *WorkerDeploymentSuite) TestNamespaceDeploymentsLimit() {
s.ensureCreateVersionInDeployment(tv)

// pollers of the second deployment version should be rejected
s.pollFromDeploymentExpectFail(ctx, tv.WithDeploymentSeriesNumber(2), "Namespace deployments limit reached")
s.pollFromDeploymentExpectFail(ctx, tv.WithDeploymentSeriesNumber(2), "reached maximum deployments in namespace (1)")
}

func (s *WorkerDeploymentSuite) TestDescribeWorkerDeployment_TwoVersions_Sorted() {
Expand Down
Loading