common/util.go (1 addition, 1 deletion)
@@ -240,7 +240,7 @@ func IsWhitelistServiceTransientError(err error) bool {
// WorkflowIDToHistoryShard is used to map workflowID to a shardID
func WorkflowIDToHistoryShard(workflowID string, numberOfShards int) int {
hash := farm.Fingerprint32([]byte(workflowID))
- return int(hash % uint32(numberOfShards))
+ return int(hash%uint32(numberOfShards)) + 1 // ShardID starts with 1
}

// PrettyPrintHistory prints history in human readable format
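The change above is the heart of this PR: the workflowID hash now lands in the 1-based range [1, numberOfShards] instead of [0, numberOfShards-1]. A minimal standalone sketch of the new invariant (the lowercase helper and main function are illustrative, not code from this PR; it assumes the github.com/dgryski/go-farm package the server already imports):

package main

import (
	"fmt"

	farm "github.com/dgryski/go-farm"
)

// workflowIDToHistoryShard mirrors the updated mapping: hash the workflowID,
// mod by the shard count, then shift by one so shard IDs start at 1.
func workflowIDToHistoryShard(workflowID string, numberOfShards int) int {
	hash := farm.Fingerprint32([]byte(workflowID))
	return int(hash%uint32(numberOfShards)) + 1 // ShardID starts with 1
}

func main() {
	const numberOfShards = 4
	for _, id := range []string{"order-123", "order-456", "order-789"} {
		// Every result falls in [1, numberOfShards]; 0 is never produced.
		fmt.Printf("%s -> shard %d\n", id, workflowIDToHistoryShard(id, numberOfShards))
	}
}

Keeping 0 out of the valid range frees it up as a sentinel, which is what lets the handler.go check further down be re-enabled.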
host/ndc/nDC_integration_test.go (1 addition, 1 deletion)
@@ -182,7 +182,7 @@ func (s *nDCIntegrationTestSuite) GetReplicationMessagesMock(
}

return &adminservice.GetReplicationMessagesResponse{
- MessagesByShard: map[int32]*replicationspb.ReplicationMessages{0: replicationMessage},
+ MessagesByShard: map[int32]*replicationspb.ReplicationMessages{1: replicationMessage},
}, nil
default:
return &adminservice.GetReplicationMessagesResponse{
host/ndc/replication_integration_test.go (1 addition, 1 deletion)
@@ -111,7 +111,7 @@ func (s *nDCIntegrationTestSuite) TestReplicationMessageDLQ() {
)

execMgrFactory := s.active.GetExecutionManagerFactory()
- executionManager, err := execMgrFactory.NewExecutionManager(0)
+ executionManager, err := execMgrFactory.NewExecutionManager(1)
s.NoError(err)

expectedDLQMsgs := map[int64]bool{}
service/history/handler.go (3 additions, 5 deletions)
@@ -1413,11 +1413,9 @@ func (h *Handler) SyncShardStatus(ctx context.Context, request *historyservice.SyncShardStatusRequest) (
return nil, h.error(errSourceClusterNotSet, scope, "", "")
}

- // TODO: Disabling this check as 0 is a valid ShardID. Correct long term fix is to have ShardID start from 1
- // so we can enable this check to validate ShardID is not set.
- // if request.GetShardId() == 0 {
- // 	return nil, h.error(errShardIDNotSet, scope, "", "")
- // }
+ if request.GetShardId() == 0 {
+ 	return nil, h.error(errShardIDNotSet, scope, "", "")
+ }

if timestamp.TimeValue(request.GetStatusTime()).IsZero() {
return nil, h.error(errTimestampNotSet, scope, "", "")
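With shard IDs starting at 1, the previously disabled validation (and its TODO) can be re-enabled rather than worked around. The reasoning: proto3 scalar fields carry no presence bit, so a request whose shard_id was never set reads back as 0, and 0 used to be a legitimate shard. A hedged sketch of the sentinel logic (validateShardID is a hypothetical helper, not the handler's actual code):

package main

import (
	"errors"
	"fmt"
)

// validateShardID treats the proto3 zero value as "field not set" and
// otherwise bounds-checks against the 1-based shard range.
func validateShardID(shardID, numberOfShards int32) error {
	if shardID == 0 {
		return errors.New("shard ID not set") // proto3 default for int32
	}
	if shardID < 1 || shardID > numberOfShards {
		return fmt.Errorf("shard ID %d out of range [1, %d]", shardID, numberOfShards)
	}
	return nil
}

func main() {
	fmt.Println(validateShardID(0, 4)) // shard ID not set
	fmt.Println(validateShardID(3, 4)) // <nil>
}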
service/history/queueAckMgr_test.go (2 additions, 2 deletions)
@@ -101,7 +101,7 @@ func (s *queueAckMgrSuite) SetupTest() {
s.controller,
&p.ShardInfoWithFailover{
ShardInfo: &persistenceblobs.ShardInfo{
- ShardId: 0,
+ ShardId: 1,
RangeId: 1,
ClusterTimerAckLevel: map[string]*types.Timestamp{
cluster.TestCurrentClusterName: gogoProtoTimestampNowAddDuration(-8),
@@ -290,7 +290,7 @@ func (s *queueFailoverAckMgrSuite) SetupTest() {
s.controller,
&p.ShardInfoWithFailover{
ShardInfo: &persistenceblobs.ShardInfo{
- ShardId: 0,
+ ShardId: 1,
RangeId: 1,
ClusterTimerAckLevel: map[string]*types.Timestamp{
cluster.TestCurrentClusterName: types.TimestampNow(),
service/history/replicationTaskFetcher.go (6 additions, 1 deletion)
@@ -253,7 +253,12 @@ func (f *ReplicationTaskFetcherImpl) fetchAndDistributeTasks(requestByShard map[
f.logger.Debug("Successfully fetched replication tasks.", tag.Counter(len(messagesByShard)))

for shardID, tasks := range messagesByShard {
- request := requestByShard[shardID]
+ request, ok := requestByShard[shardID]
+
+ if !ok {
+ 	f.logger.Error("No outstanding request found for shardId. Skipping Messages.", tag.ShardID(int(shardID)))
+ 	continue
+ }
request.respChan <- tasks
close(request.respChan)
delete(requestByShard, shardID)
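The fetcher change guards against a response that names a shard with no outstanding request. Indexing a Go map with a missing key silently yields the zero value, here a nil *request, so the old code would have panicked dereferencing request.respChan. A simplified sketch of the comma-ok pattern (the types and distribute helper are stand-ins, not the fetcher's real signatures):

package main

import "fmt"

type request struct {
	respChan chan string
}

// distribute delivers each shard's messages to its waiting request and
// skips, rather than panics on, shards nobody asked about.
func distribute(requestByShard map[int32]*request, messagesByShard map[int32]string) {
	for shardID, msg := range messagesByShard {
		req, ok := requestByShard[shardID]
		if !ok {
			fmt.Printf("no outstanding request for shard %d; skipping\n", shardID)
			continue
		}
		req.respChan <- msg
		close(req.respChan)
		delete(requestByShard, shardID)
	}
}

func main() {
	req := &request{respChan: make(chan string, 1)}
	distribute(map[int32]*request{1: req}, map[int32]string{1: "tasks", 2: "orphan"})
	fmt.Println(<-req.respChan) // tasks
}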
service/history/replicationTaskFetcher_test.go (7 additions, 7 deletions)
@@ -90,11 +90,11 @@ func (s *replicationTaskFetcherSuite) TearDownTest() {
func (s *replicationTaskFetcherSuite) TestGetMessages() {
requestByShard := make(map[int32]*request)
token := &replicationspb.ReplicationToken{
- ShardId: 0,
+ ShardId: 1,
LastProcessedMessageId: 1,
LastRetrievedMessageId: 2,
}
- requestByShard[0] = &request{
+ requestByShard[1] = &request{
token: token,
}
replicationMessageRequest := &adminservice.GetReplicationMessagesRequest{
@@ -104,7 +104,7 @@ func (s *replicationTaskFetcherSuite) TestGetMessages() {
ClusterName: "active",
}
messageByShared := make(map[int32]*replicationspb.ReplicationMessages)
- messageByShared[0] = &replicationspb.ReplicationMessages{}
+ messageByShared[1] = &replicationspb.ReplicationMessages{}
expectedResponse := &adminservice.GetReplicationMessagesResponse{
MessagesByShard: messageByShared,
}
@@ -117,12 +117,12 @@ func (s *replicationTaskFetcherSuite) TestGetMessages() {
func (s *replicationTaskFetcherSuite) TestFetchAndDistributeTasks() {
requestByShard := make(map[int32]*request)
token := &replicationspb.ReplicationToken{
- ShardId: 0,
+ ShardId: 1,
LastProcessedMessageId: 1,
LastRetrievedMessageId: 2,
}
respChan := make(chan *replicationspb.ReplicationMessages, 1)
- requestByShard[0] = &request{
+ requestByShard[1] = &request{
token: token,
respChan: respChan,
}
@@ -133,13 +133,13 @@ func (s *replicationTaskFetcherSuite) TestFetchAndDistributeTasks() {
ClusterName: "active",
}
messageByShared := make(map[int32]*replicationspb.ReplicationMessages)
- messageByShared[0] = &replicationspb.ReplicationMessages{}
+ messageByShared[1] = &replicationspb.ReplicationMessages{}
expectedResponse := &adminservice.GetReplicationMessagesResponse{
MessagesByShard: messageByShared,
}
s.frontendClient.EXPECT().GetReplicationMessages(gomock.Any(), replicationMessageRequest).Return(expectedResponse, nil)
err := s.replicationTaskFetcher.fetchAndDistributeTasks(requestByShard)
s.NoError(err)
respToken := <-respChan
- s.Equal(messageByShared[0], respToken)
+ s.Equal(messageByShared[1], respToken)
}
service/history/shardController.go (1 addition, 1 deletion)
@@ -352,7 +352,7 @@ func (c *shardController) acquireShards() {
}()
}
// Submit tasks to the channel.
- for shardID := 0; shardID < c.config.NumberOfShards; shardID++ {
+ for shardID := 1; shardID <= c.config.NumberOfShards; shardID++ {
shardActionCh <- shardID
if c.isShuttingDown() {
return
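The acquire loop now walks [1, NumberOfShards] to match the hash mapping in common/util.go. If either side were left 0-based, shard N would never be acquired while the loop churned on a shard 0 that no workflowID hashes to. A quick illustrative check of that agreement (an assumed invariant, not code from the PR):

package main

import "fmt"

func main() {
	const numberOfShards = 4
	acquired := map[int]bool{}
	for shardID := 1; shardID <= numberOfShards; shardID++ { // mirrors acquireShards
		acquired[shardID] = true
	}
	for hash := uint32(0); hash < 8; hash++ {
		shard := int(hash%uint32(numberOfShards)) + 1 // same formula as WorkflowIDToHistoryShard
		fmt.Printf("hash %d -> shard %d, acquired=%v\n", hash, shard, acquired[shard])
	}
}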
service/history/shardController_test.go (21 additions, 21 deletions)
@@ -113,7 +113,7 @@ func (s *shardControllerSuite) TestAcquireShardSuccess() {
alternativeClusterTimerAck := gogoProtoTimestampNowAddDuration(-200)

myShards := []int{}
- for shardID := int32(0); shardID < int32(numShards); shardID++ {
+ for shardID := int32(1); shardID <= int32(numShards); shardID++ {
hostID := shardID % 4
if hostID == 0 {
myShards = append(myShards, int(shardID))
@@ -176,7 +176,7 @@ func (s *shardControllerSuite) TestAcquireShardSuccess() {
s.NotNil(s.shardController.getEngineForShard(shardID))
count++
}
- s.Equal(3, count)
+ s.Equal(2, count)
}

func (s *shardControllerSuite) TestAcquireShardsConcurrently() {
@@ -193,7 +193,7 @@ func (s *shardControllerSuite) TestAcquireShardsConcurrently() {
alternativeClusterTimerAck := gogoProtoTimestampNowAddDuration(-200)

var myShards []int
- for shardID := int32(0); shardID < int32(numShards); shardID++ {
+ for shardID := int32(1); shardID <= int32(numShards); shardID++ {
hostID := shardID % 4
if hostID == 0 {
myShards = append(myShards, int(shardID))
@@ -256,18 +256,18 @@ func (s *shardControllerSuite) TestAcquireShardsConcurrently() {
s.NotNil(s.shardController.getEngineForShard(shardID))
count++
}
- s.Equal(3, count)
+ s.Equal(2, count)
}

func (s *shardControllerSuite) TestAcquireShardLookupFailure() {
numShards := 2
s.config.NumberOfShards = numShards
for shardID := 0; shardID < numShards; shardID++ {
for shardID := 1; shardID <= numShards; shardID++ {
s.mockServiceResolver.EXPECT().Lookup(string(shardID)).Return(nil, errors.New("ring failure")).Times(1)
}

s.shardController.acquireShards()
- for shardID := 0; shardID < numShards; shardID++ {
+ for shardID := 1; shardID <= numShards; shardID++ {
s.mockServiceResolver.EXPECT().Lookup(string(shardID)).Return(nil, errors.New("ring failure")).Times(1)
s.Nil(s.shardController.getEngineForShard(shardID))
}
@@ -283,7 +283,7 @@ func (s *shardControllerSuite) TestAcquireShardRenewSuccess() {
currentClusterTimerAck := gogoProtoTimestampNowAddDuration(-100)
alternativeClusterTimerAck := gogoProtoTimestampNowAddDuration(-200)

- for shardID := int32(0); shardID < int32(numShards); shardID++ {
+ for shardID := int32(1); shardID <= int32(numShards); shardID++ {
s.mockHistoryEngine.EXPECT().Start().Return().Times(1)
s.mockServiceResolver.EXPECT().Lookup(string(shardID)).Return(s.hostInfo, nil).Times(2)
s.mockEngineFactory.On("CreateEngine", mock.Anything).Return(s.mockHistoryEngine).Once()
@@ -335,12 +335,12 @@ func (s *shardControllerSuite) TestAcquireShardRenewSuccess() {
s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestSingleDCClusterInfo).AnyTimes()
s.shardController.acquireShards()

- for shardID := 0; shardID < numShards; shardID++ {
+ for shardID := 1; shardID <= numShards; shardID++ {
s.mockServiceResolver.EXPECT().Lookup(string(shardID)).Return(s.hostInfo, nil).Times(1)
}
s.shardController.acquireShards()

- for shardID := 0; shardID < numShards; shardID++ {
+ for shardID := 1; shardID <= numShards; shardID++ {
s.NotNil(s.shardController.getEngineForShard(shardID))
}
}
@@ -355,7 +355,7 @@ func (s *shardControllerSuite) TestAcquireShardRenewLookupFailed() {
currentClusterTimerAck := gogoProtoTimestampNowAddDuration(-100)
alternativeClusterTimerAck := gogoProtoTimestampNowAddDuration(-200)

- for shardID := int32(0); shardID < int32(numShards); shardID++ {
+ for shardID := int32(1); shardID <= int32(numShards); shardID++ {
s.mockHistoryEngine.EXPECT().Start().Return().Times(1)
s.mockServiceResolver.EXPECT().Lookup(string(shardID)).Return(s.hostInfo, nil).Times(2)
s.mockEngineFactory.On("CreateEngine", mock.Anything).Return(s.mockHistoryEngine).Once()
@@ -407,12 +407,12 @@ func (s *shardControllerSuite) TestAcquireShardRenewLookupFailed() {
s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestSingleDCClusterInfo).AnyTimes()
s.shardController.acquireShards()

- for shardID := 0; shardID < numShards; shardID++ {
+ for shardID := 1; shardID <= numShards; shardID++ {
s.mockServiceResolver.EXPECT().Lookup(string(shardID)).Return(nil, errors.New("ring failure")).Times(1)
}
s.shardController.acquireShards()

- for shardID := 0; shardID < numShards; shardID++ {
+ for shardID := 1; shardID <= numShards; shardID++ {
s.NotNil(s.shardController.getEngineForShard(shardID))
}
}
@@ -422,7 +422,7 @@ func (s *shardControllerSuite) TestHistoryEngineClosed() {
s.config.NumberOfShards = numShards
s.shardController = newShardController(s.mockResource, s.mockEngineFactory, s.config)
historyEngines := make(map[int]*MockEngine)
- for shardID := 0; shardID < numShards; shardID++ {
+ for shardID := 1; shardID <= numShards; shardID++ {
mockEngine := NewMockEngine(s.controller)
historyEngines[shardID] = mockEngine
s.setupMocksForAcquireShard(shardID, mockEngine, 5, 6)
@@ -439,7 +439,7 @@ func (s *shardControllerSuite) TestHistoryEngineClosed() {
workerWG.Add(1)
go func() {
for attempt := 0; attempt < 10; attempt++ {
- for shardID := 0; shardID < numShards; shardID++ {
+ for shardID := 1; shardID <= numShards; shardID++ {
engine, err := s.shardController.getEngineForShard(shardID)
s.Nil(err)
s.NotNil(engine)
@@ -452,7 +452,7 @@ func (s *shardControllerSuite) TestHistoryEngineClosed() {
workerWG.Wait()

differentHostInfo := membership.NewHostInfo("another-host", nil)
- for shardID := 0; shardID < 2; shardID++ {
+ for shardID := 1; shardID <= 2; shardID++ {
mockEngine := historyEngines[shardID]
mockEngine.EXPECT().Stop().Return().Times(1)
s.mockServiceResolver.EXPECT().Lookup(string(shardID)).Return(differentHostInfo, nil).AnyTimes()
@@ -463,7 +463,7 @@ func (s *shardControllerSuite) TestHistoryEngineClosed() {
workerWG.Add(1)
go func() {
for attempt := 0; attempt < 10; attempt++ {
- for shardID := 2; shardID < numShards; shardID++ {
+ for shardID := 3; shardID <= numShards; shardID++ {
engine, err := s.shardController.getEngineForShard(shardID)
s.Nil(err)
s.NotNil(engine)
@@ -479,7 +479,7 @@ func (s *shardControllerSuite) TestHistoryEngineClosed() {
go func() {
shardLost := false
for attempt := 0; !shardLost && attempt < 10; attempt++ {
- for shardID := 0; shardID < 2; shardID++ {
+ for shardID := 1; shardID <= 2; shardID++ {
_, err := s.shardController.getEngineForShard(shardID)
if err != nil {
s.logger.Error("ShardLost", tag.Error(err))
@@ -497,7 +497,7 @@ func (s *shardControllerSuite) TestHistoryEngineClosed() {
workerWG.Wait()

s.mockServiceResolver.EXPECT().RemoveListener(shardControllerMembershipUpdateListenerName).Return(nil).AnyTimes()
- for shardID := 2; shardID < numShards; shardID++ {
+ for shardID := 3; shardID <= numShards; shardID++ {
mockEngine := historyEngines[shardID]
mockEngine.EXPECT().Stop().Return().Times(1)
s.mockServiceResolver.EXPECT().Lookup(string(shardID)).Return(s.hostInfo, nil).AnyTimes()
@@ -510,7 +510,7 @@ func (s *shardControllerSuite) TestShardControllerClosed() {
s.config.NumberOfShards = numShards
s.shardController = newShardController(s.mockResource, s.mockEngineFactory, s.config)
historyEngines := make(map[int]*MockEngine)
- for shardID := 0; shardID < numShards; shardID++ {
+ for shardID := 1; shardID <= numShards; shardID++ {
mockEngine := NewMockEngine(s.controller)
historyEngines[shardID] = mockEngine
s.setupMocksForAcquireShard(shardID, mockEngine, 5, 6)
@@ -528,7 +528,7 @@ func (s *shardControllerSuite) TestShardControllerClosed() {
go func() {
shardLost := false
for attempt := 0; !shardLost && attempt < 10; attempt++ {
- for shardID := 0; shardID < numShards; shardID++ {
+ for shardID := 1; shardID <= numShards; shardID++ {
_, err := s.shardController.getEngineForShard(shardID)
if err != nil {
s.logger.Error("ShardLost", tag.Error(err))
Expand All @@ -544,7 +544,7 @@ func (s *shardControllerSuite) TestShardControllerClosed() {
}

s.mockServiceResolver.EXPECT().RemoveListener(shardControllerMembershipUpdateListenerName).Return(nil).AnyTimes()
- for shardID := 0; shardID < numShards; shardID++ {
+ for shardID := 1; shardID <= numShards; shardID++ {
mockEngine := historyEngines[shardID]
mockEngine.EXPECT().Stop().Times(1)
s.mockServiceResolver.EXPECT().Lookup(string(shardID)).Return(s.hostInfo, nil).AnyTimes()
service/history/timerQueueAckMgr_test.go (2 additions, 2 deletions)
@@ -118,7 +118,7 @@ func (s *timerQueueAckMgrSuite) SetupTest() {
s.controller,
&persistence.ShardInfoWithFailover{
ShardInfo: &persistenceblobs.ShardInfo{
- ShardId: 0,
+ ShardId: 1,
RangeId: 1,
ClusterTimerAckLevel: map[string]*types.Timestamp{
cluster.TestCurrentClusterName: gogoProtoTimestampNowAddDuration(-8),
@@ -557,7 +557,7 @@ func (s *timerQueueFailoverAckMgrSuite) SetupTest() {
s.controller,
&persistence.ShardInfoWithFailover{
ShardInfo: &persistenceblobs.ShardInfo{
- ShardId: 0,
+ ShardId: 1,
RangeId: 1,
ClusterTimerAckLevel: map[string]*types.Timestamp{
cluster.TestCurrentClusterName: types.TimestampNow(),