Skip to content

Commit e660403

Browse files
authored
Handle workflow not found in replication (#2633)
* Handle workflow not found in replication
1 parent 6b8986c commit e660403

File tree

8 files changed

+215
-25
lines changed

8 files changed

+215
-25
lines changed

service/history/historyEngine.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func NewEngineWithShardContext(
220220
)
221221

222222
historyEngImpl.workflowTaskHandler = newWorkflowTaskHandlerCallback(historyEngImpl)
223-
historyEngImpl.replicationDLQHandler = newLazyReplicationDLQHandler(shard)
223+
historyEngImpl.replicationDLQHandler = newLazyReplicationDLQHandler(shard, workflowDeleteManager, historyCache)
224224

225225
return historyEngImpl
226226
}
@@ -438,6 +438,8 @@ func (e *historyEngineImpl) handleClusterMetadataUpdate(
438438
e.shard.GetNamespaceRegistry(),
439439
nDCHistoryResender,
440440
e,
441+
e.workflowDeleteManager,
442+
e.historyCache,
441443
e.shard.GetMetricsClient(),
442444
e.shard.GetLogger(),
443445
)

service/history/replicationDLQHandler.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"go.temporal.io/server/common/persistence"
4848
"go.temporal.io/server/service/history/shard"
4949
"go.temporal.io/server/service/history/tasks"
50+
"go.temporal.io/server/service/history/workflow"
5051
)
5152

5253
var (
@@ -81,16 +82,29 @@ type (
8182
taskExecutorsLock sync.Mutex
8283
taskExecutors map[string]replicationTaskExecutor
8384
shard shard.Context
85+
deleteManager workflow.DeleteManager
86+
workflowCache workflow.Cache
8487
logger log.Logger
8588
}
8689
)
8790

88-
func newLazyReplicationDLQHandler(shard shard.Context) replicationDLQHandler {
89-
return newReplicationDLQHandler(shard, make(map[string]replicationTaskExecutor))
91+
func newLazyReplicationDLQHandler(
92+
shard shard.Context,
93+
deleteManager workflow.DeleteManager,
94+
workflowCache workflow.Cache,
95+
) replicationDLQHandler {
96+
return newReplicationDLQHandler(
97+
shard,
98+
deleteManager,
99+
workflowCache,
100+
make(map[string]replicationTaskExecutor),
101+
)
90102
}
91103

92104
func newReplicationDLQHandler(
93105
shard shard.Context,
106+
deleteManager workflow.DeleteManager,
107+
workflowCache workflow.Cache,
94108
taskExecutors map[string]replicationTaskExecutor,
95109
) replicationDLQHandler {
96110

@@ -99,6 +113,8 @@ func newReplicationDLQHandler(
99113
}
100114
return &replicationDLQHandlerImpl{
101115
shard: shard,
116+
deleteManager: deleteManager,
117+
workflowCache: workflowCache,
102118
taskExecutors: taskExecutors,
103119
logger: shard.GetLogger(),
104120
}
@@ -324,6 +340,8 @@ func (r *replicationDLQHandlerImpl) getOrCreateTaskExecutor(clusterName string)
324340
r.shard.GetNamespaceRegistry(),
325341
resender,
326342
engine,
343+
r.deleteManager,
344+
r.workflowCache,
327345
r.shard.GetMetricsClient(),
328346
r.shard.GetLogger(),
329347
)

service/history/replicationDLQHandler_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"go.temporal.io/server/service/history/shard"
5050
"go.temporal.io/server/service/history/tasks"
5151
"go.temporal.io/server/service/history/tests"
52+
"go.temporal.io/server/service/history/workflow"
5253
)
5354

