Skip to content

Commit 808e144

Browse files
authored
Convert ResourceExhausted errors from versioning APIs to Internal ones with a nice error message (#7815)
## What changed?
- WISOTT

## Why?
- Better user experience

## How did you test it?
- [x] built
- [x] ran locally and tested manually
- [ ] covered by existing tests
- [ ] added new unit test(s)
- [x] added new functional test(s)

## Potential risks
- None
1 parent 5b4dcb9 commit 808e144

File tree

7 files changed

+212
-209
lines changed

7 files changed

+212
-209
lines changed

service/frontend/workflow_handler.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,18 @@ var (
8787
maxTime = time.Date(2100, 1, 1, 1, 0, 0, 0, time.UTC)
8888

8989
// Tail room for context deadline to bail out from retry for long poll.
90-
longPollTailRoom = time.Second
91-
90+
longPollTailRoom = time.Second
9291
errWaitForRefresh = serviceerror.NewDeadlineExceeded("waiting for schedule to refresh status of completed workflows")
9392
)
9493

94+
const (
95+
errTooManySetCurrentVersionRequests = "Too many SetWorkerDeploymentCurrentVersion requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits."
96+
errTooManySetRampingVersionRequests = "Too many SetWorkerDeploymentRampingVersion requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits."
97+
errTooManyDeleteDeploymentRequests = "Too many DeleteWorkerDeployment requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits."
98+
errTooManyDeleteVersionRequests = "Too many DeleteWorkerDeploymentVersion requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits."
99+
errTooManyVersionMetadataRequests = "Too many UpdateWorkerDeploymentVersionMetadata requests have been issued in rapid succession. Please throttle the request rate to avoid exceeding Worker Deployment resource limits."
100+
)
101+
95102
type (
96103
// WorkflowHandler - gRPC handler interface for workflowservice
97104
WorkflowHandler struct {
@@ -3414,6 +3421,9 @@ func (wh *WorkflowHandler) SetWorkerDeploymentCurrentVersion(ctx context.Context
34143421

34153422
resp, err := wh.workerDeploymentClient.SetCurrentVersion(ctx, namespaceEntry, request.DeploymentName, versionStr, request.Identity, request.IgnoreMissingTaskQueues, request.GetConflictToken())
34163423
if err != nil {
3424+
if common.IsResourceExhausted(err) {
3425+
return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManySetCurrentVersionRequests)
3426+
}
34173427
return nil, err
34183428
}
34193429

@@ -3470,6 +3480,9 @@ func (wh *WorkflowHandler) SetWorkerDeploymentRampingVersion(ctx context.Context
34703480

34713481
resp, err := wh.workerDeploymentClient.SetRampingVersion(ctx, namespaceEntry, request.DeploymentName, versionStr, request.GetPercentage(), request.GetIdentity(), request.IgnoreMissingTaskQueues, request.GetConflictToken())
34723482
if err != nil {
3483+
if common.IsResourceExhausted(err) {
3484+
return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManySetRampingVersionRequests)
3485+
}
34733486
return nil, err
34743487
}
34753488

@@ -3574,6 +3587,9 @@ func (wh *WorkflowHandler) DeleteWorkerDeployment(ctx context.Context, request *
35743587

35753588
err = wh.workerDeploymentClient.DeleteWorkerDeployment(ctx, namespaceEntry, request.DeploymentName, request.Identity)
35763589
if err != nil {
3590+
if common.IsResourceExhausted(err) {
3591+
return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManyDeleteDeploymentRequests)
3592+
}
35773593
return nil, err
35783594
}
35793595

@@ -3600,6 +3616,9 @@ func (wh *WorkflowHandler) DeleteWorkerDeploymentVersion(ctx context.Context, re
36003616

36013617
err = wh.workerDeploymentClient.DeleteWorkerDeploymentVersion(ctx, namespaceEntry, versionStr, request.SkipDrainage, request.Identity)
36023618
if err != nil {
3619+
if common.IsResourceExhausted(err) {
3620+
return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManyDeleteVersionRequests)
3621+
}
36033622
return nil, err
36043623
}
36053624

@@ -3628,10 +3647,12 @@ func (wh *WorkflowHandler) UpdateWorkerDeploymentVersionMetadata(ctx context.Con
36283647
versionStr = worker_versioning.ExternalWorkerDeploymentVersionToString(request.GetDeploymentVersion())
36293648
}
36303649

3631-
// todo (Shivam): Should we get identity from the request?
36323650
identity := uuid.New()
36333651
updatedMetadata, err := wh.workerDeploymentClient.UpdateVersionMetadata(ctx, namespaceEntry, versionStr, request.UpsertEntries, request.RemoveEntries, identity)
36343652
if err != nil {
3653+
if common.IsResourceExhausted(err) {
3654+
return nil, serviceerror.NewResourceExhaustedf(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, errTooManyVersionMetadataRequests)
3655+
}
36353656
return nil, err
36363657
}
36373658

service/matching/physical_task_queue_manager.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -427,9 +427,6 @@ func (c *physicalTaskQueueManagerImpl) DispatchNexusTask(
427427
}
428428

429429
func (c *physicalTaskQueueManagerImpl) UpdatePollerInfo(id pollerIdentity, pollMetadata *pollMetadata) {
430-
if c.queue.Version().IsVersioned() {
431-
fmt.Printf("poll info updated in %v\n", c.queue.Version())
432-
}
433430
c.pollerHistory.updatePollerInfo(id, pollMetadata)
434431
}
435432

service/worker/workerdeployment/client.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1572,10 +1572,3 @@ func (d *ClientImpl) getSyncBatchSize() int32 {
15721572
}
15731573
return syncBatchSize
15741574
}
1575-
1576-
// isFailedPrecondition checks if the error is a FailedPrecondition error. It also checks if the FailedPrecondition error is wrapped in an ApplicationError.
1577-
func isFailedPrecondition(err error) bool {
1578-
var failedPreconditionError *serviceerror.FailedPrecondition
1579-
var applicationError *temporal.ApplicationError
1580-
return errors.As(err, &failedPreconditionError) || (errors.As(err, &applicationError) && applicationError.Type() == errFailedPrecondition)
1581-
}

service/worker/workerdeployment/util.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package workerdeployment
22

33
import (
4+
"errors"
45
"fmt"
56
"strings"
67
"time"
@@ -145,3 +146,10 @@ func getSafeDurationConfig(ctx workflow.Context, id string, unsafeGetter func()
145146
func durationEq(a, b any) bool {
146147
return a == b
147148
}
149+
150+
// isFailedPrecondition checks if the error is a FailedPrecondition error. It also checks if the FailedPrecondition error is wrapped in an ApplicationError.
151+
func isFailedPrecondition(err error) bool {
152+
var failedPreconditionError *serviceerror.FailedPrecondition
153+
var applicationError *temporal.ApplicationError
154+
return errors.As(err, &failedPreconditionError) || (errors.As(err, &applicationError) && applicationError.Type() == errFailedPrecondition)
155+
}

service/worker/workerdeployment/version_workflow.go

Lines changed: 36 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -425,18 +425,6 @@ func (d *VersionWorkflowRunner) handleRegisterWorker(ctx workflow.Context, args
425425
d.lock.Unlock()
426426
}()
427427

428-
v := workflow.GetVersion(ctx, "RefactorRegisterWorker", workflow.DefaultVersion, 1)
429-
if v == workflow.DefaultVersion {
430-
// wait until deployment workflow started
431-
err = workflow.Await(ctx, func() bool { return d.VersionState.StartedDeploymentWorkflow })
432-
if err != nil {
433-
d.logger.Error("Update canceled before deployment workflow started")
434-
// TODO (Carly): This is likely due to too many deployments, but make sure we excluded other possible errors here and send a proper error message all the time.
435-
// TODO (Carly): mention the limit in here or make sure matching does in the error returned to the poller
436-
return temporal.NewApplicationError("failed to create deployment version, likely you are exceeding the limit of allowed deployments in a namespace", errTooManyDeployments)
437-
}
438-
}
439-
440428
// Add the task queue to the local state.
441429
if d.VersionState.TaskQueueFamilies == nil {
442430
d.VersionState.TaskQueueFamilies = make(map[string]*deploymentspb.VersionLocalState_TaskQueueFamilyData)
@@ -459,23 +447,6 @@ func (d *VersionWorkflowRunner) handleRegisterWorker(ctx workflow.Context, args
459447
}
460448

461449
activityCtx := workflow.WithActivityOptions(ctx, defaultActivityOptions)
462-
463-
if v == workflow.DefaultVersion {
464-
// First try to add version to worker-deployment workflow so it rejects in case we hit the limit
465-
err = workflow.ExecuteActivity(activityCtx, d.a.AddVersionToWorkerDeployment, &deploymentspb.AddVersionToWorkerDeploymentRequest{
466-
DeploymentName: d.VersionState.Version.GetDeploymentName(),
467-
UpdateArgs: &deploymentspb.AddVersionUpdateArgs{
468-
Version: worker_versioning.WorkerDeploymentVersionToString(d.VersionState.Version),
469-
CreateTime: d.VersionState.CreateTime,
470-
},
471-
RequestId: d.newUUID(ctx),
472-
}).Get(ctx, nil)
473-
if err != nil {
474-
// TODO (carly): make sure the error message that goes to the user is informative and has the limit mentioned
475-
return temporal.NewApplicationError("too many versions in this deployment", errTooManyVersions)
476-
}
477-
}
478-
479450
// sync to user data
480451
var syncRes deploymentspb.SyncDeploymentVersionUserDataResponse
481452
err = workflow.ExecuteActivity(activityCtx, d.a.SyncDeploymentVersionUserData, &deploymentspb.SyncDeploymentVersionUserDataRequest{
@@ -545,31 +516,46 @@ func (d *VersionWorkflowRunner) handleSyncState(ctx workflow.Context, args *depl
545516
Version: state.GetVersion(),
546517
}
547518

548-
v := workflow.GetVersion(ctx, "SyncStateWithBatching", workflow.DefaultVersion, 1)
549-
if v == workflow.DefaultVersion {
550-
for _, tqName := range workflow.DeterministicKeys(state.TaskQueueFamilies) {
551-
byType := state.TaskQueueFamilies[tqName]
552-
data := &deploymentspb.DeploymentVersionData{
553-
Version: d.VersionState.Version,
554-
RoutingUpdateTime: args.RoutingUpdateTime,
555-
CurrentSinceTime: args.CurrentSinceTime,
556-
RampingSinceTime: args.RampingSinceTime,
557-
RampPercentage: args.RampPercentage,
558-
}
559-
var types []enumspb.TaskQueueType
560-
for _, tqType := range workflow.DeterministicKeys(byType.TaskQueues) {
561-
types = append(types, enumspb.TaskQueueType(tqType))
562-
}
519+
// send in the task-queue families in batches of syncBatchSize
520+
batches := make([][]*deploymentspb.SyncDeploymentVersionUserDataRequest_SyncUserData, 0)
521+
for _, tqName := range workflow.DeterministicKeys(state.TaskQueueFamilies) {
522+
byType := state.TaskQueueFamilies[tqName]
523+
data := &deploymentspb.DeploymentVersionData{
524+
Version: d.VersionState.Version,
525+
RoutingUpdateTime: args.RoutingUpdateTime,
526+
CurrentSinceTime: args.CurrentSinceTime,
527+
RampingSinceTime: args.RampingSinceTime,
528+
RampPercentage: args.RampPercentage,
529+
}
530+
var types []enumspb.TaskQueueType
531+
for _, tqType := range workflow.DeterministicKeys(byType.TaskQueues) {
532+
types = append(types, enumspb.TaskQueueType(tqType))
533+
}
563534

564-
syncReq.Sync = append(syncReq.Sync, &deploymentspb.SyncDeploymentVersionUserDataRequest_SyncUserData{
565-
Name: tqName,
566-
Types: types,
567-
Data: data,
568-
})
535+
syncReq.Sync = append(syncReq.Sync, &deploymentspb.SyncDeploymentVersionUserDataRequest_SyncUserData{
536+
Name: tqName,
537+
Types: types,
538+
Data: data,
539+
})
540+
541+
if len(syncReq.Sync) == int(d.VersionState.SyncBatchSize) {
542+
batches = append(batches, syncReq.Sync)
543+
syncReq.Sync = make([]*deploymentspb.SyncDeploymentVersionUserDataRequest_SyncUserData, 0) // reset the syncReq.Sync slice for the next batch
569544
}
545+
}
546+
if len(syncReq.Sync) > 0 {
547+
batches = append(batches, syncReq.Sync)
548+
}
549+
550+
// calling SyncDeploymentVersionUserData for each batch
551+
for _, batch := range batches {
570552
activityCtx := workflow.WithActivityOptions(ctx, defaultActivityOptions)
571553
var syncRes deploymentspb.SyncDeploymentVersionUserDataResponse
572-
err = workflow.ExecuteActivity(activityCtx, d.a.SyncDeploymentVersionUserData, syncReq).Get(ctx, &syncRes)
554+
555+
err = workflow.ExecuteActivity(activityCtx, d.a.SyncDeploymentVersionUserData, &deploymentspb.SyncDeploymentVersionUserDataRequest{
556+
Version: state.GetVersion(),
557+
Sync: batch,
558+
}).Get(ctx, &syncRes)
573559
if err != nil {
574560
// TODO (Shivam): Compensation functions required to roll back the local state + activity changes.
575561
return nil, err
@@ -587,67 +573,6 @@ func (d *VersionWorkflowRunner) handleSyncState(ctx workflow.Context, args *depl
587573
return nil, err
588574
}
589575
}
590-
} else {
591-
592-
// send in the task-queue families in batches of syncBatchSize
593-
batches := make([][]*deploymentspb.SyncDeploymentVersionUserDataRequest_SyncUserData, 0)
594-
595-
for _, tqName := range workflow.DeterministicKeys(state.TaskQueueFamilies) {
596-
byType := state.TaskQueueFamilies[tqName]
597-
data := &deploymentspb.DeploymentVersionData{
598-
Version: d.VersionState.Version,
599-
RoutingUpdateTime: args.RoutingUpdateTime,
600-
CurrentSinceTime: args.CurrentSinceTime,
601-
RampingSinceTime: args.RampingSinceTime,
602-
RampPercentage: args.RampPercentage,
603-
}
604-
var types []enumspb.TaskQueueType
605-
for _, tqType := range workflow.DeterministicKeys(byType.TaskQueues) {
606-
types = append(types, enumspb.TaskQueueType(tqType))
607-
}
608-
609-
syncReq.Sync = append(syncReq.Sync, &deploymentspb.SyncDeploymentVersionUserDataRequest_SyncUserData{
610-
Name: tqName,
611-
Types: types,
612-
Data: data,
613-
})
614-
615-
if len(syncReq.Sync) == int(d.VersionState.SyncBatchSize) {
616-
batches = append(batches, syncReq.Sync)
617-
syncReq.Sync = make([]*deploymentspb.SyncDeploymentVersionUserDataRequest_SyncUserData, 0) // reset the syncReq.Sync slice for the next batch
618-
}
619-
}
620-
if len(syncReq.Sync) > 0 {
621-
batches = append(batches, syncReq.Sync)
622-
}
623-
624-
// calling SyncDeploymentVersionUserData for each batch
625-
for _, batch := range batches {
626-
activityCtx := workflow.WithActivityOptions(ctx, defaultActivityOptions)
627-
var syncRes deploymentspb.SyncDeploymentVersionUserDataResponse
628-
629-
err = workflow.ExecuteActivity(activityCtx, d.a.SyncDeploymentVersionUserData, &deploymentspb.SyncDeploymentVersionUserDataRequest{
630-
Version: state.GetVersion(),
631-
Sync: batch,
632-
}).Get(ctx, &syncRes)
633-
if err != nil {
634-
// TODO (Shivam): Compensation functions required to roll back the local state + activity changes.
635-
return nil, err
636-
}
637-
if len(syncRes.TaskQueueMaxVersions) > 0 {
638-
// wait for propagation
639-
err = workflow.ExecuteActivity(
640-
activityCtx,
641-
d.a.CheckWorkerDeploymentUserDataPropagation,
642-
&deploymentspb.CheckWorkerDeploymentUserDataPropagationRequest{
643-
TaskQueueMaxVersions: syncRes.TaskQueueMaxVersions,
644-
}).Get(ctx, nil)
645-
if err != nil {
646-
// TODO (Shivam): Compensation functions required to roll back the local state + activity changes.
647-
return nil, err
648-
}
649-
}
650-
}
651576
}
652577

653578
wasAcceptingNewWorkflows := state.GetCurrentSinceTime() != nil || state.GetRampingSinceTime() != nil

0 commit comments

Comments (0)