Skip to content

Commit 2e731ce

Browse files
committed
Using another global prefix for the parquet markers
Signed-off-by: alanprot <[email protected]>
1 parent a81b78e commit 2e731ce

File tree

5 files changed

+37
-2
lines changed

5 files changed

+37
-2
lines changed

pkg/compactor/blocks_cleaner_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/rand"
66
"fmt"
7+
"github.com/cortexproject/cortex/pkg/storage/parquet"
78
"path"
89
"strings"
910
"testing"
@@ -161,6 +162,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
161162
require.NoError(t, tsdb.WriteTenantDeletionMark(context.Background(), bucketClient, "user-3", tsdb.NewTenantDeletionMark(time.Now())))
162163
block9 := createTSDBBlock(t, bucketClient, "user-3", 10, 30, nil)
163164
block10 := createTSDBBlock(t, bucketClient, "user-3", 30, 50, nil)
165+
createParquetMarker(t, bucketClient, "user-3", block10)
164166

165167
// User-4 with no more blocks, but couple of mark and debug files. Should be fully deleted.
166168
user4Mark := tsdb.NewTenantDeletionMark(time.Now())
@@ -174,6 +176,10 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
174176
block12 := createTSDBBlock(t, bucketClient, "user-5", 30, 50, nil)
175177
createNoCompactionMark(t, bucketClient, "user-5", block12)
176178

179+
// Create Parquet marker
180+
block13 := createTSDBBlock(t, bucketClient, "user-6", 30, 50, nil)
181+
createParquetMarker(t, bucketClient, "user-6", block13)
182+
177183
// The fixtures have been created. If the bucket client wasn't wrapped to write
178184
// deletion marks to the global location too, then this is the right time to do it.
179185
if options.markersMigrationEnabled {
@@ -230,7 +236,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
230236
{path: path.Join("user-3", block9.String(), "index"), expectedExists: false},
231237
{path: path.Join("user-3", block10.String(), metadata.MetaFilename), expectedExists: false},
232238
{path: path.Join("user-3", block10.String(), "index"), expectedExists: false},
239+
{path: path.Join("user-3", block10.String(), parquet.ConverterMarkerFileName), expectedExists: false},
233240
{path: path.Join("user-4", block.DebugMetas, "meta.json"), expectedExists: options.user4FilesExist},
241+
{path: path.Join("user-6", block13.String(), parquet.ConverterMarkerFileName), expectedExists: true},
234242
} {
235243
exists, err := bucketClient.Exists(ctx, tc.path)
236244
require.NoError(t, err)
@@ -298,21 +306,25 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
298306
cortex_bucket_blocks_count{user="user-1"} 2
299307
cortex_bucket_blocks_count{user="user-2"} 1
300308
cortex_bucket_blocks_count{user="user-5"} 2
309+
cortex_bucket_blocks_count{user="user-6"} 1
301310
# HELP cortex_bucket_blocks_marked_for_deletion_count Total number of blocks marked for deletion in the bucket.
302311
# TYPE cortex_bucket_blocks_marked_for_deletion_count gauge
303312
cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 1
304313
cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0
305314
cortex_bucket_blocks_marked_for_deletion_count{user="user-5"} 0
315+
cortex_bucket_blocks_marked_for_deletion_count{user="user-6"} 0
306316
# HELP cortex_bucket_blocks_marked_for_no_compaction_count Total number of blocks marked for no compaction in the bucket.
307317
# TYPE cortex_bucket_blocks_marked_for_no_compaction_count gauge
308318
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-1"} 0
309319
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-2"} 0
310320
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-5"} 1
321+
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-6"} 0
311322
# HELP cortex_bucket_blocks_partials_count Total number of partial blocks.
312323
# TYPE cortex_bucket_blocks_partials_count gauge
313324
cortex_bucket_blocks_partials_count{user="user-1"} 2
314325
cortex_bucket_blocks_partials_count{user="user-2"} 0
315326
cortex_bucket_blocks_partials_count{user="user-5"} 0
327+
cortex_bucket_blocks_partials_count{user="user-6"} 0
316328
`),
317329
"cortex_bucket_blocks_count",
318330
"cortex_bucket_blocks_marked_for_deletion_count",

pkg/compactor/compactor_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"encoding/json"
88
"flag"
99
"fmt"
10+
"github.com/cortexproject/cortex/pkg/storage/parquet"
1011
"io"
1112
"os"
1213
"path"
@@ -1429,6 +1430,14 @@ func createBlockVisitMarker(t *testing.T, bkt objstore.Bucket, userID string, bl
14291430
require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content)))
14301431
}
14311432

1433+
func createParquetMarker(t *testing.T, bkt objstore.Bucket, userID string, blockID ulid.ULID) {
1434+
content := mockParquetMarker()
1435+
blockPath := path.Join(userID, blockID.String())
1436+
markPath := path.Join(blockPath, parquet.ConverterMarkerFileName)
1437+
1438+
require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content)))
1439+
}
1440+
14321441
func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) {
14331442
var compactor *Compactor
14341443
var log *concurrency.SyncBuffer
@@ -1727,6 +1736,19 @@ func mockBlockVisitMarker() string {
17271736
return string(content)
17281737
}
17291738

1739+
func mockParquetMarker() string {
1740+
parquetMarker := parquet.ConverterMark{
1741+
Version: 1,
1742+
}
1743+
1744+
content, err := json.Marshal(parquetMarker)
1745+
if err != nil {
1746+
panic("failed to marshal mocked block visit marker")
1747+
}
1748+
1749+
return string(content)
1750+
}
1751+
17301752
func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
17311753
numUsers := 10
17321754

pkg/parquetconverter/converter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func TestConverter(t *testing.T) {
9696
for _, block := range blocksConverted {
9797
for _, file := range []string{
9898
fmt.Sprintf("%s/parquet-converter-mark.json", block.String()),
99-
fmt.Sprintf("markers/%s-parquet-converter-mark.json", block.String()),
99+
fmt.Sprintf("parquet-markers/%s-parquet-converter-mark.json", block.String()),
100100
fmt.Sprintf("%s/0.chunks.parquet", block.String()),
101101
fmt.Sprintf("%s/0.labels.parquet", block.String()),
102102
} {

pkg/storage/parquet/converter_marker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
)
1818

1919
const (
20+
ConverterMarkerPrefix = "parquet-markers"
2021
ConverterMarkerFileName = "parquet-converter-mark.json"
2122
CurrentVersion = 1
2223
)

pkg/storage/tsdb/bucketindex/markers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func NoCompactMarkFilenameMarkFilepath(blockID ulid.ULID) string {
4343
}
4444

4545
func ConverterMarkFilePath(blockID ulid.ULID) string {
46-
return fmt.Sprintf("%s/%s-%s", MarkersPathname, blockID.String(), parquet.ConverterMarkerFileName)
46+
return fmt.Sprintf("%s/%s-%s", parquet.ConverterMarkerPrefix, blockID.String(), parquet.ConverterMarkerFileName)
4747
}
4848

4949
// IsBlockDeletionMarkFilename returns whether the input filename matches the expected pattern

0 commit comments

Comments
 (0)