Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 27 additions & 26 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/log"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/utils/keyutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
Expand All @@ -49,7 +50,8 @@ const (
randomRegionMaxRetry = 10
// ScanRegionLimit is the default limit for the number of regions to scan in a region scan request.
ScanRegionLimit = 1000
batchSearchSize = 16
// batchSearchSize is the default size for the number of IDs/keys/prevKeys to search in a batch.
batchSearchSize = 128
// CollectFactor is the factor to collect the count of region.
CollectFactor = 0.9
)
Expand Down Expand Up @@ -1542,10 +1544,6 @@ func (r *RegionsInfo) QueryRegions(
start = time.Now()
regions = r.getRegionsByKeys(keys)
queryRegionByKeysDuration.Observe(time.Since(start).Seconds())
// Assert the returned regions count matches the input keys.
if len(regions) != len(keys) {
panic("returned regions count mismatch with the input keys")
}
}

// Iterate the prevKeys to find the regions.
Expand All @@ -1554,10 +1552,6 @@ func (r *RegionsInfo) QueryRegions(
start = time.Now()
prevRegions = r.getRegionsByPrevKeys(prevKeys)
queryRegionByPrevKeysDuration.Observe(time.Since(start).Seconds())
// Assert the returned regions count matches the input keys.
if len(prevRegions) != len(prevKeys) {
panic("returned prev regions count mismatch with the input keys")
}
}