5455
type (
@@ -117,6 +118,8 @@ func (s *replicationDLQHandlerSuite) SetupTest() {
117118

118119
s.replicationMessageHandler = newReplicationDLQHandler(
119120
s.mockShard,
121+
workflow.NewMockDeleteManager(s.controller),
122+
workflow.NewMockCache(s.controller),
120123
s.taskExecutors,
121124
).(*replicationDLQHandlerImpl)
122125
}

service/history/replicationTaskExecutor.go

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"context"
3131

3232
commonpb "go.temporal.io/api/common/v1"
33+
"go.temporal.io/api/serviceerror"
3334

3435
enumsspb "go.temporal.io/server/api/enums/v1"
3536
"go.temporal.io/server/api/historyservice/v1"
@@ -41,6 +42,7 @@ import (
4142
serviceerrors "go.temporal.io/server/common/serviceerror"
4243
"go.temporal.io/server/common/xdc"
4344
"go.temporal.io/server/service/history/shard"
45+
"go.temporal.io/server/service/history/workflow"
4446
)
4547

4648
type (
@@ -54,9 +56,10 @@ type (
5456
namespaceRegistry namespace.Registry
5557
nDCHistoryResender xdc.NDCHistoryResender
5658
historyEngine shard.Engine
57-
58-
metricsClient metrics.Client
59-
logger log.Logger
59+
deleteManager workflow.DeleteManager
60+
workflowCache workflow.Cache
61+
metricsClient metrics.Client
62+
logger log.Logger
6063
}
6164
)
6265

@@ -67,6 +70,8 @@ func newReplicationTaskExecutor(
6770
namespaceRegistry namespace.Registry,
6871
nDCHistoryResender xdc.NDCHistoryResender,
6972
historyEngine shard.Engine,
73+
deleteManager workflow.DeleteManager,
74+
workflowCache workflow.Cache,
7075
metricsClient metrics.Client,
7176
logger log.Logger,
7277
) replicationTaskExecutor {
@@ -76,6 +81,8 @@ func newReplicationTaskExecutor(
7681
namespaceRegistry: namespaceRegistry,
7782
nDCHistoryResender: nDCHistoryResender,
7883
historyEngine: historyEngine,
84+
deleteManager: deleteManager,
85+
workflowCache: workflowCache,
7986
metricsClient: metricsClient,
8087
logger: logger,
8188
}
@@ -153,17 +160,23 @@ func (e *replicationTaskExecutorImpl) handleActivityTask(
153160
stopwatch := e.metricsClient.StartTimer(metrics.HistoryRereplicationByActivityReplicationScope, metrics.ClientLatency)
154161
defer stopwatch.Stop()
155162

156-
if resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory(
163+
resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory(
157164
namespace.ID(retryErr.NamespaceId),
158165
retryErr.WorkflowId,
159166
retryErr.RunId,
160167
retryErr.StartEventId,
161168
retryErr.StartEventVersion,
162169
retryErr.EndEventId,
163170
retryErr.EndEventVersion,
164-
); resendErr != nil {
165-
e.logger.Error("error resend history for sync activity", tag.Error(resendErr))
166-
// should return the replication error, not the resending error
171+
)
172+
switch resendErr.(type) {
173+
case *serviceerror.NotFound:
174+
// workflow is not found in source cluster, cleanup workflow in target cluster
175+
return e.cleanupWorkflowExecution(ctx, retryErr.NamespaceId, retryErr.WorkflowId, retryErr.RunId)
176+
case nil:
177+
//no-op
178+
default:
179+
e.logger.Error("error resend history for history event", tag.Error(resendErr))
167180
return err
168181
}
169182
return e.historyEngine.SyncActivity(ctx, request)
@@ -211,17 +224,23 @@ func (e *replicationTaskExecutorImpl) handleHistoryReplicationTask(
211224
resendStopWatch := e.metricsClient.StartTimer(metrics.HistoryRereplicationByHistoryReplicationScope, metrics.ClientLatency)
212225
defer resendStopWatch.Stop()
213226

214-
if resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory(
227+
resendErr := e.nDCHistoryResender.SendSingleWorkflowHistory(
215228
namespace.ID(retryErr.NamespaceId),
216229
retryErr.WorkflowId,
217230
retryErr.RunId,
218231
retryErr.StartEventId,
219232
retryErr.StartEventVersion,
220233
retryErr.EndEventId,
221234
retryErr.EndEventVersion,
222-
); resendErr != nil {
235+
)
236+
switch resendErr.(type) {
237+
case *serviceerror.NotFound:
238+
// workflow is not found in source cluster, cleanup workflow in target cluster
239+
return e.cleanupWorkflowExecution(ctx, retryErr.NamespaceId, retryErr.WorkflowId, retryErr.RunId)
240+
case nil:
241+
//no-op
242+
default:
223243
e.logger.Error("error resend history for history event", tag.Error(resendErr))
224-
// should return the replication error, not the resending error
225244
return err
226245
}
227246

@@ -256,3 +275,32 @@ FilterLoop:
256275
}
257276
return shouldProcessTask, nil
258277
}
278+
279+
func (e *replicationTaskExecutorImpl) cleanupWorkflowExecution(ctx context.Context, namespaceID string, workflowID string, runID string) (retErr error) {
280+
nsID := namespace.ID(namespaceID)
281+
ex := commonpb.WorkflowExecution{
282+
WorkflowId: workflowID,
283+
RunId: runID,
284+
}
285+
wfCtx, releaseFn, err := e.workflowCache.GetOrCreateWorkflowExecution(ctx, nsID, ex, workflow.CallerTypeTask)
286+
if err != nil {
287+
return err
288+
}
289+
defer func() { releaseFn(retErr) }()
290+
mutableState, err := wfCtx.LoadWorkflowExecution(ctx)
291+
if err != nil {
292+
return err
293+
}
294+
lastWriteVersion, err := mutableState.GetLastWriteVersion()
295+
if err != nil {
296+
return err
297+
}
298+
return e.deleteManager.DeleteWorkflowExecutionByReplication(
299+
ctx,
300+
nsID,
301+
ex,
302+
wfCtx,
303+
mutableState,
304+
lastWriteVersion,
305+
)
306+
}

service/history/replicationTaskExecutor_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import (
5252
"go.temporal.io/server/service/history/configs"
5353
"go.temporal.io/server/service/history/shard"
5454
"go.temporal.io/server/service/history/tests"
55+
"go.temporal.io/server/service/history/workflow"
5556
)
5657

5758
type (
@@ -125,6 +126,8 @@ func (s *replicationTaskExecutorSuite) SetupTest() {
125126
s.mockNamespaceCache,
126127
s.nDCHistoryResender,
127128
s.mockEngine,
129+
workflow.NewMockDeleteManager(s.controller),
130+
workflow.NewMockCache(s.controller),
128131
metricsClient,
129132
s.mockShard.GetLogger(),
130133
).(*replicationTaskExecutorImpl)

service/history/workflow/delete_manager.go

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@ package workflow
2828

2929
import (
3030
"context"
31+
"time"
3132

3233
commonpb "go.temporal.io/api/common/v1"
3334
enumspb "go.temporal.io/api/enums/v1"
35+
"go.temporal.io/api/serviceerror"
36+
enumsspb "go.temporal.io/server/api/enums/v1"
3437

3538
"go.temporal.io/server/common"
3639
"go.temporal.io/server/common/clock"
@@ -51,6 +54,7 @@ type (
5154
AddDeleteWorkflowExecutionTask(ctx context.Context, nsID namespace.ID, we commonpb.WorkflowExecution, ms MutableState) error
5255
DeleteWorkflowExecution(ctx context.Context, nsID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error
5356
DeleteWorkflowExecutionByRetention(ctx context.Context, nsID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error
57+
DeleteWorkflowExecutionByReplication(ctx context.Context, nsID namespace.ID, we commonpb.WorkflowExecution, weCtx Context, ms MutableState, sourceTaskVersion int64) error
5458
}
5559

5660
DeleteManagerImpl struct {
@@ -136,19 +140,23 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecution(
136140
// workflow should not be deleted. NotFound errors are ignored by task processor.
137141
return consts.ErrWorkflowNotCompleted
138142
}
143+
completionEvent, err := ms.GetCompletionEvent(ctx)
144+
if err != nil {
145+
return err
146+
}
139147

140-
err := m.deleteWorkflowExecutionInternal(
148+
return m.deleteWorkflowExecutionInternal(
141149
ctx,
142150
nsID,
143151
we,
144152
weCtx,
145153
ms,
146154
sourceTaskVersion,
147155
false,
156+
nil,
157+
completionEvent.GetEventTime(),
148158
m.metricsClient.Scope(metrics.HistoryDeleteWorkflowExecutionScope),
149159
)
150-
151-
return err
152160
}
153161

154162
func (m *DeleteManagerImpl) DeleteWorkflowExecutionByRetention(
@@ -166,19 +174,62 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecutionByRetention(
166174
// But cross DC replication can resurrect workflow and therefore DeleteHistoryEventTask should be ignored.
167175
return nil
168176
}
177+
completionEvent, err := ms.GetCompletionEvent(ctx)
178+
if err != nil {
179+
return err
180+
}
169181

170-
err := m.deleteWorkflowExecutionInternal(
182+
return m.deleteWorkflowExecutionInternal(
171183
ctx,
172184
nsID,
173185
we,
174186
weCtx,
175187
ms,
176188
sourceTaskVersion,
177189
true,
190+
nil,
191+
completionEvent.GetEventTime(),
178192
m.metricsClient.Scope(metrics.HistoryProcessDeleteHistoryEventScope),
179193
)
194+
}
180195

181-
return err
196+
func (m *DeleteManagerImpl) DeleteWorkflowExecutionByReplication(
197+
ctx context.Context,
198+
nsID namespace.ID,
199+
we commonpb.WorkflowExecution,
200+
weCtx Context,
201+
ms MutableState,
202+
sourceTaskVersion int64,
203+
) error {
204+
205+
namespaceEntry := ms.GetNamespaceEntry()
206+
if namespaceEntry.ActiveClusterName() == m.shard.GetClusterMetadata().GetCurrentClusterName() {
207+
return serviceerror.NewInvalidArgument("Cannot cleanup workflows in active cluster by replication")
208+
}
209+
210+
var startTime *time.Time
211+
var closedTime *time.Time
212+
if ms.GetExecutionState().State == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
213+
completionEvent, err := ms.GetCompletionEvent(ctx)
214+
if err != nil {
215+
return err
216+
}
217+
closedTime = completionEvent.GetEventTime()
218+
} else {
219+
startTime = ms.GetExecutionInfo().GetStartTime()
220+
}
221+
return m.deleteWorkflowExecutionInternal(
222+
ctx,
223+
nsID,
224+
we,
225+
weCtx,
226+
ms,
227+
sourceTaskVersion,
228+
false,
229+
startTime,
230+
closedTime,
231+
m.metricsClient.Scope(metrics.HistoryDeleteWorkflowExecutionScope),
232+
)
182233
}
183234

184235
func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal(
@@ -189,6 +240,8 @@ func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal(
189240
ms MutableState,
190241
newTaskVersion int64,
191242
archiveIfEnabled bool,
243+
startTime *time.Time,
244+
closedTime *time.Time,
192245
scope metrics.Scope,
193246
) error {
194247

@@ -210,11 +263,6 @@ func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal(
210263
currentBranchToken = nil
211264
}
212265

213-
completionEvent, err := ms.GetCompletionEvent(ctx)
214-
if err != nil {
215-
return err
216-
}
217-
218266
if err := m.shard.DeleteWorkflowExecution(
219267
ctx,
220268
definition.WorkflowKey{
@@ -224,8 +272,8 @@ func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal(
224272
},
225273
currentBranchToken,
226274
newTaskVersion,
227-
nil,
228-
completionEvent.GetEventTime(),
275+
startTime,
276+
closedTime,
229277
); err != nil {
230278
return err
231279
}

0 commit comments

Comments
 (0)