Skip to content

Commit 6897830

Browse files
committed
keyspace: add preconditions for keyspace config update
Changes: - Extend /pd/api/v2/keyspaces/{name}/config PATCH with "preconditions" to support CAS-like config updates (equal / absent). - Add pd-ctl support: keyspace update-config --expect and --expect-absent. Rationale: We need PD to provide a simple CAS-style guard for keyspace config updates, so concurrent key-rotation workflows can coordinate safely and only the expected state transition is allowed.
1 parent 8bb1197 commit 6897830

File tree

8 files changed

+381
-12
lines changed

8 files changed

+381
-12
lines changed

pkg/errs/errno.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,8 @@ var (
247247
ErrIllegalOperation = errors.Normalize("unknown operation", errors.RFCCodeText("PD:keyspace:ErrIllegalOperation"))
248248
// ErrUnsupportedOperationInKeyspace is used to indicate this is an unsupported operation.
249249
ErrUnsupportedOperationInKeyspace = errors.Normalize("it's a unsupported operation", errors.RFCCodeText("PD:keyspace:ErrUnsupportedOperationInKeyspace"))
250+
// ErrKeyspaceConfigPreconditionFailed is used to indicate the precondition for updating keyspace config is not met.
251+
ErrKeyspaceConfigPreconditionFailed = errors.Normalize("keyspace config precondition failed, %s", errors.RFCCodeText("PD:keyspace:ErrKeyspaceConfigPreconditionFailed"))
250252
// ErrKeyspaceGroupPrimaryNotFound is used to indicate primary of target keyspace group does not exist.
251253
ErrKeyspaceGroupPrimaryNotFound = errors.Normalize("primary of keyspace group does not exist", errors.RFCCodeText("PD:keyspace:ErrKeyspaceGroupPrimaryNotFound"))
252254
// ErrKeyspaceGroupNotExists is used to indicate target keyspace group does not exist.

pkg/keyspace/keyspace.go

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,68 @@ const (
645645
// UpdateKeyspaceConfig changes target keyspace's config in the order specified in mutations.
646646
// It returns error if saving failed, operation not allowed, or if keyspace not exists.
647647
func (manager *Manager) UpdateKeyspaceConfig(name string, mutations []*Mutation) (*keyspacepb.KeyspaceMeta, error) {
648+
return manager.updateKeyspaceConfig(name, func(meta *keyspacepb.KeyspaceMeta) error {
649+
for _, mutation := range mutations {
650+
switch mutation.Op {
651+
case OpPut:
652+
meta.Config[mutation.Key] = mutation.Value
653+
case OpDel:
654+
delete(meta.Config, mutation.Key)
655+
default:
656+
return errs.ErrIllegalOperation
657+
}
658+
}
659+
return nil
660+
})
661+
}
662+
663+
// UpdateKeyspaceConfigWithPreconditions changes target keyspace's config in the order specified in mutations if the
664+
// given preconditions are satisfied.
665+
// Preconditions use a JSON-merge-patch-like encoding:
666+
// - key -> null means the key must be absent.
667+
// - key -> "value" means the key must exist and equal "value".
668+
func (manager *Manager) UpdateKeyspaceConfigWithPreconditions(name string, mutations []*Mutation, preconditions map[string]*string) (*keyspacepb.KeyspaceMeta, error) {
669+
if len(preconditions) == 0 {
670+
return manager.UpdateKeyspaceConfig(name, mutations)
671+
}
672+
return manager.updateKeyspaceConfig(name, func(meta *keyspacepb.KeyspaceMeta) error {
673+
if err := checkKeyspaceConfigPreconditions(meta.GetConfig(), preconditions); err != nil {
674+
return err
675+
}
676+
for _, mutation := range mutations {
677+
switch mutation.Op {
678+
case OpPut:
679+
meta.Config[mutation.Key] = mutation.Value
680+
case OpDel:
681+
delete(meta.Config, mutation.Key)
682+
default:
683+
return errs.ErrIllegalOperation
684+
}
685+
}
686+
return nil
687+
})
688+
}
689+
690+
func checkKeyspaceConfigPreconditions(config map[string]string, preconditions map[string]*string) error {
691+
for k, expected := range preconditions {
692+
actual, exists := config[k]
693+
if expected == nil {
694+
if exists {
695+
return errs.ErrKeyspaceConfigPreconditionFailed.FastGenByArgs(k + " must be absent")
696+
}
697+
continue
698+
}
699+
if !exists {
700+
return errs.ErrKeyspaceConfigPreconditionFailed.FastGenByArgs(k + " does not exist")
701+
}
702+
if actual != *expected {
703+
return errs.ErrKeyspaceConfigPreconditionFailed.FastGenByArgs("key=" + k + " expected=" + *expected + " actual=" + actual)
704+
}
705+
}
706+
return nil
707+
}
708+
709+
func (manager *Manager) updateKeyspaceConfig(name string, update func(meta *keyspacepb.KeyspaceMeta) error) (*keyspacepb.KeyspaceMeta, error) {
648710
var meta *keyspacepb.KeyspaceMeta
649711
oldConfig := make(map[string]string)
650712
err := manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
@@ -677,16 +739,9 @@ func (manager *Manager) UpdateKeyspaceConfig(name string, mutations []*Mutation)
677739
for k, v := range meta.GetConfig() {
678740
oldConfig[k] = v
679741
}
680-
// Update keyspace config according to mutations.
681-
for _, mutation := range mutations {
682-
switch mutation.Op {
683-
case OpPut:
684-
meta.Config[mutation.Key] = mutation.Value
685-
case OpDel:
686-
delete(meta.Config, mutation.Key)
687-
default:
688-
return errs.ErrIllegalOperation
689-
}
742+
// Update keyspace config.
743+
if err := update(meta); err != nil {
744+
return err
690745
}
691746
newConfig := meta.GetConfig()
692747
oldUserKind := endpoint.StringUserKind(oldConfig[UserKindKey])

pkg/keyspace/keyspace_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package keyspace
1616

1717
import (
1818
"context"
19+
goerrors "errors"
1920
"fmt"
2021
"math"
2122
"strconv"
@@ -390,6 +391,68 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceConfig() {
390391
checkMutations(re, nil, updated.Config, mutations)
391392
}
392393

394+
func (suite *keyspaceTestSuite) TestUpdateKeyspaceConfigWithPreconditions() {
395+
re := suite.Require()
396+
manager := suite.manager
397+
398+
ksName := "precond_test"
399+
_, err := manager.CreateKeyspace(&CreateKeyspaceRequest{
400+
Name: ksName,
401+
Config: map[string]string{},
402+
CreateTime: time.Now().Unix(),
403+
})
404+
re.NoError(err)
405+
406+
currentKey := "current_file_id"
407+
nextKey := "next_file_id"
408+
409+
// 1) only set next_file_id if it doesn't exist already.
410+
nextV1 := "1000"
411+
meta, err := manager.UpdateKeyspaceConfigWithPreconditions(ksName, []*Mutation{
412+
{Op: OpPut, Key: nextKey, Value: nextV1},
413+
}, map[string]*string{
414+
nextKey: nil,
415+
})
416+
re.NoError(err)
417+
re.Equal(nextV1, meta.Config[nextKey])
418+
419+
nextV2 := "2000"
420+
_, err = manager.UpdateKeyspaceConfigWithPreconditions(ksName, []*Mutation{
421+
{Op: OpPut, Key: nextKey, Value: nextV2},
422+
}, map[string]*string{
423+
nextKey: nil,
424+
})
425+
re.Error(err)
426+
re.True(goerrors.Is(err, errs.ErrKeyspaceConfigPreconditionFailed))
427+
428+
// 2) Update current_file_id to the next_file_id (guarded by next_file_id == expected).
429+
meta, err = manager.UpdateKeyspaceConfigWithPreconditions(ksName, []*Mutation{
430+
{Op: OpPut, Key: currentKey, Value: nextV1},
431+
}, map[string]*string{
432+
nextKey: &nextV1,
433+
})
434+
re.NoError(err)
435+
re.Equal(nextV1, meta.Config[currentKey])
436+
437+
// 3) Delete next_file_id if it matches my expected value.
438+
wrongExpected := "999"
439+
_, err = manager.UpdateKeyspaceConfigWithPreconditions(ksName, []*Mutation{
440+
{Op: OpDel, Key: nextKey},
441+
}, map[string]*string{
442+
nextKey: &wrongExpected,
443+
})
444+
re.Error(err)
445+
re.True(goerrors.Is(err, errs.ErrKeyspaceConfigPreconditionFailed))
446+
447+
meta, err = manager.UpdateKeyspaceConfigWithPreconditions(ksName, []*Mutation{
448+
{Op: OpDel, Key: nextKey},
449+
}, map[string]*string{
450+
nextKey: &nextV1,
451+
})
452+
re.NoError(err)
453+
re.NotContains(meta.GetConfig(), nextKey)
454+
}
455+
393456
func (suite *keyspaceTestSuite) TestUpdateKeyspaceState() {
394457
re := suite.Require()
395458
manager := suite.manager

server/apiv2/handlers/keyspace.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package handlers
1616

1717
import (
1818
"encoding/json"
19+
goerrors "errors"
1920
"net/http"
2021
"strconv"
2122
"strings"
@@ -308,6 +309,10 @@ func LoadAllKeyspaces(c *gin.Context) {
308309
// which will both be set to "" if value type is string during binding.
309310
type UpdateConfigParams struct {
310311
Config map[string]*string `json:"config"`
312+
// Preconditions specifies prerequisites for updating config, using a JSON-merge-patch-like encoding:
313+
// - key -> null means the key must be absent.
314+
// - key -> "value" means the key must exist and equal "value".
315+
Preconditions map[string]*string `json:"preconditions,omitempty"`
311316
}
312317

313318
// UpdateKeyspaceConfig updates target keyspace's config.
@@ -350,8 +355,16 @@ func UpdateKeyspaceConfig(c *gin.Context) {
350355
}
351356
}
352357

353-
meta, err := manager.UpdateKeyspaceConfig(name, mutations)
358+
meta, err := manager.UpdateKeyspaceConfigWithPreconditions(name, mutations, configParams.Preconditions)
354359
if err != nil {
360+
if goerrors.Is(err, errs.ErrKeyspaceConfigPreconditionFailed) {
361+
c.AbortWithStatusJSON(http.StatusConflict, err.Error())
362+
return
363+
}
364+
if goerrors.Is(err, errs.ErrEtcdTxnConflict) {
365+
c.AbortWithStatusJSON(http.StatusConflict, err.Error())
366+
return
367+
}
355368
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
356369
return
357370
}

tests/server/apiv2/handlers/keyspace_test.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,132 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceConfig() {
113113
}
114114
}
115115

116+
func (suite *keyspaceTestSuite) TestUpdateKeyspaceConfigPreconditions() {
117+
re := suite.Require()
118+
created := MustCreateKeyspace(re, suite.server, &handlers.CreateKeyspaceParams{
119+
Name: "test_keyspace_cas",
120+
Config: map[string]string{},
121+
})
122+
re.NotNil(created)
123+
124+
currentKey := "current_file_id"
125+
nextKey := "next_file_id"
126+
127+
next := "1000"
128+
status, body, meta := tryUpdateKeyspaceConfig(re, suite.server, created.Name, &handlers.UpdateConfigParams{
129+
Config: map[string]*string{
130+
nextKey: &next,
131+
},
132+
Preconditions: map[string]*string{
133+
nextKey: nil,
134+
},
135+
})
136+
re.Equal(http.StatusOK, status, body)
137+
re.Equal(next, meta.Config[nextKey])
138+
139+
otherNext := "2000"
140+
status, body, meta = tryUpdateKeyspaceConfig(re, suite.server, created.Name, &handlers.UpdateConfigParams{
141+
Config: map[string]*string{
142+
nextKey: &otherNext,
143+
},
144+
Preconditions: map[string]*string{
145+
nextKey: nil,
146+
},
147+
})
148+
re.Equal(http.StatusConflict, status)
149+
re.Nil(meta)
150+
re.Contains(body, "precondition failed")
151+
152+
status, _, meta = tryUpdateKeyspaceConfig(re, suite.server, created.Name, &handlers.UpdateConfigParams{
153+
Config: map[string]*string{
154+
currentKey: &next,
155+
},
156+
Preconditions: map[string]*string{
157+
nextKey: &next,
158+
},
159+
})
160+
re.Equal(http.StatusOK, status)
161+
re.Equal(next, meta.Config[currentKey])
162+
re.Equal(next, meta.Config[nextKey])
163+
164+
wrongExpected := "999"
165+
status, body, meta = tryUpdateKeyspaceConfig(re, suite.server, created.Name, &handlers.UpdateConfigParams{
166+
Config: map[string]*string{
167+
nextKey: nil,
168+
},
169+
Preconditions: map[string]*string{
170+
nextKey: &wrongExpected,
171+
},
172+
})
173+
re.Equal(http.StatusConflict, status)
174+
re.Nil(meta)
175+
re.Contains(body, "expected")
176+
177+
status, _, meta = tryUpdateKeyspaceConfig(re, suite.server, created.Name, &handlers.UpdateConfigParams{
178+
Config: map[string]*string{
179+
nextKey: nil,
180+
},
181+
Preconditions: map[string]*string{
182+
nextKey: &next,
183+
},
184+
})
185+
re.Equal(http.StatusOK, status)
186+
re.Equal(next, meta.Config[currentKey])
187+
re.NotContains(meta.GetConfig(), nextKey)
188+
}
189+
190+
func (suite *keyspaceTestSuite) TestUpdateKeyspaceConfigPreconditionsConcurrentSetNextIfAbsent() {
191+
re := suite.Require()
192+
created := MustCreateKeyspace(re, suite.server, &handlers.CreateKeyspaceParams{
193+
Name: "test_keyspace_cas2",
194+
Config: map[string]string{},
195+
})
196+
re.NotNil(created)
197+
198+
nextKey := "next_file_id"
199+
start := make(chan struct{})
200+
results := make(chan int, 2)
201+
202+
go func() {
203+
<-start
204+
next := "1000"
205+
status, body, _ := tryUpdateKeyspaceConfig(re, suite.server, created.Name, &handlers.UpdateConfigParams{
206+
Config: map[string]*string{
207+
nextKey: &next,
208+
},
209+
Preconditions: map[string]*string{
210+
nextKey: nil,
211+
},
212+
})
213+
if status != http.StatusOK && status != http.StatusConflict {
214+
re.FailNow("unexpected status", "status=%d body=%s", status, body)
215+
}
216+
results <- status
217+
}()
218+
go func() {
219+
<-start
220+
next := "2000"
221+
status, body, _ := tryUpdateKeyspaceConfig(re, suite.server, created.Name, &handlers.UpdateConfigParams{
222+
Config: map[string]*string{
223+
nextKey: &next,
224+
},
225+
Preconditions: map[string]*string{
226+
nextKey: nil,
227+
},
228+
})
229+
if status != http.StatusOK && status != http.StatusConflict {
230+
re.FailNow("unexpected status", "status=%d body=%s", status, body)
231+
}
232+
results <- status
233+
}()
234+
235+
close(start)
236+
237+
s1 := <-results
238+
s2 := <-results
239+
re.ElementsMatch([]int{http.StatusOK, http.StatusConflict}, []int{s1, s2})
240+
}
241+
116242
func (suite *keyspaceTestSuite) TestUpdateKeyspaceState() {
117243
re := suite.Require()
118244
keyspaces := mustMakeTestKeyspaces(re, suite.server, 10)

tests/server/apiv2/handlers/testutil.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,24 @@ func mustUpdateKeyspaceConfig(re *require.Assertions, server *tests.TestServer,
152152
return meta.KeyspaceMeta
153153
}
154154

155+
func tryUpdateKeyspaceConfig(re *require.Assertions, server *tests.TestServer, name string, request *handlers.UpdateConfigParams) (int, string, *keyspacepb.KeyspaceMeta) {
156+
data, err := json.Marshal(request)
157+
re.NoError(err)
158+
httpReq, err := http.NewRequest(http.MethodPatch, server.GetAddr()+keyspacesPrefix+"/"+name+"/config", bytes.NewBuffer(data))
159+
re.NoError(err)
160+
resp, err := tests.TestDialClient.Do(httpReq)
161+
re.NoError(err)
162+
defer resp.Body.Close()
163+
data, err = io.ReadAll(resp.Body)
164+
re.NoError(err)
165+
if resp.StatusCode != http.StatusOK {
166+
return resp.StatusCode, string(data), nil
167+
}
168+
meta := &handlers.KeyspaceMeta{}
169+
re.NoError(json.Unmarshal(data, meta))
170+
return resp.StatusCode, string(data), meta.KeyspaceMeta
171+
}
172+
155173
func mustLoadKeyspaces(re *require.Assertions, server *tests.TestServer, name string) *keyspacepb.KeyspaceMeta {
156174
resp, err := tests.TestDialClient.Get(server.GetAddr() + keyspacesPrefix + "/" + name)
157175
re.NoError(err)

0 commit comments

Comments
 (0)