// Build the key -> ID map for the final results.
Expand All @@ -1569,13 +1563,20 @@ func (r *RegionsInfo) QueryRegions(
if len(ids) > 0 {
queryRegionIDsCount.Add(float64(len(ids)))
start = time.Now()
// Filter out IDs that have already been found or are invalid.
idsToQuery := make([]uint64, 0, len(ids))
for _, id := range ids {
// Check if the region has been found.
if regionFound, ok := regionsByID[id]; (ok && regionFound != nil) || id == 0 {
continue
}
// If the given region ID is not found in the region tree, set the region to nil.
if region := r.GetRegion(id); region == nil {
idsToQuery = append(idsToQuery, id)
}
// Batch query the regions by IDs to reduce lock contention.
regions := r.getRegionsByIDs(idsToQuery)
for i, id := range idsToQuery {
region := regions[i]
if region == nil {
regionsByID[id] = nil
} else {
regionResp := &pdpb.RegionResponse{
Expand All @@ -1601,7 +1602,7 @@ func (r *RegionsInfo) getRegionsByKeys(keys [][]byte) []*RegionInfo {
regions := make([]*RegionInfo, 0, len(keys))
// Split the keys into multiple batches, and search each batch separately.
// This is to avoid the lock contention on the `regionTree`.
for _, batch := range splitKeysIntoBatches(keys) {
for _, batch := range slice.SplitIntoBatches(keys, batchSearchSize) {
r.t.RLock()
results := r.tree.searchByKeys(batch)
r.t.RUnlock()
Expand All @@ -1610,22 +1611,9 @@ func (r *RegionsInfo) getRegionsByKeys(keys [][]byte) []*RegionInfo {
return regions
}

func splitKeysIntoBatches(keys [][]byte) [][][]byte {
keysLen := len(keys)
batches := make([][][]byte, 0, (keysLen+batchSearchSize-1)/batchSearchSize)
for i := 0; i < keysLen; i += batchSearchSize {
end := i + batchSearchSize
if end > keysLen {
end = keysLen
}
batches = append(batches, keys[i:end])
}
return batches
}

func (r *RegionsInfo) getRegionsByPrevKeys(prevKeys [][]byte) []*RegionInfo {
regions := make([]*RegionInfo, 0, len(prevKeys))
for _, batch := range splitKeysIntoBatches(prevKeys) {
for _, batch := range slice.SplitIntoBatches(prevKeys, batchSearchSize) {
r.t.RLock()
results := r.tree.searchByPrevKeys(batch)
r.t.RUnlock()
Expand All @@ -1634,6 +1622,19 @@ func (r *RegionsInfo) getRegionsByPrevKeys(prevKeys [][]byte) []*RegionInfo {
return regions
}

// getRegionsByIDs looks up the RegionInfo for each given ID, acquiring the
// read lock once per batch (rather than once per ID, or once for the whole
// request) to bound lock-hold time and reduce contention on the region tree.
// A nil entry is appended for IDs that are not present, so the result always
// has the same length and order as ids.
func (r *RegionsInfo) getRegionsByIDs(ids []uint64) []*RegionInfo {
	found := make([]*RegionInfo, 0, len(ids))
	for _, chunk := range slice.SplitIntoBatches(ids, batchSearchSize) {
		r.t.RLock()
		for _, regionID := range chunk {
			found = append(found, r.getRegionLocked(regionID))
		}
		r.t.RUnlock()
	}
	return found
}

// sortOutKeyIDMap will iterate the regions, convert it to a slice of regionID that corresponds to the input regions.
// It will also update `regionsByID` with the regionID and regionResponse.
func sortOutKeyIDMap(
Expand Down
77 changes: 77 additions & 0 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,3 +1428,80 @@ func TestResetRegionCache(t *testing.T) {
re.Equal(1, regions.GetTotalRegionCount())
re.NotNil(regions.GetRegion(4))
}

// BenchmarkQueryRegions measures RegionsInfo.QueryRegions throughput for
// key-based, prev-key-based, ID-based, and mixed lookups across a grid of
// total region counts and per-request query sizes.
func BenchmarkQueryRegions(b *testing.B) {
	totalCounts := []int{100, 1000, 10000, 100000}
	queryCounts := []int{10, 50, 100, 500, 1000}

	for _, totalCount := range totalCounts {
		// Populate a RegionsInfo with totalCount consecutive,
		// non-overlapping regions whose IDs start from 1.
		infos := NewRegionsInfo()
		allIDs := make([]uint64, 0, totalCount)
		allStartKeys := make([][]byte, 0, totalCount)
		allEndKeys := make([][]byte, 0, totalCount)
		for idx := range totalCount {
			leader := &metapb.Peer{StoreId: 1, Id: uint64(idx + 1)}
			low := []byte(fmt.Sprintf("%20d", idx*10))
			high := []byte(fmt.Sprintf("%20d", (idx+1)*10))
			infos.CheckAndPutRegion(NewRegionInfo(&metapb.Region{
				Id:       uint64(idx + 1),
				Peers:    []*metapb.Peer{leader},
				StartKey: low,
				EndKey:   high,
			}, leader))
			allIDs = append(allIDs, uint64(idx+1))
			allStartKeys = append(allStartKeys, low)
			allEndKeys = append(allEndKeys, high)
		}

		for _, queryCount := range queryCounts {
			if queryCount > totalCount {
				continue
			}

			// Sample queryCount entries spread evenly across the key space.
			sampledIDs := make([]uint64, queryCount)
			sampledKeys := make([][]byte, queryCount)
			sampledPrevKeys := make([][]byte, queryCount)
			stride := totalCount / queryCount
			for j := range queryCount {
				pos := j * stride
				sampledIDs[j] = allIDs[pos]
				sampledKeys[j] = allStartKeys[pos]
				sampledPrevKeys[j] = allEndKeys[pos]
			}

			b.Run(fmt.Sprintf("QueryByKeys_regions=%d_queries=%d", totalCount, queryCount), func(b *testing.B) {
				b.ResetTimer()
				for range b.N {
					infos.QueryRegions(sampledKeys, nil, nil, false)
				}
			})

			b.Run(fmt.Sprintf("QueryByPrevKeys_regions=%d_queries=%d", totalCount, queryCount), func(b *testing.B) {
				b.ResetTimer()
				for range b.N {
					infos.QueryRegions(nil, sampledPrevKeys, nil, false)
				}
			})

			b.Run(fmt.Sprintf("QueryByIDs_regions=%d_queries=%d", totalCount, queryCount), func(b *testing.B) {
				b.ResetTimer()
				for range b.N {
					infos.QueryRegions(nil, nil, sampledIDs, false)
				}
			})

			b.Run(fmt.Sprintf("QueryMixed_regions=%d_queries=%d", totalCount, queryCount), func(b *testing.B) {
				// Use the first half of each sample set so all three
				// query kinds are exercised in a single request.
				half := queryCount / 2
				mixedKeys := sampledKeys[:half]
				mixedPrevKeys := sampledPrevKeys[:half]
				mixedIDs := sampledIDs[:half]
				b.ResetTimer()
				for range b.N {
					infos.QueryRegions(mixedKeys, mixedPrevKeys, mixedIDs, false)
				}
			})
		}
	}
}
17 changes: 17 additions & 0 deletions pkg/slice/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,20 @@ func EqualWithoutOrder[T comparable](a, b []T) bool {
}
return true
}

// SplitIntoBatches splits the slice of items of type T into batches with `batchSize` items per batch.
func SplitIntoBatches[T any](slice []T, batchSize int) [][]T {
if batchSize <= 0 {
return [][]T{slice}
}
sliceLen := len(slice)
batches := make([][]T, 0, (sliceLen+batchSize-1)/batchSize)
for i := 0; i < sliceLen; i += batchSize {
end := i + batchSize
if end > sliceLen {
end = sliceLen
}
batches = append(batches, slice[i:end])
}
return batches
}
12 changes: 12 additions & 0 deletions pkg/slice/slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,15 @@ func TestSliceEqualWithoutOrder(t *testing.T) {
re.False(slice.EqualWithoutOrder([]string{"a", "b"}, []string{"b"}))
re.False(slice.EqualWithoutOrder([]string{"a"}, []string{"b", "c"}))
}

// TestSliceSplitIntoBatches verifies SplitIntoBatches across batch sizes,
// including the degenerate zero and negative sizes, which yield one batch.
func TestSliceSplitIntoBatches(t *testing.T) {
	re := require.New(t)
	input := []int{1, 2, 3, 4, 5}
	testCases := []struct {
		batchSize int
		expected  [][]int
	}{
		{6, [][]int{{1, 2, 3, 4, 5}}},
		{5, [][]int{{1, 2, 3, 4, 5}}},
		{4, [][]int{{1, 2, 3, 4}, {5}}},
		{3, [][]int{{1, 2, 3}, {4, 5}}},
		{2, [][]int{{1, 2}, {3, 4}, {5}}},
		{1, [][]int{{1}, {2}, {3}, {4}, {5}}},
		{0, [][]int{{1, 2, 3, 4, 5}}},
		{-1, [][]int{{1, 2, 3, 4, 5}}},
	}
	for _, tc := range testCases {
		re.Equal(tc.expected, slice.SplitIntoBatches(input, tc.batchSize))
	}
}
Loading