Skip to content

Commit 9f9b172

Browse files
committed
Fail force-replication fast if any GenerateReplicationTasks/GenerateAndVerifyReplicationTasks activity returns error (#4642)
<!-- Describe what has changed in this PR --> **What changed?** <!-- Tell your future self why have you made these changes --> **Why?** Currently, the workflow will wait for all GenerateReplicationTasks activity to complete. The change will fail the workflow fast if any GenerateReplicationTasks activity failed. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** unit test and cluster test. <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is this PR a hotfix candidate or require that a notification be sent to the broader community? (Yes/No) --> **Is hotfix candidate?** No.
1 parent 6247d88 commit 9f9b172

File tree

4 files changed

+72
-11
lines changed

4 files changed

+72
-11
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ require (
104104
github.com/mattn/go-runewidth v0.0.14 // indirect
105105
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
106106
github.com/opentracing/opentracing-go v1.2.0 // indirect
107-
github.com/pkg/errors v0.9.1 // indirect
107+
github.com/pkg/errors v0.9.1
108108
github.com/pmezard/go-difflib v1.0.0 // indirect
109109
github.com/prometheus/client_model v0.4.0
110110
github.com/prometheus/common v0.44.0

service/worker/migration/activities.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"sort"
3232
"time"
3333

34+
"github.com/pkg/errors"
3435
commonpb "go.temporal.io/api/common/v1"
3536
replicationpb "go.temporal.io/api/replication/v1"
3637
"go.temporal.io/api/serviceerror"
@@ -112,6 +113,13 @@ func (r VerifyResult) isCompleted() bool {
112113
return r.isVerified() || r.isSkipped()
113114
}
114115

116+
func (e verifyReplicationTasksTimeoutErr) Error() string {
117+
return fmt.Sprintf("verifyReplicationTasks was not able to make progress for more than %v minutes (retryable). Not found WorkflowExecution: %v,",
118+
e.timeout,
119+
e.details.LastNotFoundWorkflowExecution,
120+
)
121+
}
122+
115123
// TODO: CallerTypePreemptablee should be set in activity background context for all migration activities.
116124
// However, activity background context is per-worker, which means once set, all activities processed by the
117125
// worker will use CallerTypePreemptable, including those not related to migration. This is not ideal.
@@ -645,20 +653,13 @@ func (a *activities) verifyReplicationTasks(
645653
return false, progress, nil
646654

647655
default:
648-
return false, progress, err
656+
return false, progress, errors.WithMessage(err, "remoteClient.DescribeMutableState call failed")
649657
}
650658
}
651659

652660
return true, progress, nil
653661
}
654662

