Skip to content

Commit 26ee2e2

Browse files
authored
Merge pull request #1818 from pracucci/fix-tsdb-transfer
Fix TSDB creation conflicting with transfer
2 parents 48122ac + 772ed25 commit 26ee2e2

File tree

6 files changed

+270
-71
lines changed

6 files changed

+270
-71
lines changed

pkg/ingester/ingester.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
191191
}
192192

193193
if cfg.TSDBEnabled {
194-
return NewV2(cfg, clientConfig, limits, chunkStore, registerer)
194+
return NewV2(cfg, clientConfig, limits, registerer)
195195
}
196196

197197
i := &Ingester{

pkg/ingester/ingester_v2.go

Lines changed: 75 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package ingester
33
import (
44
"fmt"
55
"net/http"
6-
"path/filepath"
76
"time"
87

98
"github.com/cortexproject/cortex/pkg/ingester/client"
@@ -26,14 +25,18 @@ import (
2625
old_ctx "golang.org/x/net/context"
2726
)
2827

28+
const (
29+
errTSDBCreateIncompatibleState = "cannot create a new TSDB while the ingester is not in active state (current state: %s)"
30+
)
31+
2932
// TSDBState holds data structures used by the TSDB storage engine
3033
type TSDBState struct {
3134
dbs map[string]*tsdb.DB // tsdb sharded by userID
3235
bucket objstore.Bucket
3336
}
3437

3538
// NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage
36-
func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, error) {
39+
func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) {
3740
bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.TSDBConfig, "cortex", util.Logger)
3841
if err != nil {
3942
return nil, err
@@ -44,7 +47,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
4447
clientConfig: clientConfig,
4548
metrics: newIngesterMetrics(registerer),
4649
limits: limits,
47-
chunkStore: chunkStore,
50+
chunkStore: nil,
4851
quit: make(chan struct{}),
4952

5053
TSDBState: TSDBState{
@@ -77,7 +80,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
7780
return nil, fmt.Errorf("no user id")
7881
}
7982

80-
db, err := i.getOrCreateTSDB(userID)
83+
db, err := i.getOrCreateTSDB(userID, false)
8184
if err != nil {
8285
return nil, err
8386
}
@@ -151,9 +154,9 @@ func (i *Ingester) v2Query(ctx old_ctx.Context, req *client.QueryRequest) (*clie
151154

152155
i.metrics.queries.Inc()
153156

154-
db, err := i.getOrCreateTSDB(userID)
155-
if err != nil {
156-
return nil, fmt.Errorf("failed to find/create user db: %v", err)
157+
db := i.getTSDB(userID)
158+
if db == nil {
159+
return &client.QueryResponse{}, nil
157160
}
158161

159162
q, err := db.Querier(int64(from), int64(through))
@@ -220,9 +223,9 @@ func (i *Ingester) v2LabelValues(ctx old_ctx.Context, req *client.LabelValuesReq
220223
return nil, err
221224
}
222225

223-
db, err := i.getOrCreateTSDB(userID)
224-
if err != nil {
225-
return nil, fmt.Errorf("failed to find/create user db: %v", err)
226+
db := i.getTSDB(userID)
227+
if db == nil {
228+
return &client.LabelValuesResponse{}, nil
226229
}
227230

228231
through := time.Now()
@@ -249,9 +252,9 @@ func (i *Ingester) v2LabelNames(ctx old_ctx.Context, req *client.LabelNamesReque
249252
return nil, err
250253
}
251254

252-
db, err := i.getOrCreateTSDB(userID)
253-
if err != nil {
254-
return nil, fmt.Errorf("failed to find/create user db: %v", err)
255+
db := i.getTSDB(userID)
256+
if db == nil {
257+
return &client.LabelNamesResponse{}, nil
255258
}
256259

257260
through := time.Now()
@@ -279,58 +282,69 @@ func (i *Ingester) getTSDB(userID string) *tsdb.DB {
279282
return db
280283
}
281284

282-
func (i *Ingester) getOrCreateTSDB(userID string) (*tsdb.DB, error) {
285+
func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error) {
283286
db := i.getTSDB(userID)
284-
if db == nil {
285-
i.userStatesMtx.Lock()
286-
defer i.userStatesMtx.Unlock()
287-
288-
// Check again for DB in the event it was created in-between locks
289-
var ok bool
290-
db, ok = i.TSDBState.dbs[userID]
291-
if !ok {
292-
293-
udir := i.userDir(userID)
294-
295-
// Create a new user database
296-
var err error
297-
db, err = tsdb.Open(udir, util.Logger, nil, &tsdb.Options{
298-
RetentionDuration: uint64(i.cfg.TSDBConfig.Retention / time.Millisecond),
299-
BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMillisecondRanges(),
300-
NoLockfile: true,
301-
})
302-
if err != nil {
303-
return nil, err
304-
}
287+
if db != nil {
288+
return db, nil
289+
}
305290

306-
// Thanos shipper requires at least 1 external label to be set. For this reason,
307-
// we set the tenant ID as external label and we'll filter it out when reading
308-
// the series from the storage.
309-
l := lbls.Labels{
310-
{
311-
Name: cortex_tsdb.TenantIDExternalLabel,
312-
Value: userID,
313-
},
314-
}
291+
i.userStatesMtx.Lock()
292+
defer i.userStatesMtx.Unlock()
315293

316-
// Create a new shipper for this database
317-
s := shipper.New(util.Logger, nil, udir, &Bucket{userID, i.TSDBState.bucket}, func() lbls.Labels { return l }, metadata.ReceiveSource)
318-
i.done.Add(1)
319-
go func() {
320-
defer i.done.Done()
321-
runutil.Repeat(i.cfg.TSDBConfig.ShipInterval, i.quit, func() error {
322-
if uploaded, err := s.Sync(context.Background()); err != nil {
323-
level.Warn(util.Logger).Log("err", err, "uploaded", uploaded)
324-
}
325-
return nil
326-
})
327-
}()
328-
329-
i.TSDBState.dbs[userID] = db
330-
}
294+
// Check again for DB in the event it was created in-between locks
295+
var ok bool
296+
db, ok = i.TSDBState.dbs[userID]
297+
if ok {
298+
return db, nil
299+
}
300+
301+
// We're ready to create the TSDB, however we must be sure that the ingester
302+
// is in the ACTIVE state, otherwise it may conflict with the transfer in/out.
303+
// The TSDB is created when the first series is pushed and this shouldn't happen
304+
// to a non-ACTIVE ingester, however we want to protect from any bug, because we
305+
// may have data loss or TSDB WAL corruption if the TSDB is created before/during
306+
// a transfer in.
307+
if ingesterState := i.lifecycler.GetState(); !force && ingesterState != ring.ACTIVE {
308+
return nil, fmt.Errorf(errTSDBCreateIncompatibleState, ingesterState)
309+
}
310+
311+
udir := i.cfg.TSDBConfig.BlocksDir(userID)
312+
313+
// Create a new user database
314+
var err error
315+
db, err = tsdb.Open(udir, util.Logger, nil, &tsdb.Options{
316+
RetentionDuration: uint64(i.cfg.TSDBConfig.Retention / time.Millisecond),
317+
BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMillisecondRanges(),
318+
NoLockfile: true,
319+
})
320+
if err != nil {
321+
return nil, err
322+
}
323+
324+
// Thanos shipper requires at least 1 external label to be set. For this reason,
325+
// we set the tenant ID as external label and we'll filter it out when reading
326+
// the series from the storage.
327+
l := lbls.Labels{
328+
{
329+
Name: cortex_tsdb.TenantIDExternalLabel,
330+
Value: userID,
331+
},
331332
}
332333

334+
// Create a new shipper for this database
335+
s := shipper.New(util.Logger, nil, udir, &Bucket{userID, i.TSDBState.bucket}, func() lbls.Labels { return l }, metadata.ReceiveSource)
336+
i.done.Add(1)
337+
go func() {
338+
defer i.done.Done()
339+
runutil.Repeat(i.cfg.TSDBConfig.ShipInterval, i.quit, func() error {
340+
if uploaded, err := s.Sync(context.Background()); err != nil {
341+
level.Warn(util.Logger).Log("err", err, "uploaded", uploaded)
342+
}
343+
return nil
344+
})
345+
}()
346+
347+
i.TSDBState.dbs[userID] = db
348+
333349
return db, nil
334350
}
335-
336-
func (i *Ingester) userDir(userID string) string { return filepath.Join(i.cfg.TSDBConfig.Dir, userID) }

pkg/ingester/ingester_v2_test.go

Lines changed: 152 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ingester
22

33
import (
4+
"fmt"
45
"io/ioutil"
56
"math"
67
"net/http"
@@ -138,13 +139,21 @@ func TestIngester_v2Push(t *testing.T) {
138139
registry := prometheus.NewRegistry()
139140

140141
// Create a mocked ingester
141-
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), registry)
142+
cfg := defaultIngesterTestConfig()
143+
cfg.LifecyclerConfig.JoinAfter = 0
144+
145+
i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, registry)
142146
require.NoError(t, err)
143147
defer i.Shutdown()
144148
defer cleanup()
145149

146150
ctx := user.InjectOrgID(context.Background(), "test")
147151

152+
// Wait until the ingester is ACTIVE
153+
test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
154+
return i.lifecycler.GetState()
155+
})
156+
148157
// Push timeseries
149158
for idx, req := range testData.reqs {
150159
_, err := i.v2Push(ctx, req)
@@ -388,6 +397,147 @@ func Test_Ingester_v2Query(t *testing.T) {
388397
})
389398
}
390399
}
400+
func TestIngester_v2Query_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) {
401+
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
402+
require.NoError(t, err)
403+
defer i.Shutdown()
404+
defer cleanup()
405+
406+
// Mock request
407+
userID := "test"
408+
ctx := user.InjectOrgID(context.Background(), userID)
409+
req := &client.QueryRequest{}
410+
411+
res, err := i.v2Query(ctx, req)
412+
require.NoError(t, err)
413+
assert.Equal(t, &client.QueryResponse{}, res)
414+
415+
// Check if the TSDB has been created
416+
_, tsdbCreated := i.TSDBState.dbs[userID]
417+
assert.False(t, tsdbCreated)
418+
}
419+
420+
func TestIngester_v2LabelValues_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) {
421+
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
422+
require.NoError(t, err)
423+
defer i.Shutdown()
424+
defer cleanup()
425+
426+
// Mock request
427+
userID := "test"
428+
ctx := user.InjectOrgID(context.Background(), userID)
429+
req := &client.LabelValuesRequest{}
430+
431+
res, err := i.v2LabelValues(ctx, req)
432+
require.NoError(t, err)
433+
assert.Equal(t, &client.LabelValuesResponse{}, res)
434+
435+
// Check if the TSDB has been created
436+
_, tsdbCreated := i.TSDBState.dbs[userID]
437+
assert.False(t, tsdbCreated)
438+
}
439+
440+
func TestIngester_v2LabelNames_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) {
441+
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
442+
require.NoError(t, err)
443+
defer i.Shutdown()
444+
defer cleanup()
445+
446+
// Mock request
447+
userID := "test"
448+
ctx := user.InjectOrgID(context.Background(), userID)
449+
req := &client.LabelNamesRequest{}
450+
451+
res, err := i.v2LabelNames(ctx, req)
452+
require.NoError(t, err)
453+
assert.Equal(t, &client.LabelNamesResponse{}, res)
454+
455+
// Check if the TSDB has been created
456+
_, tsdbCreated := i.TSDBState.dbs[userID]
457+
assert.False(t, tsdbCreated)
458+
}
459+
460+
func TestIngester_v2Push_ShouldNotCreateTSDBIfNotInActiveState(t *testing.T) {
461+
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
462+
require.NoError(t, err)
463+
defer i.Shutdown()
464+
defer cleanup()
465+
require.Equal(t, ring.PENDING, i.lifecycler.GetState())
466+
467+
// Mock request
468+
userID := "test"
469+
ctx := user.InjectOrgID(context.Background(), userID)
470+
req := &client.WriteRequest{}
471+
472+
res, err := i.v2Push(ctx, req)
473+
assert.Equal(t, fmt.Errorf(errTSDBCreateIncompatibleState, "PENDING"), err)
474+
assert.Nil(t, res)
475+
476+
// Check if the TSDB has been created
477+
_, tsdbCreated := i.TSDBState.dbs[userID]
478+
assert.False(t, tsdbCreated)
479+
}
480+
481+
func TestIngester_getOrCreateTSDB_ShouldNotAllowToCreateTSDBIfIngesterStateIsNotActive(t *testing.T) {
482+
tests := map[string]struct {
483+
state ring.IngesterState
484+
expectedErr error
485+
}{
486+
"not allow to create TSDB if in PENDING state": {
487+
state: ring.PENDING,
488+
expectedErr: fmt.Errorf(errTSDBCreateIncompatibleState, ring.PENDING),
489+
},
490+
"not allow to create TSDB if in JOINING state": {
491+
state: ring.JOINING,
492+
expectedErr: fmt.Errorf(errTSDBCreateIncompatibleState, ring.JOINING),
493+
},
494+
"not allow to create TSDB if in LEAVING state": {
495+
state: ring.LEAVING,
496+
expectedErr: fmt.Errorf(errTSDBCreateIncompatibleState, ring.LEAVING),
497+
},
498+
"allow to create TSDB if in ACTIVE state": {
499+
state: ring.ACTIVE,
500+
expectedErr: nil,
501+
},
502+
}
503+
504+
for testName, testData := range tests {
505+
t.Run(testName, func(t *testing.T) {
506+
cfg := defaultIngesterTestConfig()
507+
cfg.LifecyclerConfig.JoinAfter = 60 * time.Second
508+
509+
i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, nil)
510+
require.NoError(t, err)
511+
defer i.Shutdown()
512+
defer cleanup()
513+
514+
// Switch ingester state to the expected one in the test
515+
if i.lifecycler.GetState() != testData.state {
516+
var stateChain []ring.IngesterState
517+
518+
if testData.state == ring.LEAVING {
519+
stateChain = []ring.IngesterState{ring.ACTIVE, ring.LEAVING}
520+
} else {
521+
stateChain = []ring.IngesterState{testData.state}
522+
}
523+
524+
for _, s := range stateChain {
525+
err = i.lifecycler.ChangeState(context.Background(), s)
526+
require.NoError(t, err)
527+
}
528+
}
529+
530+
db, err := i.getOrCreateTSDB("test", false)
531+
assert.Equal(t, testData.expectedErr, err)
532+
533+
if testData.expectedErr != nil {
534+
assert.Nil(t, db)
535+
} else {
536+
assert.NotNil(t, db)
537+
}
538+
})
539+
}
540+
}
391541

392542
func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) (*client.WriteRequest, *client.QueryResponse) {
393543
samples := []client.Sample{
@@ -432,7 +582,7 @@ func newIngesterMockWithTSDBStorage(ingesterCfg Config, registerer prometheus.Re
432582
ingesterCfg.TSDBConfig.Backend = "s3"
433583
ingesterCfg.TSDBConfig.S3.Endpoint = "localhost"
434584

435-
ingester, err := NewV2(ingesterCfg, clientCfg, overrides, nil, registerer)
585+
ingester, err := NewV2(ingesterCfg, clientCfg, overrides, registerer)
436586
if err != nil {
437587
return nil, nil, err
438588
}

0 commit comments

Comments
 (0)