Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ type (
// The errors it can return:
// - serviceerror.FailedPrecondition when the conflict token is invalid
// WARNING: Worker versioning-2 is currently experimental, and requires server 1.XX+
UpdateWorkerVersioningRules(ctx context.Context, options *UpdateWorkerVersioningRulesOptions) (VersioningConflictToken, error)
UpdateWorkerVersioningRules(ctx context.Context, options *UpdateWorkerVersioningRulesOptions) (*WorkerVersioningRules, error)

// GetWorkerVersioningRules
// Returns the worker-build-id assignment and redirect rules for a task queue.
Expand Down
2 changes: 1 addition & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ type (
// The errors it can return:
// - serviceerror.FailedPrecondition when the conflict token is invalid
// WARNING: Worker versioning-2 is currently experimental, and requires server 1.XX+
UpdateWorkerVersioningRules(ctx context.Context, options *UpdateWorkerVersioningRulesOptions) (VersioningConflictToken, error)
UpdateWorkerVersioningRules(ctx context.Context, options *UpdateWorkerVersioningRulesOptions) (*WorkerVersioningRules, error)

// GetWorkerVersioningRules returns the worker-build-id assignment and redirect rules for a task queue.
// WARNING: Worker versioning-2 is currently experimental, and requires server 1.XX+
Expand Down
12 changes: 6 additions & 6 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1039,23 +1039,23 @@ func (wc *WorkflowClient) GetWorkerTaskReachability(ctx context.Context, options
// task queue. This is used in conjunction with workers who specify their build id and thus opt into the feature.
// The errors it can return:
// - serviceerror.FailedPrecondition when the conflict token is invalid
func (wc *WorkflowClient) UpdateWorkerVersioningRules(ctx context.Context, options *UpdateWorkerVersioningRulesOptions) (VersioningConflictToken, error) {
func (wc *WorkflowClient) UpdateWorkerVersioningRules(ctx context.Context, options *UpdateWorkerVersioningRulesOptions) (*WorkerVersioningRules, error) {
if err := wc.ensureInitialized(); err != nil {
return VersioningConflictToken{}, err
return nil, err
}

request, err := options.validateAndConvertToProto(wc.namespace)
if err != nil {
return VersioningConflictToken{}, err
return nil, err
}

grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
defer cancel()
resp, err := wc.workflowService.UpdateWorkerVersioningRules(grpcCtx, request)
if err != nil {
return VersioningConflictToken{}, err
return nil, err
}
return workerVersioningConflictTokenFromProtoResponse(resp), nil
return workerVersioningRulesFromProtoUpdateResponse(resp), nil
}

// GetWorkerVersioningRules returns the worker-build-id assignment and redirect rules for a task queue.
Expand All @@ -1076,7 +1076,7 @@ func (wc *WorkflowClient) GetWorkerVersioningRules(ctx context.Context, options
if err != nil {
return nil, err
}
return workerVersioningRulesFromProtoResponse(resp), nil
return workerVersioningRulesFromProtoGetResponse(resp), nil
}

func (wc *WorkflowClient) UpdateWorkflowWithOptions(
Expand Down
28 changes: 15 additions & 13 deletions internal/worker_versioning_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,22 +370,19 @@ func versioningRedirectRuleFromProto(rule *taskqueuepb.CompatibleBuildIdRedirect
return result
}

func workerVersioningRulesFromProtoResponse(response *workflowservice.GetWorkerVersioningRulesResponse) *WorkerVersioningRules {
if response == nil {
return nil
}
aRules := make([]*VersioningAssignmentRuleWithTimestamp, len(response.GetAssignmentRules()))
for i, s := range response.GetAssignmentRules() {
func workerVersioningRulesFromResponse(assignmentRules []*taskqueuepb.TimestampedBuildIdAssignmentRule, redirectRules []*taskqueuepb.TimestampedCompatibleBuildIdRedirectRule, token []byte) *WorkerVersioningRules {
aRules := make([]*VersioningAssignmentRuleWithTimestamp, len(assignmentRules))
for i, s := range assignmentRules {
aRules[i] = versioningAssignmentRuleFromProto(s.GetRule(), s.GetCreateTime())
}

rRules := make([]*VersioningRedirectRuleWithTimestamp, len(response.GetCompatibleRedirectRules()))
for i, s := range response.GetCompatibleRedirectRules() {
rRules := make([]*VersioningRedirectRuleWithTimestamp, len(redirectRules))
for i, s := range redirectRules {
rRules[i] = versioningRedirectRuleFromProto(s.GetRule(), s.GetCreateTime())
}

conflictToken := VersioningConflictToken{
token: response.GetConflictToken(),
token,
}
return &WorkerVersioningRules{
AssignmentRules: aRules,
Expand All @@ -394,13 +391,18 @@ func workerVersioningRulesFromProtoResponse(response *workflowservice.GetWorkerV
}
}

func workerVersioningConflictTokenFromProtoResponse(response *workflowservice.UpdateWorkerVersioningRulesResponse) VersioningConflictToken {
func workerVersioningRulesFromProtoUpdateResponse(response *workflowservice.UpdateWorkerVersioningRulesResponse) *WorkerVersioningRules {
if response == nil {
return VersioningConflictToken{}
return nil
}
return VersioningConflictToken{
token: response.GetConflictToken(),
return workerVersioningRulesFromResponse(response.GetAssignmentRules(), response.GetCompatibleRedirectRules(), response.GetConflictToken())
}

func workerVersioningRulesFromProtoGetResponse(response *workflowservice.GetWorkerVersioningRulesResponse) *WorkerVersioningRules {
if response == nil {
return nil
}
return workerVersioningRulesFromResponse(response.GetAssignmentRules(), response.GetCompatibleRedirectRules(), response.GetConflictToken())
}

func (r *VersioningRampByPercentage) validateRamp() error {
Expand Down
4 changes: 2 additions & 2 deletions internal/worker_versioning_rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

func Test_WorkerVersioningRules_fromProtoResponse(t *testing.T) {
func Test_WorkerVersioningRules_fromProtoGetResponse(t *testing.T) {
nowProto := timestamppb.Now()
timestamp := nowProto.AsTime()
tests := []struct {
Expand Down Expand Up @@ -78,7 +78,7 @@ func Test_WorkerVersioningRules_fromProtoResponse(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, workerVersioningRulesFromProtoResponse(tt.response), "workerVersioningRulesFromProtoResponse(%v)", tt.response)
assert.Equalf(t, tt.want, workerVersioningRulesFromProtoGetResponse(tt.response), "workerVersioningRulesFromProtoGetResponse(%v)", tt.response)
})
}
}
10 changes: 5 additions & 5 deletions mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 34 additions & 30 deletions test/worker_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (ts *WorkerVersioningTestSuite) TestManipulateRules() {
})
ts.NoError(err)

token, err := ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err := ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: res.ConflictToken,
Operation: &client.VersioningOpInsertAssignmentRule{
Expand All @@ -123,9 +123,9 @@ func (ts *WorkerVersioningTestSuite) TestManipulateRules() {
})
ts.NoError(err)

token, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: token,
ConflictToken: resp.ConflictToken,
Operation: &client.VersioningOpInsertAssignmentRule{
RuleIndex: 0,
Rule: client.VersioningAssignmentRule{
Expand All @@ -138,9 +138,9 @@ func (ts *WorkerVersioningTestSuite) TestManipulateRules() {
})
ts.NoError(err)

token, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: token,
ConflictToken: resp.ConflictToken,
Operation: &client.VersioningOpInsertRedirectRule{
Rule: client.VersioningRedirectRule{
SourceBuildID: "1.0",
Expand All @@ -155,6 +155,8 @@ func (ts *WorkerVersioningTestSuite) TestManipulateRules() {
})
ts.NoError(err)

ts.Equal(res, resp)

ts.Equal("2.0", res.AssignmentRules[0].Rule.TargetBuildID)
r, ok := res.AssignmentRules[0].Rule.Ramp.(*client.VersioningRampByPercentage)
ts.Truef(ok, "Not a percentage ramp")
Expand All @@ -175,7 +177,7 @@ func (ts *WorkerVersioningTestSuite) TestReplaceDeleteRules() {
})
ts.NoError(err)

token, err := ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err := ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: res.ConflictToken,
Operation: &client.VersioningOpInsertAssignmentRule{
Expand All @@ -188,9 +190,9 @@ func (ts *WorkerVersioningTestSuite) TestReplaceDeleteRules() {
ts.NoError(err)

// Replace by unconditional rule is OK
token, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: token,
ConflictToken: resp.ConflictToken,
Operation: &client.VersioningOpReplaceAssignmentRule{
RuleIndex: 0,
Rule: client.VersioningAssignmentRule{
Expand All @@ -203,7 +205,7 @@ func (ts *WorkerVersioningTestSuite) TestReplaceDeleteRules() {
// Replace by conditional rule fails
_, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: token,
ConflictToken: resp.ConflictToken,
Operation: &client.VersioningOpReplaceAssignmentRule{
RuleIndex: 0,
Rule: client.VersioningAssignmentRule{
Expand All @@ -219,9 +221,9 @@ func (ts *WorkerVersioningTestSuite) TestReplaceDeleteRules() {
ts.Error(err)
ts.IsType(&serviceerror.FailedPrecondition{}, err)

token, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: token,
ConflictToken: resp.ConflictToken,
Operation: &client.VersioningOpReplaceAssignmentRule{
RuleIndex: 0,
Rule: client.VersioningAssignmentRule{
Expand All @@ -237,7 +239,7 @@ func (ts *WorkerVersioningTestSuite) TestReplaceDeleteRules() {

_, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: token,
ConflictToken: resp.ConflictToken,
Operation: &client.VersioningOpDeleteAssignmentRule{
RuleIndex: 0,
Force: false,
Expand All @@ -249,7 +251,7 @@ func (ts *WorkerVersioningTestSuite) TestReplaceDeleteRules() {

_, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: token,
ConflictToken: resp.ConflictToken,
Operation: &client.VersioningOpDeleteAssignmentRule{
RuleIndex: 0,
Force: true,
Expand All @@ -267,7 +269,7 @@ func (ts *WorkerVersioningTestSuite) TestCommitRules() {
})
ts.NoError(err)

token, err := ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err := ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: res.ConflictToken,
Operation: &client.VersioningOpInsertAssignmentRule{
Expand All @@ -279,9 +281,9 @@ func (ts *WorkerVersioningTestSuite) TestCommitRules() {
})
ts.NoError(err)

token, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: token,
ConflictToken: resp.ConflictToken,
Operation: &client.VersioningOpInsertAssignmentRule{
RuleIndex: 0,
Rule: client.VersioningAssignmentRule{
Expand All @@ -297,7 +299,7 @@ func (ts *WorkerVersioningTestSuite) TestCommitRules() {
// No worker recently polling so it should fail
_, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: token,
ConflictToken: resp.ConflictToken,
Operation: &client.VersioningOpCommitBuildID{
TargetBuildID: "2.0",
Force: false,
Expand All @@ -307,9 +309,9 @@ func (ts *WorkerVersioningTestSuite) TestCommitRules() {
ts.IsType(&serviceerror.FailedPrecondition{}, err)

// Use the `force`...
token, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: token,
ConflictToken: resp.ConflictToken,
Operation: &client.VersioningOpCommitBuildID{
TargetBuildID: "2.0",
Force: true,
Expand All @@ -322,6 +324,8 @@ func (ts *WorkerVersioningTestSuite) TestCommitRules() {
})
ts.NoError(err)

ts.Equal(res, resp)

// replace all rules by unconditional "2.0"
ts.Equal(1, len(res.AssignmentRules))
ts.Equal("2.0", res.AssignmentRules[0].Rule.TargetBuildID)
Expand All @@ -338,7 +342,7 @@ func (ts *WorkerVersioningTestSuite) TestConflictTokens() {
})
ts.NoError(err)

token, err := ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err := ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: res.ConflictToken,
Operation: &client.VersioningOpInsertAssignmentRule{
Expand All @@ -363,9 +367,9 @@ func (ts *WorkerVersioningTestSuite) TestConflictTokens() {
ts.Error(err)
ts.IsType(&serviceerror.FailedPrecondition{}, err)

token, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: token, // original token
ConflictToken: resp.ConflictToken, // original token
Operation: &client.VersioningOpInsertAssignmentRule{
RuleIndex: 0,
Rule: client.VersioningAssignmentRule{
Expand Down Expand Up @@ -447,7 +451,7 @@ func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasksWithRules()
})
ts.NoError(err)

token, err := ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err := ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: res.ConflictToken,
Operation: &client.VersioningOpInsertAssignmentRule{
Expand Down Expand Up @@ -475,9 +479,9 @@ func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasksWithRules()
ts.NoError(err)

// Now add the 2.0 version
token, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
_, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: token,
ConflictToken: resp.ConflictToken,
Operation: &client.VersioningOpInsertAssignmentRule{
RuleIndex: 0,
Rule: client.VersioningAssignmentRule{
Expand Down Expand Up @@ -713,7 +717,7 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetimeWithR
})
ts.NoError(err)

token, err := ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err := ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: result.ConflictToken,
Operation: &client.VersioningOpInsertAssignmentRule{
Expand Down Expand Up @@ -750,9 +754,9 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetimeWithR
worker1.Stop()

// Add new compat ver
token, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: token,
ConflictToken: resp.ConflictToken,
Operation: &client.VersioningOpInsertAssignmentRule{
RuleIndex: 0,
Rule: client.VersioningAssignmentRule{
Expand All @@ -761,9 +765,9 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetimeWithR
},
})

token, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
resp, err = ts.client.UpdateWorkerVersioningRules(ctx, &client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: token,
ConflictToken: resp.ConflictToken,
Operation: &client.VersioningOpInsertRedirectRule{
Rule: client.VersioningRedirectRule{
SourceBuildID: "1.0",
Expand Down