655-
func (e verifyReplicationTasksTimeoutErr) Error() string {
656-
return fmt.Sprintf("verifyReplicationTasks was not able to make progress for more than %v minutes (retryable). Not found WorkflowExecution: %v,",
657-
e.timeout,
658-
e.details.LastNotFoundWorkflowExecution,
659-
)
660-
}
661-
662663
const (
663664
defaultNoProgressRetryableTimeout = 5 * time.Minute
664665
defaultNoProgressNotRetryableTimeout = 15 * time.Minute

service/worker/migration/force_replication_workflow.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ func enqueueReplicationTasks(ctx workflow.Context, workflowExecutionsCh workflow
371371
var a *activities
372372
var futures []workflow.Future
373373
var workflowExecutions []commonpb.WorkflowExecution
374+
var lastActivityErr error
374375

375376
for workflowExecutionsCh.Receive(ctx, &workflowExecutions) {
376377
var replicationTaskFuture workflow.Future
@@ -394,10 +395,17 @@ func enqueueReplicationTasks(ctx workflow.Context, workflowExecutionsCh workflow
394395
pendingActivities++
395396
selector.AddFuture(replicationTaskFuture, func(f workflow.Future) {
396397
pendingActivities--
398+
399+
if err := f.Get(ctx, nil); err != nil {
400+
lastActivityErr = err
401+
}
397402
})
398403

399-
if pendingActivities == params.ConcurrentActivityCount {
404+
if pendingActivities >= params.ConcurrentActivityCount {
400405
selector.Select(ctx) // this will block until one of the in-flight activities completes
406+
if lastActivityErr != nil {
407+
return lastActivityErr
408+
}
401409
}
402410

403411
futures = append(futures, replicationTaskFuture)

service/worker/migration/force_replication_workflow_test.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ func TestForceReplicationWorkflow_ListWorkflowsError(t *testing.T) {
210210
env.AssertExpectations(t)
211211
}
212212

213-
func TestForceReplicationWorkflow_GenerateReplicationTaskError(t *testing.T) {
213+
func TestForceReplicationWorkflow_GenerateReplicationTaskRetryableError(t *testing.T) {
214214
testSuite := &testsuite.WorkflowTestSuite{}
215215
env := testSuite.NewTestWorkflowEnvironment()
216216

@@ -258,6 +258,58 @@ func TestForceReplicationWorkflow_GenerateReplicationTaskError(t *testing.T) {
258258
env.AssertExpectations(t)
259259
}
260260

261+
func TestForceReplicationWorkflow_GenerateReplicationTaskNonRetryableError(t *testing.T) {
262+
testSuite := &testsuite.WorkflowTestSuite{}
263+
env := testSuite.NewTestWorkflowEnvironment()
264+
265+
namespaceID := uuid.New()
266+
267+
var a *activities
268+
env.OnActivity(a.GetMetadata, mock.Anything, metadataRequest{Namespace: "test-ns"}).Return(&metadataResponse{ShardCount: 4, NamespaceID: namespaceID}, nil)
269+
270+
totalPageCount := 4
271+
currentPageCount := 0
272+
env.OnActivity(a.ListWorkflows, mock.Anything, mock.Anything).Return(func(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*listWorkflowsResponse, error) {
273+
assert.Equal(t, "test-ns", request.Namespace)
274+
currentPageCount++
275+
if currentPageCount < totalPageCount {
276+
return &listWorkflowsResponse{
277+
Executions: []commonpb.WorkflowExecution{},
278+
NextPageToken: []byte("fake-page-token"),
279+
}, nil
280+
}
281+
// your mock function implementation
282+
return &listWorkflowsResponse{
283+
Executions: []commonpb.WorkflowExecution{},
284+
NextPageToken: nil, // last page
285+
}, nil
286+
})
287+
288+
// Only expect GenerateReplicationTasks to execute once and workflow will then fail because of
289+
// non-retryable error.
290+
env.OnActivity(a.GenerateReplicationTasks, mock.Anything, mock.Anything).Return(
291+
temporal.NewNonRetryableApplicationError("mock generate replication tasks error", "", nil),
292+
).Times(1)
293+
294+
env.RegisterWorkflow(ForceTaskQueueUserDataReplicationWorkflow)
295+
env.OnActivity(a.SeedReplicationQueueWithUserDataEntries, mock.Anything, mock.Anything).Return(nil)
296+
297+
env.ExecuteWorkflow(ForceReplicationWorkflow, ForceReplicationParams{
298+
Namespace: "test-ns",
299+
Query: "",
300+
ConcurrentActivityCount: 1,
301+
OverallRps: 10,
302+
ListWorkflowsPageSize: 1,
303+
PageCountPerExecution: 4,
304+
})
305+
306+
require.True(t, env.IsWorkflowCompleted())
307+
err := env.GetWorkflowError()
308+
require.Error(t, err)
309+
require.Contains(t, err.Error(), "mock generate replication tasks error")
310+
env.AssertExpectations(t)
311+
}
312+
261313
func TestForceReplicationWorkflow_TaskQueueReplicationFailure(t *testing.T) {
262314
testSuite := &testsuite.WorkflowTestSuite{}
263315
env := testSuite.NewTestWorkflowEnvironment()

0 commit comments

Comments
 (0)