Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,8 @@ func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestStats() {
}

func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestValidateReadTS() {
oracles.EnableTSValidation.Store(true)
defer oracles.EnableTSValidation.Store(false)
o, err := oracles.NewPdOracle(s.pdCli, &oracles.PDOracleOptions{
UpdateInterval: time.Second * 2,
})
Expand Down Expand Up @@ -962,8 +964,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestValidateReadTS()
testImpl(getTS, true, nil)
testImpl(func() uint64 { return addTS(getTS(), -time.Minute) }, false, nil)
testImpl(func() uint64 { return addTS(getTS(), -time.Minute) }, true, nil)
// check is skipped for normal read
testImpl(func() uint64 { return addTS(getTS(), +time.Minute) }, false, nil)
testImpl(func() uint64 { return addTS(getTS(), +time.Minute) }, false, oracle.ErrFutureTSRead{})
testImpl(func() uint64 { return addTS(getTS(), +time.Minute) }, true, oracle.ErrFutureTSRead{})
testImpl(func() uint64 { return math.MaxUint64 }, false, nil)
testImpl(func() uint64 { return math.MaxUint64 }, true, oracle.ErrLatestStaleRead{})
Expand Down
12 changes: 6 additions & 6 deletions oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const slowDist = 30 * time.Millisecond

type adaptiveUpdateTSIntervalState int

var EnableTSValidation atomic.Bool

const (
adaptiveUpdateTSIntervalStateNone adaptiveUpdateTSIntervalState = iota
// adaptiveUpdateTSIntervalStateNormal represents the state that the adaptive update ts interval is synced with the
Expand Down Expand Up @@ -667,17 +669,15 @@ func (o *pdOracle) getCurrentTSForValidation(ctx context.Context, opt *oracle.Op
type ValidateReadTSForTidbSnapshot struct{}

func (o *pdOracle) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRead bool, opt *oracle.Option) error {
if !EnableTSValidation.Load() {
return nil
}

// For a mistake we've seen
if readTS >= math.MaxInt64 && readTS < math.MaxUint64 {
return errors.Errorf("MaxInt64 <= readTS < MaxUint64, readTS=%v", readTS)
}

// only check stale reads and reads using `tidb_snapshot`
forTidbSnapshot := ctx.Value(ValidateReadTSForTidbSnapshot{}) != nil
if !forTidbSnapshot && !isStaleRead {
return nil
}

if readTS == math.MaxUint64 {
if isStaleRead {
return oracle.ErrLatestStaleRead{}
Expand Down
13 changes: 9 additions & 4 deletions oracle/oracles/pd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ func TestAdaptiveUpdateTSInterval(t *testing.T) {
}

func TestValidateReadTS(t *testing.T) {
EnableTSValidation.Store(true)
defer EnableTSValidation.Store(false)
testImpl := func(staleRead bool) {
pdClient := MockPdClient{}
o, err := NewPdOracle(&pdClient, &PDOracleOptions{
Expand Down Expand Up @@ -429,6 +431,8 @@ func (c *MockPDClientWithPause) WithCallerComponent(component caller.Component)
}

func TestValidateReadTSForStaleReadReusingGetTSResult(t *testing.T) {
EnableTSValidation.Store(true)
defer EnableTSValidation.Store(false)
util.EnableFailpoints()

pdClient := &MockPDClientWithPause{}
Expand Down Expand Up @@ -536,6 +540,8 @@ func TestValidateReadTSForStaleReadReusingGetTSResult(t *testing.T) {
}

func TestValidateReadTSForNormalReadDoNotAffectUpdateInterval(t *testing.T) {
EnableTSValidation.Store(true)
defer EnableTSValidation.Store(false)
oracleInterface, err := NewPdOracle(&MockPdClient{}, &PDOracleOptions{
UpdateInterval: time.Second * 2,
NoUpdateTS: true,
Expand Down Expand Up @@ -573,7 +579,7 @@ func TestValidateReadTSForNormalReadDoNotAffectUpdateInterval(t *testing.T) {
// It loads `ts + 3` and `ts + 4` from the mock PD, and the check cannot pass.
// Updated: 2025-03-12, the non-stale read check is temporarily skipped.
err = o.ValidateReadTS(ctx, ts+5, false, opt)
assert.NoError(t, err)
assert.Error(t, err)
mustNoNotify()

// Do the check again. It loads `ts + 5` from the mock PD, and the check passes.
Expand Down Expand Up @@ -618,9 +624,8 @@ func TestSetLastTSAlwaysPushTS(t *testing.T) {
}

func TestValidateReadTSFromDifferentSource(t *testing.T) {
// Updated: 2025-03-12, the non-stale read check is temporarily skipped.
t.Skip()

EnableTSValidation.Store(true)
defer EnableTSValidation.Store(false)
// If a ts is fetched from a different client to the same cluster, the ts might not be cached by the low resolution
// ts. In this case, the validation should not be false positive.
util.EnableFailpoints()
Expand Down
Loading