Skip to content

Commit f72e3d3

Browse files
committed
Using another global prefix for the parquet markers
Signed-off-by: alanprot <[email protected]>
1 parent a81b78e commit f72e3d3

File tree

5 files changed

+38
-3
lines changed

5 files changed

+38
-3
lines changed

pkg/compactor/blocks_cleaner_test.go

+12
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/thanos-io/thanos/pkg/block/metadata"
2222

2323
"github.com/cortexproject/cortex/pkg/storage/bucket"
24+
"github.com/cortexproject/cortex/pkg/storage/parquet"
2425
"github.com/cortexproject/cortex/pkg/storage/tsdb"
2526
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
2627
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
@@ -161,6 +162,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
161162
require.NoError(t, tsdb.WriteTenantDeletionMark(context.Background(), bucketClient, "user-3", tsdb.NewTenantDeletionMark(time.Now())))
162163
block9 := createTSDBBlock(t, bucketClient, "user-3", 10, 30, nil)
163164
block10 := createTSDBBlock(t, bucketClient, "user-3", 30, 50, nil)
165+
createParquetMarker(t, bucketClient, "user-3", block10)
164166

165167
// User-4 with no more blocks, but couple of mark and debug files. Should be fully deleted.
166168
user4Mark := tsdb.NewTenantDeletionMark(time.Now())
@@ -174,6 +176,10 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
174176
block12 := createTSDBBlock(t, bucketClient, "user-5", 30, 50, nil)
175177
createNoCompactionMark(t, bucketClient, "user-5", block12)
176178

