Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0c0a5e8
partial commit
MyonKeminta Jul 10, 2025
afb4071
Replay changes to tests
MyonKeminta Jul 10, 2025
344589c
recover updates to the tests for adapted methods
MyonKeminta May 14, 2025
579bbb5
Code clean up and redirect implementations (partial)
MyonKeminta Jul 11, 2025
fc17b04
clean up old code
MyonKeminta Jul 14, 2025
21a4ad4
Provide compatibility for V2 APIs and add tests
MyonKeminta Jul 16, 2025
ee09992
Merge branch 'master' of https://github.com/tikv/pd into m/deprecate-…
MyonKeminta Jul 16, 2025
f006b80
Fix lint
MyonKeminta Jul 16, 2025
767c1fb
fix etcd key test
MyonKeminta Jul 16, 2025
2ab0a0d
fix etcd key test
MyonKeminta Jul 17, 2025
5bfbc69
fix etcd key test
MyonKeminta Jul 17, 2025
94dfafd
fix etcd key test
MyonKeminta Jul 17, 2025
704a400
Fix TestSafepoint of pd-ctl
MyonKeminta Jul 18, 2025
aca65d6
fix etcd key test
MyonKeminta Jul 18, 2025
6633d3a
Add warning logs for deprecated APIs
MyonKeminta Jul 18, 2025
9d93d75
Add warn log for GetAllGCSafePointV2
MyonKeminta Jul 18, 2025
2fdd875
Update commnets
MyonKeminta Jul 18, 2025
816cc3d
update cleint integration tests
MyonKeminta Jul 21, 2025
398a77c
Merge branch 'master' of https://github.com/tikv/pd into m/deprecate-…
MyonKeminta Jul 21, 2025
b00458e
Fix lint
MyonKeminta Jul 21, 2025
704350b
Address comments
MyonKeminta Jul 23, 2025
ba2936e
Merge branch 'master' of https://github.com/tikv/pd into m/deprecate-…
MyonKeminta Jul 24, 2025
aea4e65
Merge branch 'master' of https://github.com/tikv/pd into m/deprecate-…
MyonKeminta Jul 24, 2025
1e9e18d
fix check
MyonKeminta Jul 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,18 @@ type RPCClient interface {
// UpdateGCSafePoint TiKV will check it and do GC themselves if necessary.
// If the given safePoint is less than the current one, it will not be updated.
// Returns the new safePoint after updating.
//
// Deprecated: This API is deprecated and replaced by AdvanceGCSafePoint, which expected only for use of the
// GCWorker of TiDB or any component that is responsible for managing and driving GC. For callers that want to
// read the current GC safe point, consider using GetGCStates instead.
UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error)
// UpdateServiceGCSafePoint updates the safepoint for specific service and
// returns the minimum safepoint across all services, this value is used to
// determine the safepoint for multiple services, it does not trigger a GC
// job. Use UpdateGCSafePoint to trigger the GC job if needed.
//
// Deprecated: This API is deprecated and replaced by SetGCBarrier and DeleteGCBarrier.
UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error)
// Deprecated: Avoid using this API.
WatchGCSafePointV2(ctx context.Context, revision int64) (chan []*pdpb.SafePointEvent, error)
// ScatterRegion scatters the specified region. Should use it for a batch of regions,
// and the distribution of these regions will be dispersed.
// NOTICE: This method is the old version of ScatterRegions, you should use the later one as your first choice.
Expand Down Expand Up @@ -1033,6 +1037,10 @@ func (c *client) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) (
}

