Skip to content

Commit 2d24435

Browse files
committed
Some bugfixes and change in the config
Signed-off-by: alanprot <alanprot@gmail.com>
1 parent 7c7e98f commit 2d24435

3 files changed

Lines changed: 45 additions & 53 deletions

File tree

pkg/parquetconverter/converter.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
3131
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
3232
"github.com/cortexproject/cortex/pkg/util"
33-
"github.com/cortexproject/cortex/pkg/util/flagext"
3433
util_log "github.com/cortexproject/cortex/pkg/util/log"
3534
"github.com/cortexproject/cortex/pkg/util/services"
3635
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -46,10 +45,8 @@ const (
4645
var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
4746

4847
type Config struct {
49-
EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
50-
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
51-
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
52-
ConversionInterval time.Duration `yaml:"conversion_interval"`
48+
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
49+
ConversionInterval time.Duration `yaml:"conversion_interval"`
5350

5451
DataDir string `yaml:"data_dir"`
5552

@@ -64,8 +61,7 @@ type Converter struct {
6461
cfg Config
6562
storageCfg cortex_tsdb.BlocksStorageConfig
6663

67-
allowedTenants *util.AllowedTenants
68-
limits *validation.Overrides
64+
limits *validation.Overrides
6965

7066
// Ring used for sharding compactions.
7167
ringLifecycler *ring.Lifecycler
@@ -87,8 +83,6 @@ type Converter struct {
8783
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
8884
cfg.Ring.RegisterFlags(f)
8985

90-
f.Var(&cfg.EnabledTenants, "parquet-converter.enabled-tenants", "Comma separated list of tenants that can be converted. If specified, only these tenants will be converted, otherwise all tenants can be converted.")
91-
f.Var(&cfg.DisabledTenants, "parquet-converter.disabled-tenants", "Comma separated list of tenants that cannot converted.")
9286
f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Data directory in which to cache blocks and process conversions.")
9387
f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from the long term storage.")
9488
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "The frequency at which the conversion job runs.")
@@ -107,7 +101,6 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
107101
reg: registerer,
108102
storageCfg: storageCfg,
109103
logger: logger,
110-
allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants),
111104
limits: limits,
112105
pool: chunkenc.NewPool(),
113106
blockRanges: blockRanges,
@@ -379,15 +372,11 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
379372
return nil
380373
}
381374

382-
func (c *Converter) ownUser(r ring.ReadRing, userID string) (bool, error) {
383-
if !c.allowedTenants.IsAllowed(userID) {
375+
func (c *Converter) ownUser(r ring.ReadRing, userId string) (bool, error) {
376+
if userId == util.GlobalMarkersDir {
377+
// __markers__ is reserved for global markers and no tenant should be allowed to have that name.
384378
return false, nil
385379
}
386-
387-
if c.limits.ParquetConverterTenantShardSize(userID) <= 0 {
388-
return true, nil
389-
}
390-
391380
rs, err := r.GetAllHealthy(RingOp)
392381
if err != nil {
393382
return false, err

pkg/querier/parquet_queryable.go

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@ package querier
22

33
import (
44
"context"
5-
"fmt"
65
"time"
76

87
"github.com/go-kit/log"
98
"github.com/pkg/errors"
9+
"github.com/prometheus-community/parquet-common/schema"
1010
"github.com/prometheus-community/parquet-common/search"
1111
parquet_storage "github.com/prometheus-community/parquet-common/storage"
1212
"github.com/prometheus/client_golang/prometheus"
13+
"github.com/prometheus/client_golang/prometheus/promauto"
1314
"github.com/prometheus/prometheus/model/labels"
1415
"github.com/prometheus/prometheus/storage"
16+
"github.com/prometheus/prometheus/tsdb/chunkenc"
1517
"github.com/prometheus/prometheus/util/annotations"
1618
"github.com/thanos-io/thanos/pkg/strutil"
1719

@@ -24,6 +26,19 @@ import (
2426
"github.com/cortexproject/cortex/pkg/util/services"
2527
)
2628

29+
type parquetQueryableFallbackMetrics struct {
30+
blocksQueriedTotal *prometheus.CounterVec
31+
}
32+
33+
func newParquetQueryableFallbackMetrics(reg prometheus.Registerer) *parquetQueryableFallbackMetrics {
34+
return &parquetQueryableFallbackMetrics{
35+
blocksQueriedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
36+
Name: "cortex_parquet_queryable_blocks_queried_total",
37+
Help: "Total number of blocks found to query.",
38+
}, []string{"type"}),
39+
}
40+
}
41+
2742
type parquetQueryableWithFallback struct {
2843
services.Service
2944

@@ -36,6 +51,9 @@ type parquetQueryableWithFallback struct {
3651
// Subservices manager.
3752
subservices *services.Manager
3853
subservicesWatcher *services.FailureWatcher
54+
55+
// metrics
56+
metrics *parquetQueryableFallbackMetrics
3957
}
4058

4159
func NewParquetQueryable(
@@ -51,39 +69,14 @@ func NewParquetQueryable(
5169
if err != nil {
5270
return nil, err
5371
}
54-
55-
// Create the blocks finder.
56-
var finder BlocksFinder
57-
if storageCfg.BucketStore.BucketIndex.Enabled {
58-
finder = NewBucketIndexBlocksFinder(BucketIndexBlocksFinderConfig{
59-
IndexLoader: bucketindex.LoaderConfig{
60-
CheckInterval: time.Minute,
61-
UpdateOnStaleInterval: storageCfg.BucketStore.SyncInterval,
62-
UpdateOnErrorInterval: storageCfg.BucketStore.BucketIndex.UpdateOnErrorInterval,
63-
IdleTimeout: storageCfg.BucketStore.BucketIndex.IdleTimeout,
64-
},
65-
MaxStalePeriod: storageCfg.BucketStore.BucketIndex.MaxStalePeriod,
66-
IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay,
67-
IgnoreBlocksWithin: storageCfg.BucketStore.IgnoreBlocksWithin,
68-
}, bucketClient, limits, logger, reg)
69-
} else {
70-
finder = NewBucketScanBlocksFinder(BucketScanBlocksFinderConfig{
71-
ScanInterval: storageCfg.BucketStore.SyncInterval,
72-
TenantsConcurrency: storageCfg.BucketStore.TenantSyncConcurrency,
73-
MetasConcurrency: storageCfg.BucketStore.MetaSyncConcurrency,
74-
CacheDir: storageCfg.BucketStore.SyncDir,
75-
IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay,
76-
IgnoreBlocksWithin: storageCfg.BucketStore.IgnoreBlocksWithin,
77-
BlockDiscoveryStrategy: storageCfg.BucketStore.BlockDiscoveryStrategy,
78-
}, bucketClient, limits, logger, reg)
79-
}
80-
81-
manager, err := services.NewManager(finder, blockStorageQueryable)
72+
manager, err := services.NewManager(blockStorageQueryable)
8273
if err != nil {
8374
return nil, err
8475
}
8576

86-
pq, err := search.NewParquetQueryable(nil, func(ctx context.Context, mint, maxt int64) ([]*parquet_storage.ParquetShard, error) {
77+
cDecoder := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())
78+
79+
parquetQueryable, err := search.NewParquetQueryable(cDecoder, func(ctx context.Context, mint, maxt int64) ([]*parquet_storage.ParquetShard, error) {
8780
userID, err := tenant.TenantID(ctx)
8881
if err != nil {
8982
return nil, err
@@ -98,8 +91,8 @@ func NewParquetQueryable(
9891
shards := make([]*parquet_storage.ParquetShard, 0, len(blocks))
9992

10093
for _, block := range blocks {
101-
blockName := fmt.Sprintf("%v/block", block.ID.String())
102-
shard, err := parquet_storage.OpenParquetShard(ctx, userBkt, blockName, 0)
94+
// we always only have 1 shard - shard 0
95+
shard, err := parquet_storage.OpenParquetShard(ctx, userBkt, block.ID.String(), 0)
10396
if err != nil {
10497
return nil, err
10598
}
@@ -112,15 +105,16 @@ func NewParquetQueryable(
112105
p := &parquetQueryableWithFallback{
113106
subservices: manager,
114107
blockStorageQueryable: blockStorageQueryable,
115-
parquetQueryable: pq,
108+
parquetQueryable: parquetQueryable,
116109
queryStoreAfter: config.QueryStoreAfter,
117110
subservicesWatcher: services.NewFailureWatcher(),
118-
finder: finder,
111+
finder: blockStorageQueryable.finder,
112+
metrics: newParquetQueryableFallbackMetrics(reg),
119113
}
120114

121115
p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
122116

123-
return pq, err
117+
return p, err
124118
}
125119

126120
func (p *parquetQueryableWithFallback) starting(ctx context.Context) error {
@@ -164,6 +158,7 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie
164158
queryStoreAfter: p.queryStoreAfter,
165159
blocksStoreQuerier: bsq,
166160
finder: p.finder,
161+
metrics: p.metrics,
167162
}, nil
168163
}
169164

@@ -178,14 +173,16 @@ type parquetQuerier struct {
178173
// If set, the querier manipulates the max time to not be greater than
179174
// "now - queryStoreAfter" so that most recent blocks are not queried.
180175
queryStoreAfter time.Duration
176+
177+
// metrics
178+
metrics *parquetQueryableFallbackMetrics
181179
}
182180

183181
func (q *parquetQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
184182
remaining, parquet, err := q.getBlocks(ctx, q.minT, q.maxT)
185183
if err != nil {
186184
return nil, nil, err
187185
}
188-
189186
limit := 0
190187

191188
if hints != nil {
@@ -342,5 +339,8 @@ func (q *parquetQuerier) getBlocks(ctx context.Context, minT, maxT int64) ([]*bu
342339
remaining = append(remaining, b)
343340
}
344341

342+
q.metrics.blocksQueriedTotal.WithLabelValues("parquet").Add(float64(len(parquetBlocks)))
343+
q.metrics.blocksQueriedTotal.WithLabelValues("tsdb").Add(float64(len(remaining)))
344+
345345
return remaining, parquetBlocks, nil
346346
}

pkg/querier/parquet_queryable_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
7575
finder: finder,
7676
blocksStoreQuerier: q,
7777
parquetQuerier: mParquetQuerier,
78+
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
7879
}
7980

8081
finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
@@ -131,6 +132,7 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
131132
finder: finder,
132133
blocksStoreQuerier: q,
133134
parquetQuerier: mParquetQuerier,
135+
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
134136
}
135137

136138
finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
@@ -193,6 +195,7 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
193195
finder: finder,
194196
blocksStoreQuerier: q,
195197
parquetQuerier: mParquetQuerier,
198+
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
196199
}
197200

198201
finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{

0 commit comments

Comments
 (0)