Skip to content

Commit 2bc0e83

Browse files
committed
Add Event verification when backfill history
1 parent cd00f83 commit 2bc0e83

File tree

2 files changed

+79
-3
lines changed

2 files changed

+79
-3
lines changed

service/history/ndc/workflow_state_replicator.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1456,22 +1456,29 @@ func (r *WorkflowStateReplicatorImpl) backfillHistory(
14561456
sortedAncestors := sortAncestors(historyBranch.GetAncestors())
14571457
sortedAncestorsIdx := 0
14581458
var ancestors []*persistencespb.HistoryBranchRange
1459+
var expectedEventID int64
1460+
if isStateBased {
1461+
expectedEventID = common.FirstEventID
1462+
}
14591463

14601464
BackfillLoop:
14611465
for remoteHistoryIterator.HasNext() {
14621466
historyBlob, err := remoteHistoryIterator.Next()
14631467
if err != nil {
14641468
return err
14651469
}
1466-
14671470
if isStateBased {
1468-
// If backfill suceeds but later event reapply fails, during task's next retry,
1469-
// we still need to reapply events that have been stored in local DB.
14701471
events, err := r.historySerializer.DeserializeEvents(historyBlob.rawHistory)
14711472
if err != nil {
14721473
return err
14731474
}
14741475
for _, event := range events {
1476+
if expectedEventID != event.GetEventId() {
1477+
return serviceerror.NewInternal(fmt.Sprintf("Event not match. Expected %v, but got %v", expectedEventID, event.GetEventId()))
1478+
}
1479+
expectedEventID = event.GetEventId() + 1
1480+
// If backfill suceeds but later event reapply fails, during task's next retry,
1481+
// we still need to reapply events that have been stored in local DB.
14751482
mutableState.AddReapplyCandidateEvent(event)
14761483
r.addEventToCache(mutableState.GetWorkflowKey(), event)
14771484
}
@@ -1552,6 +1559,10 @@ BackfillLoop:
15521559
prevBranchID = branchID
15531560
}
15541561

1562+
if isStateBased && expectedEventID != lastEventItem.EventId+1 {
1563+
return serviceerror.NewInternal(fmt.Sprintf("Event not match. Expected %v, but got %v", expectedEventID, lastEventItem.EventId+1))
1564+
}
1565+
15551566
mutableState.GetExecutionInfo().LastFirstEventTxnId = prevTxnID
15561567
return mutableState.SetCurrentBranchToken(backfillBranchToken)
15571568
}

service/history/ndc/workflow_state_replicator_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,3 +1554,68 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_C
15541554
s.Equal(forkedBranchToken, localVersionHistoryies.Histories[2].BranchToken)
15551555
s.Equal(int32(2), localVersionHistoryies.CurrentVersionHistoryIndex)
15561556
}
1557+
1558+
func (s *workflowReplicatorSuite) Test_backfillHistory_EventIDMismatch() {
1559+
namespaceID := uuid.New()
1560+
branchInfo := &persistencespb.HistoryBranch{
1561+
TreeId: s.runID,
1562+
BranchId: uuid.New(),
1563+
Ancestors: nil,
1564+
}
1565+
historyBranch, err := serialization.HistoryBranchToBlob(branchInfo)
1566+
s.NoError(err)
1567+
1568+
nsEntry := namespace.NewNamespaceForTest(&persistencespb.NamespaceInfo{Name: "ns"}, nil, false, nil, 0)
1569+
s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespace.ID(namespaceID)).Return(nsEntry, nil).AnyTimes()
1570+
1571+
mutableState := workflow.NewMutableState(
1572+
s.mockShard,
1573+
s.mockShard.GetEventsCache(),
1574+
s.logger,
1575+
nsEntry,
1576+
s.workflowID,
1577+
s.runID,
1578+
s.now,
1579+
)
1580+
mutableState.GetExecutionInfo().NamespaceId = namespaceID
1581+
mutableState.GetExecutionInfo().WorkflowId = s.workflowID
1582+
mutableState.GetExecutionInfo().VersionHistories = &historyspb.VersionHistories{
1583+
CurrentVersionHistoryIndex: 0,
1584+
Histories: []*historyspb.VersionHistory{
1585+
{
1586+
BranchToken: historyBranch.GetData(),
1587+
Items: []*historyspb.VersionHistoryItem{{EventId: 4, Version: 1}},
1588+
},
1589+
},
1590+
}
1591+
serializer := serialization.NewSerializer()
1592+
blob1, err := serializer.SerializeEvents([]*historypb.HistoryEvent{{EventId: 1, Version: 1}}, enumspb.ENCODING_TYPE_PROTO3)
1593+
s.NoError(err)
1594+
blob2, err := serializer.SerializeEvents([]*historypb.HistoryEvent{{EventId: 3, Version: 1}}, enumspb.ENCODING_TYPE_PROTO3)
1595+
s.NoError(err)
1596+
1597+
s.mockExecutionManager.EXPECT().ReadHistoryBranchByBatch(gomock.Any(), gomock.Any()).Return(nil, serviceerror.NewNotFound("not found")).AnyTimes()
1598+
mockShard := historyi.NewMockShardContext(s.controller)
1599+
mockShard.EXPECT().GenerateTaskID().Return(int64(1), nil).AnyTimes()
1600+
mockShard.EXPECT().GetRemoteAdminClient("test-cluster").Return(s.mockRemoteAdminClient, nil).AnyTimes()
1601+
mockShard.EXPECT().GetShardID().Return(int32(0)).AnyTimes()
1602+
mockShard.EXPECT().GetExecutionManager().Return(s.mockExecutionManager).AnyTimes()
1603+
s.workflowStateReplicator.shardContext = mockShard
1604+
s.mockRemoteAdminClient.EXPECT().GetWorkflowExecutionRawHistoryV2(gomock.Any(), gomock.Any()).Return(&adminservice.GetWorkflowExecutionRawHistoryV2Response{
1605+
HistoryBatches: []*commonpb.DataBlob{blob1, blob2},
1606+
HistoryNodeIds: []int64{1, 3},
1607+
}, nil)
1608+
s.mockExecutionManager.EXPECT().AppendRawHistoryNodes(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
1609+
1610+
err = s.workflowStateReplicator.backfillHistory(
1611+
context.Background(),
1612+
"test-cluster",
1613+
namespace.ID(namespaceID),
1614+
s.workflowID,
1615+
s.runID,
1616+
mutableState,
1617+
true,
1618+
)
1619+
var internalErr *serviceerror.Internal
1620+
s.ErrorAs(err, &internalErr)
1621+
}

0 commit comments

Comments
 (0)