// UpdateGCSafePoint implements the RPCClient interface.
//
// Deprecated: This API is deprecated and replaced by AdvanceGCSafePoint, which expected only for use of the
// GCWorker of TiDB or any component that is responsible for managing and driving GC. For callers that want to
// read the current GC safe point, consider using GetGCStates instead.
func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
if c.inner.keyspaceID != constants.NullKeyspaceID {
return c.updateGCSafePointV2(ctx, c.inner.keyspaceID, safePoint)
Expand Down Expand Up @@ -1067,6 +1075,8 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
// returns the minimum safepoint across all services, this value is used to
// determine the safepoint for multiple services, it does not trigger a GC
// job. Use UpdateGCSafePoint to trigger the GC job if needed.
//
// Deprecated: This API is deprecated and replaced by SetGCBarrier and DeleteGCBarrier.
func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if c.inner.keyspaceID != constants.NullKeyspaceID {
return c.updateServiceSafePointV2(ctx, c.inner.keyspaceID, serviceID, ttl, safePoint)
Expand Down
81 changes: 71 additions & 10 deletions client/clients/gc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,41 @@ type Client interface {
//
//nolint:revive
type GCStatesClient interface {
// SetGCBarrier sets (creates or updates) a GC barrier.
// SetGCBarrier sets a GC barrier, which blocks GC from being advanced over the given barrierTS for at most a duration
// specified by ttl. This method either adds a new GC barrier or updates an existing one. Returns the information of the
// new GC barrier.
//
// A GC barrier is uniquely identified by the given barrierID in the keyspace scope for NullKeyspace or keyspaces
// with keyspace-level GC enabled. When this method is called on keyspaces without keyspace-level GC enabled, it will
// be equivalent to calling it on the NullKeyspace.
//
// Once a GC barrier is set, it will block the txn safe point from being advanced over the barrierTS, until the GC
// barrier is expired (defined by ttl) or manually deleted (by calling DeleteGCBarrier).
//
// When this method is called on an existing GC barrier, it updates the barrierTS and ttl of the existing GC barrier and
// the expiration time will become the current time plus the ttl. This means that calling this method on an existing
// GC barrier can extend its lifetime arbitrarily.
//
// Passing non-positive value to ttl is not allowed. Passing `time.Duration(math.MaxInt64)` to ttl indicates that the
// GC barrier should never expire. The ttl might be rounded up, and the actual ttl is guaranteed no less than the
// specified duration.
//
// The barrierID must be non-empty. "gc_worker" is a reserved name and cannot be used as a barrierID.
//
// The given barrierTS must be greater than or equal to the current txn safe point, or an error will be returned.
//
// When this function executes successfully, its result is never nil.
SetGCBarrier(ctx context.Context, barrierID string, barrierTS uint64, ttl time.Duration) (*GCBarrierInfo, error)
// DeleteGCBarrier deletes a GC barrier.
// DeleteGCBarrier deletes a GC barrier by the given barrierID. Returns the information of the deleted GC barrier, or
// nil if the barrier does not exist.
//
// When this method is called on a keyspace without keyspace-level GC enabled, it will be equivalent to calling it on
// the NullKeyspace.
DeleteGCBarrier(ctx context.Context, barrierID string) (*GCBarrierInfo, error)
// GetGCState gets the current GC state.
// GetGCState returns the GC state of the given keyspace.
//
// When this method is called on a keyspace without keyspace-level GC enabled, it will be equivalent to calling it on
// the NullKeyspace.
GetGCState(ctx context.Context) (GCState, error)
}

Expand All @@ -51,24 +81,51 @@ type GCStatesClient interface {
// WARNING: This is only for internal use. The only possible place to use this is the `GCWorker` in TiDB, or
// other possible components that are responsible for being the center of controlling GC of the cluster.
type InternalController interface {
// AdvanceTxnSafePoint tries to advance the transaction safe point to the target value.
// AdvanceTxnSafePoint tries to advance the txn safe point to the given target.
//
// Returns a struct AdvanceTxnSafePointResult, which contains the old txn safe point, the target, and the new
// txn safe point it finally made it to advance to. If there's something blocking the txn safe point from being
// advanced to the given target, it may finally be advanced to a smaller value or remains the previous value, in which
// case the BlockerDescription field of the AdvanceTxnSafePointResult will be set to a non-empty string describing
// the reason.
//
// Txn safe point of a single keyspace should never decrease. If the given target is smaller than the previous value,
// it returns an error.
//
// WARNING: This method is only used to manage the GC procedure, and should never be called by code that doesn't
// have the responsibility to manage GC. It can only be called on NullKeyspace or keyspaces with keyspace level GC
// enabled.
AdvanceTxnSafePoint(ctx context.Context, target uint64) (AdvanceTxnSafePointResult, error)
// AdvanceGCSafePoint tries to advance the GC safe point to the target value.
// AdvanceGCSafePoint tries to advance the GC safe point to the given target. If the target is less than the current
// value or greater than the txn safe point, it returns an error.
//
// WARNING: This method is only used to manage the GC procedure, and should never be called by code that doesn't
// have the responsibility to manage GC. It can only be called on NullKeyspace or keyspaces with keyspace level GC
// enabled.
AdvanceGCSafePoint(ctx context.Context, target uint64) (AdvanceGCSafePointResult, error)
}

// AdvanceTxnSafePointResult represents the result of advancing transaction safe point.
type AdvanceTxnSafePointResult struct {
OldTxnSafePoint uint64
Target uint64
NewTxnSafePoint uint64
// The old txn safe point before the advancement operation.
OldTxnSafePoint uint64
// The target to which the current advancement operation tried to advance the txn safe point. It contains the
Target uint64
// same value as the `target` argument passed to the AdvanceTxnSafePoint method.
NewTxnSafePoint uint64
// When the txn safe point is blocked and is unable to be advanced to exactly the target, this field will contains
// a non-empty string describing the reason why it is blocked.
BlockerDescription string
}

// AdvanceGCSafePointResult represents the result of advancing GC safe point.
type AdvanceGCSafePointResult struct {
// The old GC safe point before the advancement operation.
OldGCSafePoint uint64
Target uint64
// The target to which the current advancement operation tried to advance the GC safe point. It contains the
// same value as the `target` argument passed to the AdvanceGCSafePoint method.
Target uint64
// The new GC safe point after the advancement operation.
NewGCSafePoint uint64
}

Expand Down Expand Up @@ -97,7 +154,10 @@ func NewGCBarrierInfo(barrierID string, barrierTS uint64, ttl time.Duration, get
}
}

// IsExpired checks whether the barrier is expired.
// IsExpired checks whether the barrier is expired by the local time. The check is done by checking the local time.
// Note that the result is unreliable in case there is significant time drift between the client and the PD server.
// As the TTL is round down when returning from the server, this method may give an expired result slightly earlier
// than it actually expires in PD server.
func (b *GCBarrierInfo) IsExpired() bool {
return b.isExpiredImpl(time.Now())
}
Expand All @@ -115,6 +175,7 @@ func (b *GCBarrierInfo) isExpiredImpl(now time.Time) bool {
//
//nolint:revive
type GCState struct {
// The ID of the keyspace this GC state belongs to.
KeyspaceID uint32
TxnSafePoint uint64
GCSafePoint uint64
Expand Down
54 changes: 6 additions & 48 deletions client/gc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ import (
"time"

"github.com/opentracing/opentracing-go"
"go.uber.org/zap"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"

"github.com/tikv/pd/client/clients/gc"
"github.com/tikv/pd/client/constants"
Expand All @@ -33,6 +31,7 @@ import (
)

// updateGCSafePointV2 update gc safe point for the given keyspace.
// Only used for handling `UpdateGCSafePoint` in keyspace context, which is a deprecated usage.
func (c *client) updateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateGCSafePointV2", opentracing.ChildOf(span.Context()))
Expand All @@ -42,6 +41,7 @@ func (c *client) updateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf
defer func() { metrics.CmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
//nolint:staticcheck
req := &pdpb.UpdateGCSafePointV2Request{
Header: c.requestHeader(),
KeyspaceId: keyspaceID,
Expand All @@ -52,7 +52,7 @@ func (c *client) updateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf
cancel()
return 0, errs.ErrClientGetProtoClient
}
resp, err := protoClient.UpdateGCSafePointV2(ctx, req)
resp, err := protoClient.UpdateGCSafePointV2(ctx, req) //nolint:staticcheck
cancel()

if err = c.respForErr(metrics.CmdFailedDurationUpdateGCSafePointV2, start, err, resp.GetHeader()); err != nil {
Expand All @@ -62,6 +62,7 @@ func (c *client) updateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf
}

// updateServiceSafePointV2 update service safe point for the given keyspace.
// Only used for handling `UpdateServiceGCSafePoint` in keyspace context, which is a deprecated usage.
func (c *client) updateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.UpdateServiceSafePointV2", opentracing.ChildOf(span.Context()))
Expand All @@ -71,6 +72,7 @@ func (c *client) updateServiceSafePointV2(ctx context.Context, keyspaceID uint32
defer func() { metrics.CmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
//nolint:staticcheck
req := &pdpb.UpdateServiceSafePointV2Request{
Header: c.requestHeader(),
KeyspaceId: keyspaceID,
Expand All @@ -83,58 +85,14 @@ func (c *client) updateServiceSafePointV2(ctx context.Context, keyspaceID uint32
cancel()
return 0, errs.ErrClientGetProtoClient
}
resp, err := protoClient.UpdateServiceSafePointV2(ctx, req)
resp, err := protoClient.UpdateServiceSafePointV2(ctx, req) //nolint:staticcheck
cancel()
if err = c.respForErr(metrics.CmdFailedDurationUpdateServiceSafePointV2, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetMinSafePoint(), nil
}

// WatchGCSafePointV2 watch gc safe point change.
func (c *client) WatchGCSafePointV2(ctx context.Context, revision int64) (chan []*pdpb.SafePointEvent, error) {
SafePointEventsChan := make(chan []*pdpb.SafePointEvent)
req := &pdpb.WatchGCSafePointV2Request{
Header: c.requestHeader(),
Revision: revision,
}

ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
stream, err := protoClient.WatchGCSafePointV2(ctx, req)
if err != nil {
close(SafePointEventsChan)
return nil, err
}
go func() {
defer func() {
close(SafePointEventsChan)
if r := recover(); r != nil {
log.Error("[pd] panic in gc client `WatchGCSafePointV2`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
return
default:
resp, err := stream.Recv()
if err != nil {
log.Error("watch gc safe point v2 error", errs.ZapError(errs.ErrClientWatchGCSafePointV2Stream, err))
return
}
SafePointEventsChan <- resp.GetEvents()
}
}
}()
return SafePointEventsChan, err
}

// gcInternalController is a stateless wrapper over the client and implements gc.InternalController interface.
type gcInternalController struct {
client *client
Expand Down
36 changes: 17 additions & 19 deletions pkg/gc/gc_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func (m *GCStateManager) redirectKeyspace(keyspaceID uint32, isUserAPI bool) (ui
}

// CompatibleLoadGCSafePoint loads current GC safe point from storage for the legacy GC API `GetGCSafePoint`.
func (m *GCStateManager) CompatibleLoadGCSafePoint() (uint64, error) {
keyspaceID, err := m.redirectKeyspace(constant.NullKeyspaceID, false)
func (m *GCStateManager) CompatibleLoadGCSafePoint(keyspaceID uint32) (uint64, error) {
keyspaceID, err := m.redirectKeyspace(keyspaceID, false)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -180,11 +180,11 @@ func (m *GCStateManager) AdvanceGCSafePoint(keyspaceID uint32, target uint64) (o
// current value, it returns the current value without updating it.
// This is provided for compatibility purpose, making the existing uses of the deprecated API `UpdateGCSafePoint`
// still work.
func (m *GCStateManager) CompatibleUpdateGCSafePoint(target uint64) (oldGCSafePoint uint64, newGCSafePoint uint64, err error) {
func (m *GCStateManager) CompatibleUpdateGCSafePoint(keyspaceID uint32, target uint64) (oldGCSafePoint uint64, newGCSafePoint uint64, err error) {
m.mu.Lock()
defer m.mu.Unlock()

return m.advanceGCSafePointImpl(constant.NullKeyspaceID, target, true)
return m.advanceGCSafePointImpl(keyspaceID, target, true)
}

func (m *GCStateManager) advanceGCSafePointImpl(keyspaceID uint32, target uint64, compatible bool) (oldGCSafePoint uint64, newGCSafePoint uint64, err error) {
Expand Down Expand Up @@ -305,7 +305,7 @@ func (m *GCStateManager) advanceTxnSafePointImpl(keyspaceID uint32, target uint6
}

for _, barrier := range barriers {
if keyspaceID == constant.NullKeyspaceID && barrier.BarrierID == keypath.GCWorkerServiceSafePointID {
if barrier.BarrierID == keypath.GCWorkerServiceSafePointID {
downgradeCompatibleMode = true
continue
}
Expand Down Expand Up @@ -436,7 +436,7 @@ func (*GCStateManager) logAdvancingTxnSafePoint(keyspaceID uint32, result Advanc
// GC barrier should never expire. The ttl might be rounded up, and the actual ttl is guaranteed no less than the
// specified duration.
//
// The barrierID must be non-empty. For NullKeyspace, "gc_worker" is a reserved name and cannot be used as a barrierID.
// The barrierID must be non-empty. "gc_worker" is a reserved name and cannot be used as a barrierID.
//
// The given barrierTS must be greater than or equal to the current txn safe point, or an error will be returned.
//
Expand All @@ -459,7 +459,7 @@ func (m *GCStateManager) SetGCBarrier(keyspaceID uint32, barrierID string, barri

func (m *GCStateManager) setGCBarrierImpl(keyspaceID uint32, barrierID string, barrierTS uint64, ttl time.Duration, now time.Time) (*endpoint.GCBarrier, error) {
// The barrier ID (or service ID of the service safe points) is reserved for keeping backward compatibility.
if keyspaceID == constant.NullKeyspaceID && barrierID == keypath.GCWorkerServiceSafePointID {
if barrierID == keypath.GCWorkerServiceSafePointID {
return nil, errs.ErrReservedGCBarrierID.GenWithStackByArgs(barrierID)
}
// Disallow empty barrierID
Expand Down Expand Up @@ -519,7 +519,7 @@ func (m *GCStateManager) DeleteGCBarrier(keyspaceID uint32, barrierID string) (*

func (m *GCStateManager) deleteGCBarrierImpl(keyspaceID uint32, barrierID string) (*endpoint.GCBarrier, error) {
// The barrier ID (or service ID of the service safe points) is reserved for keeping backward compatibility.
if keyspaceID == constant.NullKeyspaceID && barrierID == keypath.GCWorkerServiceSafePointID {
if barrierID == keypath.GCWorkerServiceSafePointID {
return nil, errs.ErrReservedGCBarrierID.GenWithStackByArgs(barrierID)
}
// Disallow empty barrierID
Expand Down Expand Up @@ -588,13 +588,11 @@ func (m *GCStateManager) getGCStateInTransaction(keyspaceID uint32, _ *endpoint.
return GCState{}, err
}

// For NullKeyspace, remove GC barrier whose barrierID is "gc_worker", which is only exists for providing
// compatibility with the old versions.
if keyspaceID == constant.NullKeyspaceID {
result.GCBarriers = slices.DeleteFunc(result.GCBarriers, func(b *endpoint.GCBarrier) bool {
return b.BarrierID == keypath.GCWorkerServiceSafePointID
})
}
// Remove GC barrier whose barrierID is "gc_worker", which is only exists for providing compatibility with the old
// versions.
result.GCBarriers = slices.DeleteFunc(result.GCBarriers, func(b *endpoint.GCBarrier) bool {
return b.BarrierID == keypath.GCWorkerServiceSafePointID
})

return result, nil
}
Expand Down Expand Up @@ -700,14 +698,14 @@ func (m *GCStateManager) GetAllKeyspacesGCStates() (map[uint32]GCState, error) {
// whether the `ttl` is positive or not. As the txn safe point is always less or equal to any GC barriers, we
// simulate the case that the service safe point of "gc_worker" is the minimal one, and return a service safe point
// with the service ID equals to "gc_worker".
//
// This function only works on the NullKeyspace.
func (m *GCStateManager) CompatibleUpdateServiceGCSafePoint(serviceID string, newServiceSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
keyspaceID := constant.NullKeyspaceID
func (m *GCStateManager) CompatibleUpdateServiceGCSafePoint(keyspaceID uint32, serviceID string, newServiceSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
keyspaceID, err = m.redirectKeyspace(keyspaceID, true)

m.mu.Lock()
defer m.mu.Unlock()

// TODO: After implementing the global GC barrier, redirect the invocation on "native_br" to `SetGlobalGCBarrier`.
// Note that the behavior is only for the null keyspace.
if serviceID == keypath.GCWorkerServiceSafePointID {
if ttl != math.MaxInt64 {
return nil, false, errors.New("TTL of gc_worker's service safe point must be infinity")
Expand Down
Loading
Loading