Skip to content
Merged
Show file tree
Hide file tree
Changes from 87 commits
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
c70bd8e
Add chagnes to the path definition
MyonKeminta Feb 27, 2025
84b09c4
Add gc_states.go and gc_states_test.go
MyonKeminta Feb 27, 2025
70c0ce9
fix test
MyonKeminta Feb 27, 2025
ba36b14
Add more comments
MyonKeminta Feb 27, 2025
26fc18c
fix lint
MyonKeminta Feb 28, 2025
2622255
Add more comments
MyonKeminta Feb 28, 2025
db9fa7b
Remove unnecessary commented code
MyonKeminta Feb 28, 2025
364cf6d
Add more comments
MyonKeminta Mar 3, 2025
c5653a2
Address comments
MyonKeminta Mar 3, 2025
2f0c663
Update file headers
MyonKeminta Mar 5, 2025
c1731dd
Renaming
MyonKeminta Mar 5, 2025
d9b944f
Merge path functions
MyonKeminta Mar 5, 2025
f1647bf
rename keypath/safe_point.go to gc_states.go
MyonKeminta Mar 5, 2025
b913e1e
Merge branch 'master' of https://github.com/tikv/pd into m/gc-state-s…
MyonKeminta Mar 5, 2025
11a002e
Add comments explaining keyspace-level GC and unified GC
MyonKeminta Mar 5, 2025
8e4bd77
Address comments
MyonKeminta Mar 6, 2025
0a5acfd
Address comments
MyonKeminta Mar 6, 2025
42cae00
fix lint
MyonKeminta Mar 6, 2025
632a99d
Add related files
MyonKeminta Mar 11, 2025
66d462b
update kvproto
MyonKeminta Mar 11, 2025
ed770a8
move code
MyonKeminta Mar 11, 2025
d96539b
switch implementations
MyonKeminta Mar 11, 2025
1c72c88
abandon GC API V2
MyonKeminta Mar 11, 2025
397a53f
Address comments
MyonKeminta Mar 12, 2025
6445fa4
Add test for GCBarrer.IsExpired
MyonKeminta Mar 12, 2025
106639f
Merge branch 'master' of https://github.com/tikv/pd into m/gc-state-s…
MyonKeminta Mar 12, 2025
236ed1c
Merge branch 'm/gc-state-storage' into m/gc-state-manager
MyonKeminta Mar 13, 2025
11080fd
Merge commit 'c1c681a' into m/gc-state-manager
MyonKeminta Mar 13, 2025
1530f53
fix lint
MyonKeminta Mar 13, 2025
8b9c144
address comments from #9109
MyonKeminta Mar 13, 2025
589b58f
fix lint
MyonKeminta Mar 13, 2025
75b7fda
Fix lint
MyonKeminta Mar 13, 2025
d36b277
Add test case for GetGCState & GetAllKeyspacesGCStates
MyonKeminta Mar 21, 2025
ed5afd2
fix errordoc
MyonKeminta Mar 21, 2025
f0e629d
Merge branch 'master' of https://github.com/tikv/pd into m/gc-state-m…
MyonKeminta Mar 21, 2025
ea49f7a
fix etcd key compatibility test
MyonKeminta Mar 21, 2025
8081080
Adjustments to the comments
MyonKeminta Mar 24, 2025
9595362
Add test coverage for non-positive TTL of SetGCBarrier and violation …
MyonKeminta Mar 24, 2025
84bfde9
remove some methods and test cases to reduce single PR size
MyonKeminta Mar 24, 2025
43b2068
Add back removed changes
MyonKeminta Mar 24, 2025
1981578
Fix test
MyonKeminta Mar 24, 2025
a1776b3
Address comments
MyonKeminta Mar 31, 2025
66cf8e9
Address comments
MyonKeminta Mar 31, 2025
051eb23
Remove saturatingDuration
MyonKeminta Mar 31, 2025
cc2c527
Rename keyspace GC management type configuration values
MyonKeminta Mar 31, 2025
4841da6
Add comments
MyonKeminta Apr 1, 2025
355d092
Address partial of the comments
MyonKeminta Apr 1, 2025
369726b
Address comments
MyonKeminta Apr 1, 2025
8553c0a
Move log printing into a separated function
MyonKeminta Apr 1, 2025
51e8561
fix cases in logs
MyonKeminta Apr 1, 2025
63747ef
fix lint
MyonKeminta Apr 1, 2025
885e1fc
fix lint
MyonKeminta Apr 2, 2025
fbcfcb8
Merge branch 'master' into m/gc-state-manager-partial
ti-chi-bot[bot] Apr 8, 2025
9ef2a3b
Merge branch 'm/gc-state-manager-partial' into m/gc-state-manager
MyonKeminta Apr 8, 2025
f3a4b06
fix renaming
MyonKeminta Apr 8, 2025
3e41130
Merge remote-tracking branch 'upstream/master' into m/gc-state-manager
MyonKeminta Apr 8, 2025
dcb1f84
fix etcd key test
MyonKeminta Apr 8, 2025
215680c
Merge branch 'm/gc-state-manager' into m/new-gc-grpc-api
MyonKeminta Apr 9, 2025
cc88bc8
refine logs
MyonKeminta Apr 9, 2025
9aef216
Merge branch 'm/gc-state-manager' into m/new-gc-grpc-api
MyonKeminta Apr 9, 2025
aa80488
fix lint; address comments; renaming
MyonKeminta Apr 10, 2025
e978f22
Update comments
MyonKeminta Apr 10, 2025
d256f66
Implement new gRPC APIs
MyonKeminta Apr 10, 2025
c0c0429
Adapt the http api
MyonKeminta Apr 10, 2025
3e1f42c
Address comments
MyonKeminta Apr 11, 2025
829a9cb
Address comments
MyonKeminta Apr 11, 2025
36eab11
Merge branch 'm/gc-state-manager' into m/new-gc-grpc-api
MyonKeminta Apr 11, 2025
472b597
Add back implementation of V2 API which might still be used by compon…
MyonKeminta Apr 11, 2025
65dc7f8
Fix some tests
MyonKeminta Apr 14, 2025
fea892c
fix invalidated test
MyonKeminta Apr 14, 2025
132e61f
fix lint
MyonKeminta Apr 14, 2025
d214c64
fix lint
MyonKeminta Apr 14, 2025
1adfc07
suppress unnecessary linter warnings
MyonKeminta Apr 14, 2025
165c705
Merge branch 'master' into m/gc-state-manager
ti-chi-bot[bot] Apr 15, 2025
b2e74cf
Merge branch 'master' into m/gc-state-manager
ti-chi-bot[bot] Apr 15, 2025
96a034b
Merge commit 'b2e74cf2ace0dd160295993969c4c571c313d357' into m/new-gc…
MyonKeminta Apr 16, 2025
ef7dded
Merge commit 'caf9fce58d829ee1fbd17929f3606e8d435ee602' into m/new-gc…
MyonKeminta Apr 16, 2025
861dc5e
Update comments
MyonKeminta Apr 16, 2025
554c718
remove old api deprecation related code
MyonKeminta Apr 16, 2025
529e9fc
Add back removed test
MyonKeminta Apr 16, 2025
c32e226
fix initialization order; fix missing field
MyonKeminta Apr 22, 2025
55ff5d6
Merge branch 'master' of https://github.com/tikv/pd into m/new-gc-grp…
MyonKeminta May 7, 2025
c45af95
fix lint
MyonKeminta May 7, 2025
dfa4426
Add tests
MyonKeminta May 8, 2025
90abbd8
Fix test
MyonKeminta May 9, 2025
a8cd548
Fix lint
MyonKeminta May 9, 2025
1b8a3f8
fix etcd key test
MyonKeminta May 12, 2025
b1c8f66
Merge branch 'master' of https://github.com/tikv/pd into m/new-gc-grp…
MyonKeminta May 12, 2025
9f8f1a0
update kvproto to master
MyonKeminta May 12, 2025
4b14687
Address comments
MyonKeminta May 12, 2025
c4b9aea
Address comments
MyonKeminta May 15, 2025
14f1c04
Merge branch 'master' into m/new-gc-grpc-api
ti-chi-bot[bot] May 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,5 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pingcap/kvproto => github.com/MyonKeminta/kvproto v0.0.0-20250422131702-6653a56d7ef8
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/MyonKeminta/kvproto v0.0.0-20250422131702-6653a56d7ef8 h1:lWRC7jXKjT9jlOf3isCsMHNMuvfOeQNLDl56bJ/dq+Q=
github.com/MyonKeminta/kvproto v0.0.0-20250422131702-6653a56d7ef8/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
Expand Down Expand Up @@ -49,8 +51,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20250224053625-b6a98c6bf02d h1:52qhTQG8G8V/pHo/w7F4d2Tw98KMk2C+gAe3U8SWRAg=
github.com/pingcap/kvproto v0.0.0-20250224053625-b6a98c6bf02d/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,5 @@ require (
moul.io/zapgorm2 v1.1.0 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace github.com/pingcap/kvproto => github.com/MyonKeminta/kvproto v0.0.0-20250422131702-6653a56d7ef8
1,717 changes: 1,708 additions & 9 deletions go.sum

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion pkg/gc/gc_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,14 @@ func (*GCStateManager) logAdvancingTxnSafePoint(keyspaceID uint32, result Advanc
// GC barrier can extend its lifetime arbitrarily.
//
// Passing non-positive value to ttl is not allowed. Passing `time.Duration(math.MaxInt64)` to ttl indicates that the
// GC barrier should never expire.
// GC barrier should never expire. The ttl might be rounded up, and the actual ttl is guaranteed no less than the
// specified duration.
//
// The barrierID must be non-empty. For NullKeyspace, "gc_worker" is a reserved name and cannot be used as a barrierID.
//
// The given barrierTS must be greater than or equal to the current txn safe point, or an error will be returned.
//
// When this function executes successfully, its result is never nil.
func (m *GCStateManager) SetGCBarrier(keyspaceID uint32, barrierID string, barrierTS uint64, ttl time.Duration, now time.Time) (*endpoint.GCBarrier, error) {
if ttl <= 0 {
return nil, errs.ErrInvalidArgument.GenWithStackByArgs("ttl", ttl)
Expand Down
9 changes: 6 additions & 3 deletions pkg/storage/endpoint/gc_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,11 @@ func gcBarrierFromServiceSafePoint(s *ServiceSafePoint) *GCBarrier {
return res
}

// toServiceSafePoint converts the GCBarrier to a synonymous ServiceSafePoint for storing physically.
func (b *GCBarrier) toServiceSafePoint(keyspaceID uint32) *ServiceSafePoint {
// ToServiceSafePoint converts the GCBarrier to a synonymous ServiceSafePoint for storing physically.
// This method should never be used unless handling the physical data, which needs to be compatible with service safe
// points.
// This method is public only for keeping some old HTTP API compatible.
func (b *GCBarrier) ToServiceSafePoint(keyspaceID uint32) *ServiceSafePoint {
res := &ServiceSafePoint{
ServiceID: b.BarrierID,
ExpiredAt: math.MaxInt64,
Expand Down Expand Up @@ -489,7 +492,7 @@ func (wb *GCStateWriteBatch) SetTxnSafePoint(keyspaceID uint32, txnSafePoint uin
// SetGCBarrier sets a GCBarrier with the given barrierID for a specific keyspace.
func (wb *GCStateWriteBatch) SetGCBarrier(keyspaceID uint32, newGCBarrier *GCBarrier) error {
key := keypath.GCBarrierPath(keyspaceID, newGCBarrier.BarrierID)
return wb.writeJSON(key, newGCBarrier.toServiceSafePoint(keyspaceID))
return wb.writeJSON(key, newGCBarrier.ToServiceSafePoint(keyspaceID))
}

// DeleteGCBarrier deletes the GCBarrier with the given barrierID from a specific keyspace.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/endpoint/gc_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestGCBarriersConversions(t *testing.T) {
// Test representing GC barriers by service safe points.
for i, gcBarrier := range gcBarriers {
expectedServiceSafePoint := serviceSafePoints[i]
serviceSafePoint := gcBarrier.toServiceSafePoint(keyspaces[i])
serviceSafePoint := gcBarrier.ToServiceSafePoint(keyspaces[i])
re.Equal(expectedServiceSafePoint, serviceSafePoint)
}

Expand Down
281 changes: 281 additions & 0 deletions server/gc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"encoding/json"
"fmt"
"math"
"time"

clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand All @@ -28,10 +29,13 @@
"github.com/pingcap/log"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/gc"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
)

// GetGCSafePointV2 return gc safe point for the given keyspace.
Expand Down Expand Up @@ -254,3 +258,280 @@
}
return values, resp.Header.Revision, nil
}

func getKeyspaceID(keyspaceScope *pdpb.KeyspaceScope) uint32 {
if keyspaceScope == nil {
return constant.NullKeyspaceID
}

Check warning on line 265 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L264-L265

Added lines #L264 - L265 were not covered by tests
return keyspaceScope.GetKeyspaceId()
}

func gcBarrierToProto(b *endpoint.GCBarrier, now time.Time) *pdpb.GCBarrierInfo {
if b == nil {
return nil
}

Check warning on line 272 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L271-L272

Added lines #L271 - L272 were not covered by tests

// After rounding, the actual TTL might be not exactly the same as the specified value. Recalculate it anyway.
// MaxInt64 represents that the expiration time is not specified and it never expires.
var resultTTL int64 = math.MaxInt64
if b.ExpirationTime != nil {
resultTTL = int64(math.Floor(b.ExpirationTime.Sub(now).Seconds()))
if resultTTL < 0 {
resultTTL = 0
}

Check warning on line 281 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L280-L281

Added lines #L280 - L281 were not covered by tests
}

return &pdpb.GCBarrierInfo{
BarrierId: b.BarrierID,
BarrierTs: b.BarrierTS,
TtlSeconds: resultTTL,
}
}

func gcStateToProto(gcState gc.GCState, now time.Time) *pdpb.GCState {
gcBarriers := make([]*pdpb.GCBarrierInfo, 0, len(gcState.GCBarriers))
for _, b := range gcState.GCBarriers {
gcBarriers = append(gcBarriers, gcBarrierToProto(b, now))
}
return &pdpb.GCState{
KeyspaceScope: &pdpb.KeyspaceScope{
KeyspaceId: gcState.KeyspaceID,
},
IsKeyspaceLevelGc: gcState.IsKeyspaceLevel,
TxnSafePoint: gcState.TxnSafePoint,
GcSafePoint: gcState.GCSafePoint,
GcBarriers: gcBarriers,
}
}

// AdvanceGCSafePoint tries to advance the GC safe point.
func (s *GrpcServer) AdvanceGCSafePoint(ctx context.Context, request *pdpb.AdvanceGCSafePointRequest) (*pdpb.AdvanceGCSafePointResponse, error) {
done, err := s.rateLimitCheck()
if err != nil {
return nil, err
}

Check warning on line 312 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L311-L312

Added lines #L311 - L312 were not covered by tests
if done != nil {
defer done()
}
fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) {
return pdpb.NewPDClient(client).AdvanceGCSafePoint(ctx, request)
}

Check warning on line 318 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L317-L318

Added lines #L317 - L318 were not covered by tests
if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil {
return nil, err

Check warning on line 320 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L320

Added line #L320 was not covered by tests
} else if rsp != nil {
return rsp.(*pdpb.AdvanceGCSafePointResponse), err
}

Check warning on line 323 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L322-L323

Added lines #L322 - L323 were not covered by tests

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.AdvanceGCSafePointResponse{Header: notBootstrappedHeader()}, nil
}

Check warning on line 328 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L327-L328

Added lines #L327 - L328 were not covered by tests
oldGCSafePoint, newGCSafePoint, err := s.gcStateManager.AdvanceGCSafePoint(getKeyspaceID(request.GetKeyspaceScope()), request.GetTarget())
if err != nil {
return &pdpb.AdvanceGCSafePointResponse{
Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

return &pdpb.AdvanceGCSafePointResponse{
Header: wrapHeader(),
OldGcSafePoint: oldGCSafePoint,
NewGcSafePoint: newGCSafePoint,
}, nil
}

// AdvanceTxnSafePoint tries to advance the transaction safe point.
func (s *GrpcServer) AdvanceTxnSafePoint(ctx context.Context, request *pdpb.AdvanceTxnSafePointRequest) (*pdpb.AdvanceTxnSafePointResponse, error) {
done, err := s.rateLimitCheck()
if err != nil {
return nil, err
}

Check warning on line 348 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L347-L348

Added lines #L347 - L348 were not covered by tests
if done != nil {
defer done()
}
fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) {
return pdpb.NewPDClient(client).AdvanceTxnSafePoint(ctx, request)
}

Check warning on line 354 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L353-L354

Added lines #L353 - L354 were not covered by tests
if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil {
return nil, err

Check warning on line 356 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L356

Added line #L356 was not covered by tests
} else if rsp != nil {
return rsp.(*pdpb.AdvanceTxnSafePointResponse), err
}

Check warning on line 359 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L358-L359

Added lines #L358 - L359 were not covered by tests

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.AdvanceTxnSafePointResponse{Header: notBootstrappedHeader()}, nil
}

Check warning on line 364 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L363-L364

Added lines #L363 - L364 were not covered by tests

res, err := s.gcStateManager.AdvanceTxnSafePoint(getKeyspaceID(request.GetKeyspaceScope()), request.GetTarget(), time.Now())
if err != nil {
return &pdpb.AdvanceTxnSafePointResponse{
Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

return &pdpb.AdvanceTxnSafePointResponse{
Header: wrapHeader(),
OldTxnSafePoint: res.OldTxnSafePoint,
NewTxnSafePoint: res.NewTxnSafePoint,
BlockerDescription: res.BlockerDescription,
}, nil
}

// SetGCBarrier sets a GC barrier.
func (s *GrpcServer) SetGCBarrier(ctx context.Context, request *pdpb.SetGCBarrierRequest) (*pdpb.SetGCBarrierResponse, error) {
done, err := s.rateLimitCheck()
if err != nil {
return nil, err
}

Check warning on line 386 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L385-L386

Added lines #L385 - L386 were not covered by tests
if done != nil {
defer done()
}
fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) {
return pdpb.NewPDClient(client).SetGCBarrier(ctx, request)
}

Check warning on line 392 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L391-L392

Added lines #L391 - L392 were not covered by tests
if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil {
return nil, err

Check warning on line 394 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L394

Added line #L394 was not covered by tests
} else if rsp != nil {
return rsp.(*pdpb.SetGCBarrierResponse), err
}

Check warning on line 397 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L396-L397

Added lines #L396 - L397 were not covered by tests

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.SetGCBarrierResponse{Header: notBootstrappedHeader()}, nil
}

Check warning on line 402 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L401-L402

Added lines #L401 - L402 were not covered by tests

now := time.Now()
newBarrier, err := s.gcStateManager.SetGCBarrier(
getKeyspaceID(request.GetKeyspaceScope()),
request.GetBarrierId(),
request.GetBarrierTs(),
typeutil.SaturatingStdDurationFromSeconds(request.GetTtlSeconds()),
now)
if err != nil {
return &pdpb.SetGCBarrierResponse{
Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

return &pdpb.SetGCBarrierResponse{
Header: wrapHeader(),
NewBarrierInfo: gcBarrierToProto(newBarrier, now),
}, nil
}

// DeleteGCBarrier deletes a GC barrier.
func (s *GrpcServer) DeleteGCBarrier(ctx context.Context, request *pdpb.DeleteGCBarrierRequest) (*pdpb.DeleteGCBarrierResponse, error) {
done, err := s.rateLimitCheck()
if err != nil {
return nil, err
}

Check warning on line 428 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L427-L428

Added lines #L427 - L428 were not covered by tests
if done != nil {
defer done()
}
fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) {
return pdpb.NewPDClient(client).DeleteGCBarrier(ctx, request)
}

Check warning on line 434 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L433-L434

Added lines #L433 - L434 were not covered by tests
if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil {
return nil, err

Check warning on line 436 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L436

Added line #L436 was not covered by tests
} else if rsp != nil {
return rsp.(*pdpb.DeleteGCBarrierResponse), err
}

Check warning on line 439 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L438-L439

Added lines #L438 - L439 were not covered by tests

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.DeleteGCBarrierResponse{Header: notBootstrappedHeader()}, nil
}

Check warning on line 444 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L443-L444

Added lines #L443 - L444 were not covered by tests

now := time.Now()

deletedBarrier, err := s.gcStateManager.DeleteGCBarrier(getKeyspaceID(request.GetKeyspaceScope()), request.GetBarrierId())
if err != nil {
return &pdpb.DeleteGCBarrierResponse{
Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

Check warning on line 453 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L450-L453

Added lines #L450 - L453 were not covered by tests

return &pdpb.DeleteGCBarrierResponse{
Header: wrapHeader(),
DeletedBarrierInfo: gcBarrierToProto(deletedBarrier, now),
}, nil
}

// GetGCState gets the GC state.
func (s *GrpcServer) GetGCState(ctx context.Context, request *pdpb.GetGCStateRequest) (*pdpb.GetGCStateResponse, error) {
done, err := s.rateLimitCheck()
if err != nil {
return nil, err
}

Check warning on line 466 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L465-L466

Added lines #L465 - L466 were not covered by tests
if done != nil {
defer done()
}
fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) {
return pdpb.NewPDClient(client).GetGCState(ctx, request)
}

Check warning on line 472 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L471-L472

Added lines #L471 - L472 were not covered by tests
if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil {
return nil, err

Check warning on line 474 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L474

Added line #L474 was not covered by tests
} else if rsp != nil {
return rsp.(*pdpb.GetGCStateResponse), err
}

Check warning on line 477 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L476-L477

Added lines #L476 - L477 were not covered by tests

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.GetGCStateResponse{Header: notBootstrappedHeader()}, nil
}

Check warning on line 482 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L481-L482

Added lines #L481 - L482 were not covered by tests

gcState, err := s.gcStateManager.GetGCState(getKeyspaceID(request.GetKeyspaceScope()))
if err != nil {
return &pdpb.GetGCStateResponse{
Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

Check warning on line 489 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L486-L489

Added lines #L486 - L489 were not covered by tests

return &pdpb.GetGCStateResponse{
Header: wrapHeader(),
GcState: gcStateToProto(gcState, time.Now()),
}, nil
}

// GetAllKeyspacesGCStates gets the GC states of all keyspaces.
func (s *GrpcServer) GetAllKeyspacesGCStates(ctx context.Context, request *pdpb.GetAllKeyspacesGCStatesRequest) (*pdpb.GetAllKeyspacesGCStatesResponse, error) {
done, err := s.rateLimitCheck()
if err != nil {
return nil, err
}

Check warning on line 502 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L501-L502

Added lines #L501 - L502 were not covered by tests
if done != nil {
defer done()
}
fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) {
return pdpb.NewPDClient(client).GetAllKeyspacesGCStates(ctx, request)
}

Check warning on line 508 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L507-L508

Added lines #L507 - L508 were not covered by tests
if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil {
return nil, err

Check warning on line 510 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L510

Added line #L510 was not covered by tests
} else if rsp != nil {
return rsp.(*pdpb.GetAllKeyspacesGCStatesResponse), err
}

Check warning on line 513 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L512-L513

Added lines #L512 - L513 were not covered by tests

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.GetAllKeyspacesGCStatesResponse{Header: notBootstrappedHeader()}, nil
}

Check warning on line 518 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L517-L518

Added lines #L517 - L518 were not covered by tests

gcStates, err := s.gcStateManager.GetAllKeyspacesGCStates()
if err != nil {
return &pdpb.GetAllKeyspacesGCStatesResponse{
Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

Check warning on line 525 in server/gc_service.go

View check run for this annotation

Codecov / codecov/patch

server/gc_service.go#L522-L525

Added lines #L522 - L525 were not covered by tests

now := time.Now()
gcStatesPb := make([]*pdpb.GCState, 0, len(gcStates))
for _, gcState := range gcStates {
gcStatesPb = append(gcStatesPb, gcStateToProto(gcState, now))
}

return &pdpb.GetAllKeyspacesGCStatesResponse{
Header: wrapHeader(),
GcStates: gcStatesPb,
}, nil
}
Loading