179+
// Create Parquet marker
180+
block13 := createTSDBBlock(t, bucketClient, "user-6", 30, 50, nil)
181+
createParquetMarker(t, bucketClient, "user-6", block13)
182+
177183
// The fixtures have been created. If the bucket client wasn't wrapped to write
178184
// deletion marks to the global location too, then this is the right time to do it.
179185
if options.markersMigrationEnabled {
@@ -230,7 +236,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
230236
{path: path.Join("user-3", block9.String(), "index"), expectedExists: false},
231237
{path: path.Join("user-3", block10.String(), metadata.MetaFilename), expectedExists: false},
232238
{path: path.Join("user-3", block10.String(), "index"), expectedExists: false},
239+
{path: path.Join("user-3", block10.String(), parquet.ConverterMarkerFileName), expectedExists: false},
233240
{path: path.Join("user-4", block.DebugMetas, "meta.json"), expectedExists: options.user4FilesExist},
241+
{path: path.Join("user-6", block13.String(), parquet.ConverterMarkerFileName), expectedExists: true},
234242
} {
235243
exists, err := bucketClient.Exists(ctx, tc.path)
236244
require.NoError(t, err)
@@ -298,21 +306,25 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
298306
cortex_bucket_blocks_count{user="user-1"} 2
299307
cortex_bucket_blocks_count{user="user-2"} 1
300308
cortex_bucket_blocks_count{user="user-5"} 2
309+
cortex_bucket_blocks_count{user="user-6"} 1
301310
# HELP cortex_bucket_blocks_marked_for_deletion_count Total number of blocks marked for deletion in the bucket.
302311
# TYPE cortex_bucket_blocks_marked_for_deletion_count gauge
303312
cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 1
304313
cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0
305314
cortex_bucket_blocks_marked_for_deletion_count{user="user-5"} 0
315+
cortex_bucket_blocks_marked_for_deletion_count{user="user-6"} 0
306316
# HELP cortex_bucket_blocks_marked_for_no_compaction_count Total number of blocks marked for no compaction in the bucket.
307317
# TYPE cortex_bucket_blocks_marked_for_no_compaction_count gauge
308318
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-1"} 0
309319
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-2"} 0
310320
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-5"} 1
321+
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-6"} 0
311322
# HELP cortex_bucket_blocks_partials_count Total number of partial blocks.
312323
# TYPE cortex_bucket_blocks_partials_count gauge
313324
cortex_bucket_blocks_partials_count{user="user-1"} 2
314325
cortex_bucket_blocks_partials_count{user="user-2"} 0
315326
cortex_bucket_blocks_partials_count{user="user-5"} 0
327+
cortex_bucket_blocks_partials_count{user="user-6"} 0
316328
`),
317329
"cortex_bucket_blocks_count",
318330
"cortex_bucket_blocks_marked_for_deletion_count",

pkg/compactor/compactor_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/cortexproject/cortex/pkg/ring"
3737
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
3838
"github.com/cortexproject/cortex/pkg/storage/bucket"
39+
"github.com/cortexproject/cortex/pkg/storage/parquet"
3940
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
4041
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
4142
cortex_storage_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
@@ -1429,6 +1430,14 @@ func createBlockVisitMarker(t *testing.T, bkt objstore.Bucket, userID string, bl
14291430
require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content)))
14301431
}
14311432

1433+
func createParquetMarker(t *testing.T, bkt objstore.Bucket, userID string, blockID ulid.ULID) {
1434+
content := mockParquetMarker()
1435+
blockPath := path.Join(userID, blockID.String())
1436+
markPath := path.Join(blockPath, parquet.ConverterMarkerFileName)
1437+
1438+
require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content)))
1439+
}
1440+
14321441
func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) {
14331442
var compactor *Compactor
14341443
var log *concurrency.SyncBuffer
@@ -1727,6 +1736,19 @@ func mockBlockVisitMarker() string {
17271736
return string(content)
17281737
}
17291738

1739+
func mockParquetMarker() string {
1740+
parquetMarker := parquet.ConverterMark{
1741+
Version: 1,
1742+
}
1743+
1744+
content, err := json.Marshal(parquetMarker)
1745+
if err != nil {
1746+
panic("failed to marshal mocked block visit marker")
1747+
}
1748+
1749+
return string(content)
1750+
}
1751+
17301752
func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
17311753
numUsers := 10
17321754

pkg/parquetconverter/converter_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func TestConverter(t *testing.T) {
9696
for _, block := range blocksConverted {
9797
for _, file := range []string{
9898
fmt.Sprintf("%s/parquet-converter-mark.json", block.String()),
99-
fmt.Sprintf("markers/%s-parquet-converter-mark.json", block.String()),
99+
fmt.Sprintf("parquet-markers/%s-parquet-converter-mark.json", block.String()),
100100
fmt.Sprintf("%s/0.chunks.parquet", block.String()),
101101
fmt.Sprintf("%s/0.labels.parquet", block.String()),
102102
} {

pkg/storage/parquet/converter_marker.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
)
1818

1919
const (
20+
ConverterMarkerPrefix = "parquet-markers"
2021
ConverterMarkerFileName = "parquet-converter-mark.json"
2122
CurrentVersion = 1
2223
)
@@ -29,7 +30,7 @@ func ReadConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Instr
2930
markerPath := path.Join(id.String(), ConverterMarkerFileName)
3031
reader, err := userBkt.WithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(userBkt.IsAccessDeniedErr, userBkt.IsObjNotFoundErr)).Get(ctx, markerPath)
3132
if err != nil {
32-
if userBkt.IsObjNotFoundErr(err) {
33+
if userBkt.IsObjNotFoundErr(err) || userBkt.IsAccessDeniedErr(err) {
3334
return &ConverterMark{}, nil
3435
}
3536

pkg/storage/tsdb/bucketindex/markers.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func NoCompactMarkFilenameMarkFilepath(blockID ulid.ULID) string {
4343
}
4444

4545
func ConverterMarkFilePath(blockID ulid.ULID) string {
46-
return fmt.Sprintf("%s/%s-%s", MarkersPathname, blockID.String(), parquet.ConverterMarkerFileName)
46+
return fmt.Sprintf("%s/%s-%s", parquet.ConverterMarkerPrefix, blockID.String(), parquet.ConverterMarkerFileName)
4747
}
4848

4949
// IsBlockDeletionMarkFilename returns whether the input filename matches the expected pattern

0 commit comments

Comments
 (0)