Skip to content

Commit 427679e

Browse files
authored
Fix updateCachedShippedBlocks - new thanos (#4806)
Signed-off-by: Alan Protasio <[email protected]>
1 parent 49ce5ab commit 427679e

File tree

2 files changed

+40
-1
lines changed

2 files changed

+40
-1
lines changed

pkg/ingester/ingester_v2.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
310310
// updateCachedShipperBlocks reads the shipper meta file and updates the cached shipped blocks.
311311
func (u *userTSDB) updateCachedShippedBlocks() error {
312312
shipperMeta, err := shipper.ReadMetaFile(u.db.Dir())
313-
if os.IsNotExist(err) {
313+
if os.IsNotExist(err) || os.IsNotExist(errors.Cause(err)) {
314314
// If the meta file doesn't exist it means the shipper hasn't run yet.
315315
shipperMeta = &shipper.Meta{}
316316
} else if err != nil {

pkg/ingester/ingester_v2_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2475,6 +2475,45 @@ func TestIngester_seriesCountIsCorrectAfterClosingTSDBForDeletedTenant(t *testin
24752475
require.Equal(t, int64(0), i.TSDBState.seriesCount.Load())
24762476
}
24772477

2478+
func TestIngester_sholdUpdateCacheShippedBlocks(t *testing.T) {
2479+
ctx := context.Background()
2480+
cfg := defaultIngesterTestConfig(t)
2481+
cfg.LifecyclerConfig.JoinAfter = 0
2482+
cfg.BlocksStorageConfig.TSDB.ShipConcurrency = 2
2483+
2484+
// Create ingester
2485+
i, err := prepareIngesterWithBlocksStorage(t, cfg, nil)
2486+
require.NoError(t, err)
2487+
2488+
require.NoError(t, services.StartAndAwaitRunning(ctx, i))
2489+
defer services.StopAndAwaitTerminated(ctx, i) //nolint:errcheck
2490+
2491+
// Wait until it's ACTIVE
2492+
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
2493+
return i.lifecycler.GetState()
2494+
})
2495+
2496+
mockUserShipper(t, i)
2497+
2498+
// Mock the shipper meta (no blocks).
2499+
db := i.getTSDB(userID)
2500+
err = db.updateCachedShippedBlocks()
2501+
require.NoError(t, err)
2502+
2503+
require.Equal(t, len(db.getCachedShippedBlocks()), 0)
2504+
shippedBlock, _ := ulid.Parse("01D78XZ44G0000000000000000")
2505+
2506+
require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.db.Dir(), &shipper.Meta{
2507+
Version: shipper.MetaVersion1,
2508+
Uploaded: []ulid.ULID{shippedBlock},
2509+
}))
2510+
2511+
err = db.updateCachedShippedBlocks()
2512+
require.NoError(t, err)
2513+
2514+
require.Equal(t, len(db.getCachedShippedBlocks()), 1)
2515+
}
2516+
24782517
func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInProgress(t *testing.T) {
24792518
ctx := context.Background()
24802519
cfg := defaultIngesterTestConfig(t)

0 commit comments

Comments
 (0)