Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/tso/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (a *Allocator) IsInitialize() bool {
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
func (a *Allocator) UpdateTSO() (err error) {
for i := range maxUpdateTSORetryCount {
err = a.timestampOracle.updateTimestamp()
err = a.timestampOracle.updateTimestamp(a.timestampOracle.tsoMux.physical, true)
if err == nil {
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/tso/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type tsoMetrics struct {
notLeaderAnymoreEvent prometheus.Counter
logicalOverflowEvent prometheus.Counter
exceededMaxRetryEvent prometheus.Counter
notAllowedSaveTimestampEvent prometheus.Counter
// timestampOracle operation duration
syncSaveDuration prometheus.Observer
resetSaveDuration prometheus.Observer
Expand Down Expand Up @@ -151,6 +152,7 @@ func newTSOMetrics(groupID string) *tsoMetrics {
notLeaderAnymoreEvent: tsoCounter.WithLabelValues("not_leader_anymore", groupID),
logicalOverflowEvent: tsoCounter.WithLabelValues("logical_overflow", groupID),
exceededMaxRetryEvent: tsoCounter.WithLabelValues("exceeded_max_retry", groupID),
notAllowedSaveTimestampEvent: tsoCounter.WithLabelValues("not_allowed_save_timestamp", groupID),
syncSaveDuration: tsoOpDuration.WithLabelValues("sync_save", groupID),
resetSaveDuration: tsoOpDuration.WithLabelValues("reset_save", groupID),
updateSaveDuration: tsoOpDuration.WithLabelValues("update_save", groupID),
Expand Down
37 changes: 27 additions & 10 deletions pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
// and trigger unnecessary warnings about clock offset.
// It's an empirical value.
jetLagWarningThreshold = 150 * time.Millisecond
waitUpdateTSOInterval = 50 * time.Millisecond
)

// tsoObject is used to store the current TSO in memory with a RWMutex lock.
Expand Down Expand Up @@ -90,13 +91,18 @@ func (t *timestampOracle) saveTimestamp(ts time.Time) error {
return t.storage.SaveTimestamp(ctx, t.keyspaceGroupID, ts, t.member.GetLeadership())
}

func (t *timestampOracle) setTSOPhysical(next time.Time, force bool) {
// setTSOPhysical sets the TSO's physical part with the given time.
func (t *timestampOracle) setTSOPhysical(pre time.Time, next time.Time, force bool) {
t.tsoMux.Lock()
defer t.tsoMux.Unlock()
// Do not update the zero physical time if the `force` flag is false.
if t.tsoMux.physical.Equal(typeutil.ZeroTime) && !force {
return
}
// If current physical time is bigger than the previous time, it indicates that the physical time has been updated.
if t.tsoMux.physical.After(pre) {
return
}
// make sure the ts won't fall back
if typeutil.SubTSOPhysicalByWallClock(next, t.tsoMux.physical) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between the two comparisons?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first comparison ensures that t.tsoMux.physical has not changed since the TSO was obtained.
The second comparison ensures that the physical time cannot roll back.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused why they use different functions for comparison. Is it worth unifying the code style?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

t.tsoMux.physical = next
Expand All @@ -114,17 +120,18 @@ func (t *timestampOracle) getTSO() (time.Time, int64) {
}

// generateTSO will add the TSO's logical part with the given count and returns the new TSO result.
func (t *timestampOracle) generateTSO(ctx context.Context, count int64) (physical int64, logical int64) {
func (t *timestampOracle) generateTSO(ctx context.Context, count int64) (physical int64, logical int64, current time.Time) {
defer trace.StartRegion(ctx, "timestampOracle.generateTSO").End()
t.tsoMux.Lock()
defer t.tsoMux.Unlock()
if t.tsoMux.physical.Equal(typeutil.ZeroTime) {
return 0, 0
return 0, 0, typeutil.ZeroTime
}
physical = t.tsoMux.physical.UnixNano() / int64(time.Millisecond)
t.tsoMux.logical += count
logical = t.tsoMux.logical
return physical, logical
current = t.tsoMux.physical
return physical, logical, current
}

func (t *timestampOracle) getLastSavedTime() time.Time {
Expand Down Expand Up @@ -201,7 +208,7 @@ func (t *timestampOracle) syncTimestamp() error {
zap.Time("last", last), zap.Time("last-saved", lastSavedTime),
zap.Time("save", save), zap.Time("next", next))
// save into memory
t.setTSOPhysical(next, true)
t.setTSOPhysical(t.tsoMux.physical, next, true)
return nil
}

Expand Down Expand Up @@ -274,6 +281,8 @@ func (t *timestampOracle) resetUserTimestamp(tso uint64, ignoreSmaller, skipUppe
}

// updateTimestamp is used to update the timestamp.
// pre is the physical time observed by the caller before the update.
// allowSaveStorage indicates whether saving to storage is allowed; only the allocator updater passes true.
// This function will do two things:
// 1. When the logical time is going to be used up, increase the current physical time.
// 2. When the time window is not big enough, which means the saved etcd time minus the next physical time
Expand All @@ -287,7 +296,7 @@ func (t *timestampOracle) resetUserTimestamp(tso uint64, ignoreSmaller, skipUppe
//
// NOTICE: this function should be called after the TSO in memory has been initialized
// and should not be called when the TSO in memory has been reset anymore.
func (t *timestampOracle) updateTimestamp() error {
func (t *timestampOracle) updateTimestamp(pre time.Time, allowSaveStorage bool) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can change pre to a boolean value. When needed, we can get the current TSO once at the beginning inside the function; otherwise, we would have to manually get and pass it in before every call, which seems a bit redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we read the physical time inside this function rather than in generateTSO, it may already have been updated by then, which would trigger another update.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as updateTimestamp is not wholly protected by the lock, there is no difference between passing in pre and getting it at the very beginning of the function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. It now only updates when a logical overflow is encountered.

if !t.isInitialized() {
return errs.ErrUpdateTimestamp.FastGenByArgs("timestamp in memory has not been initialized")
}
Expand Down Expand Up @@ -340,6 +349,11 @@ func (t *timestampOracle) updateTimestamp() error {
// It is not safe to increase the physical time to `next`.
// The time window needs to be updated and saved to etcd.
if typeutil.SubRealTimeByWallClock(t.getLastSavedTime(), next) <= updateTimestampGuard {
// The time window must be saved to etcd here, which is only permitted when allowSaveStorage is true.
if !allowSaveStorage {
t.metrics.notAllowedSaveTimestampEvent.Inc()
return errs.ErrUpdateTimestamp.FastGenByArgs("update timestamp needs to save storage but it is not allowed")
}
save := next.Add(t.saveInterval)
start := time.Now()
if err := t.saveTimestamp(save); err != nil {
Expand All @@ -353,8 +367,7 @@ func (t *timestampOracle) updateTimestamp() error {
t.metrics.updateSaveDuration.Observe(time.Since(start).Seconds())
}
// save into memory
t.setTSOPhysical(next, false)

t.setTSOPhysical(pre, next, false)
return nil
}

Expand All @@ -363,6 +376,7 @@ var maxRetryCount = 10
func (t *timestampOracle) getTS(ctx context.Context, count uint32) (pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "timestampOracle.getTS").End()
var resp pdpb.Timestamp
var physical time.Time
if count == 0 {
return resp, errs.ErrGenerateTimestamp.FastGenByArgs("tso count should be positive")
}
Expand All @@ -378,7 +392,7 @@ func (t *timestampOracle) getTS(ctx context.Context, count uint32) (pdpb.Timesta
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized")
}
// Get a new TSO result with the given count
resp.Physical, resp.Logical = t.generateTSO(ctx, int64(count))
resp.Physical, resp.Logical, physical = t.generateTSO(ctx, int64(count))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generateTSO returns physical, logical, current, and here they become resp.Physical, resp.Logical, physical. Please standardize the namings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good, add comment in the function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe

Suggested change
resp.Physical, resp.Logical, physical = t.generateTSO(ctx, int64(count))
resp.Physical, resp.Logical, current = t.generateTSO(ctx, int64(count))

if resp.GetPhysical() == 0 {
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory has been reset")
}
Expand All @@ -388,7 +402,10 @@ func (t *timestampOracle) getTS(ctx context.Context, count uint32) (pdpb.Timesta
zap.Reflect("response", resp),
zap.Int("retry-count", i), errs.ZapError(errs.ErrLogicOverflow))
t.metrics.logicalOverflowEvent.Inc()
time.Sleep(t.updatePhysicalInterval)
if err := t.updateTimestamp(physical, false); err != nil && !errs.ErrUpdateTimestamp.Equal(err) {
log.Warn("update timestamp failed", logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0), zap.Error(err))
}
Comment on lines 445 to 450
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

log.Info should be log.Warn for a failed timestamp update.

This logs an error from updateTimestamp at Info level, but it represents an operational failure that warrants operator attention. Other failure paths in this file (e.g., line 390) consistently use log.Warn.

Proposed fix
 			if overflowed, err := t.updateTimestamp(overflowUpdate); err != nil {
-				log.Info("update timestamp failed", logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0), zap.Error(err))
+				log.Warn("update timestamp failed", logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0), zap.Error(err))
 				time.Sleep(t.updatePhysicalInterval)
🤖 Prompt for AI Agents
In `@pkg/tso/tso.go` around lines 441 - 446, The log call inside the error branch
of the updateTimestamp handling should be elevated from log.Info to log.Warn: in
the block that checks the result of t.updateTimestamp(overflowUpdate), replace
log.Info(...) with log.Warn(...), keeping the same structured fields (use
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID >
0) and zap.Error(err)) and preserve the existing sleep behavior using
t.updatePhysicalInterval; this aligns the error path for
updateTimestamp/overflowUpdate with other warnings in this file.

time.Sleep(waitUpdateTSOInterval)
continue
}
// In case lease expired after the first check.
Expand Down
125 changes: 125 additions & 0 deletions pkg/tso/tso_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2026 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/tikv/pd/pkg/election"
)

// MokElection is a stateless test double that the tests in this file plug
// into the `member` field of timestampOracle. Every method returns a fixed
// value: IsServing always reports true, and everything else yields the zero
// value of its return type or does nothing.
type MokElection struct{}

// ID always returns 0.
func (*MokElection) ID() uint64 {
	return 0
}

// Name always returns the empty string.
func (*MokElection) Name() string {
	return ""
}

// MemberValue always returns the empty string.
func (*MokElection) MemberValue() string {
	return ""
}

// Client always returns nil; no etcd client is attached to this stub.
func (*MokElection) Client() *clientv3.Client {
	return nil
}

// IsServing always reports true.
func (*MokElection) IsServing() bool {
	return true
}

// PromoteSelf is a no-op.
func (*MokElection) PromoteSelf() {
}

// Campaign ignores both arguments and always returns nil.
func (*MokElection) Campaign(_ context.Context, _ int64) error { return nil }

// Resign is a no-op.
func (*MokElection) Resign() {
}

// GetServingUrls always returns nil.
func (*MokElection) GetServingUrls() []string {
	return nil
}

// GetElectionPath always returns the empty string.
func (*MokElection) GetElectionPath() string {
	return ""
}

// GetLeadership always returns nil.
func (*MokElection) GetLeadership() *election.Leadership {
	return nil
}

// TestGenerateTSO drives getTS on an oracle whose logical counter starts at
// maxLogical - 1 and checks how the in-memory physical time reacts:
//   - while no last-saved time has been stored, getTS must fail and leave the
//     physical time untouched;
//   - once the last-saved time is moved 5s past the starting physical time,
//     getTS must succeed and the physical time must have changed.
func TestGenerateTSO(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	start := time.Now()
	state := &tsoObject{
		physical: start,
		logical:  maxLogical - 1,
	}
	oracle := &timestampOracle{
		tsoMux:                 state,
		saveInterval:           50 * time.Millisecond,
		updatePhysicalInterval: 5 * time.Second,
		maxResetTSGap:          func() time.Duration { return time.Hour },
		metrics:                newTSOMetrics("test"),
		member:                 &MokElection{},
	}

	// No last-saved time has been stored yet, so advancing the physical time
	// would first require a save to storage. That save is not permitted on
	// this path, so the request must fail and the physical time must stay put.
	_, err := oracle.getTS(ctx, 2)
	re.Error(err)
	re.Equal(start, oracle.tsoMux.physical)

	// Pretend the save to storage has completed by moving the last-saved time
	// ahead; the same request must now succeed and advance the physical time.
	oracle.lastSavedTime.Store(start.Add(5 * time.Second))
	_, err = oracle.getTS(ctx, 2)
	re.NoError(err)
	re.NotEqual(start, oracle.tsoMux.physical)
}

func TestCurrentGetTSO(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
current := time.Now()
timestampOracle := &timestampOracle{
tsoMux: &tsoObject{
physical: current,
logical: maxLogical - 1,
},
saveInterval: 50 * time.Millisecond,
updatePhysicalInterval: 5 * time.Second,
maxResetTSGap: func() time.Duration { return time.Hour },
metrics: newTSOMetrics("test"),
member: &MokElection{},
}

runDuration := 5 * time.Second
timestampOracle.lastSavedTime.Store(current.Add(runDuration))
runCtx, runCancel := context.WithTimeout(ctx, runDuration-time.Second)
defer runCancel()
wg := sync.WaitGroup{}
wg.Add(100)
changes := atomic.Int32{}
totalTso := atomic.Int32{}
for i := range 100 {
go func(i int) {
physical := timestampOracle.tsoMux.physical
defer wg.Done()
for {
select {
case <-runCtx.Done():
return
default:
_, err := timestampOracle.getTS(runCtx, 1)
totalTso.Add(1)
re.NoError(err)
if i == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually can use a struct like map[int64]struct{} to count all physical timestamps generated by all goroutines to check if it doesn't exceed the expected number of timestamp advances.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. I now collect, across all goroutines, the responses whose logical part equals 1, which indicates that a new physical time has been allocated.

if physical != timestampOracle.tsoMux.physical {
changes.Add(1)
physical = timestampOracle.tsoMux.physical
}
}
}
}
}(i)
}

wg.Wait()
re.Equal(totalTso.Load()/int32(maxLogical)+1, changes.Load())
}
Loading