Skip to content

Commit 32977ed

Browse files
authored
De-duplicate reapply event (#2629)
* Dedup reapply event * rename function
1 parent 5f77615 commit 32977ed

File tree

1 file changed

+31
-2
lines changed

1 file changed

+31
-2
lines changed

service/history/historyEngine.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"go.temporal.io/server/service/history/tasks"
5151

5252
enumsspb "go.temporal.io/server/api/enums/v1"
53+
historyspb "go.temporal.io/server/api/history/v1"
5354
"go.temporal.io/server/api/historyservice/v1"
5455
"go.temporal.io/server/api/matchingservice/v1"
5556
persistencespb "go.temporal.io/server/api/persistence/v1"
@@ -78,7 +79,6 @@ import (
7879
"go.temporal.io/server/service/history/events"
7980
"go.temporal.io/server/service/history/shard"
8081
"go.temporal.io/server/service/history/workflow"
81-
8282
"go.temporal.io/server/service/worker/archiver"
8383
)
8484

@@ -3194,13 +3194,23 @@ func (e *historyEngineImpl) ReapplyEvents(
31943194
ctx,
31953195
namespaceID,
31963196
currentExecution,
3197-
func(context workflow.Context, mutableState workflow.MutableState) (*updateWorkflowAction, error) {
3197+
func(context workflow.Context, mutableState workflow.MutableState) (action *updateWorkflowAction, retErr error) {
31983198
// Filter out reapply event from the same cluster
31993199
toReapplyEvents := make([]*historypb.HistoryEvent, 0, len(reapplyEvents))
32003200
lastWriteVersion, err := mutableState.GetLastWriteVersion()
32013201
if err != nil {
32023202
return nil, err
32033203
}
3204+
sourceMutableState := mutableState
3205+
if sourceMutableState.GetWorkflowKey().RunID != runID {
3206+
originCtx, err := e.loadWorkflowOnce(ctx, namespaceID, workflowID, runID)
3207+
if err != nil {
3208+
return nil, err
3209+
}
3210+
defer func() { originCtx.getReleaseFn()(retErr) }()
3211+
sourceMutableState = originCtx.getMutableState()
3212+
}
3213+
32043214
for _, event := range reapplyEvents {
32053215
if event.GetVersion() == lastWriteVersion {
32063216
// The reapply is from the same cluster. Ignoring.
@@ -3211,6 +3221,11 @@ func (e *historyEngineImpl) ReapplyEvents(
32113221
// already apply the signal
32123222
continue
32133223
}
3224+
versionHistories := sourceMutableState.GetExecutionInfo().GetVersionHistories()
3225+
if e.containsHistoryEvent(versionHistories, event.GetEventId(), event.GetVersion()) {
3226+
continue
3227+
}
3228+
32143229
toReapplyEvents = append(toReapplyEvents, event)
32153230
}
32163231
if len(toReapplyEvents) == 0 {
@@ -3605,6 +3620,20 @@ func (e *historyEngineImpl) metricsScope(ctx context.Context) metrics.Scope {
36053620
return interceptor.MetricsScope(ctx, e.logger)
36063621
}
36073622

3623+
func (e *historyEngineImpl) containsHistoryEvent(
3624+
versionHistories *historyspb.VersionHistories,
3625+
reappliedEventID int64,
3626+
reappliedEventVersion int64,
3627+
) bool {
3628+
// Check if the source workflow contains the reapply event.
3629+
// If it does, it means the event is received in this cluster, no need to reapply.
3630+
_, err := versionhistory.FindFirstVersionHistoryIndexByVersionHistoryItem(
3631+
versionHistories,
3632+
versionhistory.NewVersionHistoryItem(reappliedEventID, reappliedEventVersion),
3633+
)
3634+
return err == nil
3635+
}
3636+
36083637
func getMetadataChangeCallbackID(componentName string, shardId int32) string {
36093638
return fmt.Sprintf("%s-%d", componentName, shardId)
36103639
}

0 commit comments

Comments
 (0)