Skip to content

Composite chunk store #877

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
unused-packages = true
go-tests = true

[[constraint]]
name = "github.com/weaveworks/common"
branch = "logging"
source = "github.com/tomwilkie/weaveworks-common"

[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "v1.10.8"
Expand Down
5 changes: 2 additions & 3 deletions cmd/alertmanager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ func main() {
},
}
alertmanagerConfig alertmanager.MultitenantAlertmanagerConfig
logLevel util.LogLevel
)
util.RegisterFlags(&serverConfig, &alertmanagerConfig, &logLevel)
util.RegisterFlags(&serverConfig, &alertmanagerConfig)
flag.Parse()

util.InitLogger(logLevel.AllowedLevel)
util.InitLogger(&serverConfig)

multiAM, err := alertmanager.NewMultitenantAlertmanager(&alertmanagerConfig)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions cmd/configs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ func main() {
},
}
dbConfig db.Config
logLevel util.LogLevel
)
util.RegisterFlags(&serverConfig, &dbConfig, &logLevel)
util.RegisterFlags(&serverConfig, &dbConfig)
flag.Parse()

util.InitLogger(logLevel.AllowedLevel)
util.InitLogger(&serverConfig)

db, err := db.New(dbConfig)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions cmd/distributor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,11 @@ func main() {
}
ringConfig ring.Config
distributorConfig distributor.Config
logLevel util.LogLevel
)
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &logLevel)
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig)
flag.Parse()

util.InitLogger(logLevel.AllowedLevel)
util.InitLogger(&serverConfig)

// Setting the environment variable JAEGER_AGENT_HOST enables tracing
trace := tracing.NewFromEnv("distributor")
Expand Down
7 changes: 3 additions & 4 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func main() {
schemaConfig chunk.SchemaConfig
storageConfig storage.Config
ingesterConfig ingester.Config
logLevel util.LogLevel
eventSampleRate int
maxStreams uint
)
Expand All @@ -45,12 +44,12 @@ func main() {
// Ingester needs to know our gRPC listen port.
ingesterConfig.LifecyclerConfig.ListenPort = &serverConfig.GRPCListenPort
util.RegisterFlags(&serverConfig, &chunkStoreConfig, &storageConfig,
&schemaConfig, &ingesterConfig, &logLevel)
&schemaConfig, &ingesterConfig)
flag.UintVar(&maxStreams, "ingester.max-concurrent-streams", 1000, "Limit on the number of concurrent streams for gRPC calls (0 = unlimited)")
flag.IntVar(&eventSampleRate, "event.sample-rate", 0, "How often to sample observability events (0 = never).")
flag.Parse()

util.InitLogger(logLevel.AllowedLevel)
util.InitLogger(&serverConfig)
util.InitEvents(eventSampleRate)

if maxStreams > 0 {
Expand All @@ -70,7 +69,7 @@ func main() {
os.Exit(1)
}

chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient)
chunkStore, err := chunk.NewCompositeStore(chunkStoreConfig, schemaConfig, storageClient)
if err != nil {
level.Error(util.Logger).Log("err", err)
os.Exit(1)
Expand Down
13 changes: 5 additions & 8 deletions cmd/lite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/web/api/v1"
"github.com/prometheus/tsdb"
"google.golang.org/grpc"
Expand Down Expand Up @@ -48,14 +47,13 @@ func main() {
rulerConfig ruler.Config
schemaConfig chunk.SchemaConfig
storageConfig storage.Config
logLevel util.LogLevel

unauthenticated bool
)
// Ingester needs to know our gRPC listen port.
ingesterConfig.LifecyclerConfig.ListenPort = &serverConfig.GRPCListenPort
util.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig, &querierConfig,
&ingesterConfig, &configStoreConfig, &rulerConfig, &storageConfig, &schemaConfig, &logLevel)
&ingesterConfig, &configStoreConfig, &rulerConfig, &storageConfig, &schemaConfig)
flag.BoolVar(&unauthenticated, "unauthenticated", false, "Set to true to disable multitenancy.")
flag.Parse()
ingesterConfig.SetClientConfig(distributorConfig.IngesterClientConfig)
Expand All @@ -64,7 +62,7 @@ func main() {
trace := tracing.NewFromEnv("ingester")
defer trace.Close()

util.InitLogger(logLevel.AllowedLevel)
util.InitLogger(&serverConfig)

server, err := server.New(serverConfig)
if err != nil {
Expand All @@ -79,7 +77,7 @@ func main() {
os.Exit(1)
}

chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient)
chunkStore, err := chunk.NewCompositeStore(chunkStoreConfig, schemaConfig, storageClient)
if err != nil {
level.Error(util.Logger).Log("err", err)
os.Exit(1)
Expand Down Expand Up @@ -124,8 +122,7 @@ func main() {
tableManager.Start()
defer tableManager.Stop()

engine := promql.NewEngine(util.Logger, nil, querierConfig.MaxConcurrent, querierConfig.Timeout)
queryable := querier.NewQueryable(dist, chunkStore)
queryable, engine := querier.Make(querierConfig, dist, chunkStore)

if configStoreConfig.ConfigsAPIURL.String() != "" || configStoreConfig.DBConfig.URI != "" {
rulesAPI, err := ruler.NewRulesAPI(configStoreConfig)
Expand Down Expand Up @@ -186,7 +183,7 @@ func main() {

subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter()
subrouter.PathPrefix("/api/v1").Handler(activeMiddleware.Wrap(promRouter))
subrouter.Path("/read").Handler(activeMiddleware.Wrap(http.HandlerFunc(queryable.RemoteReadHandler)))
subrouter.Path("/read").Handler(activeMiddleware.Wrap(querier.RemoteReadHandler(queryable)))
subrouter.Path("/validate_expr").Handler(activeMiddleware.Wrap(http.HandlerFunc(dist.ValidateExprHandler)))
subrouter.Path("/user_stats").Handler(activeMiddleware.Wrap(http.HandlerFunc(dist.UserStatsHandler)))

Expand Down
14 changes: 5 additions & 9 deletions cmd/querier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/web/api/v1"
"github.com/prometheus/tsdb"

Expand Down Expand Up @@ -40,17 +39,16 @@ func main() {
chunkStoreConfig chunk.StoreConfig
schemaConfig chunk.SchemaConfig
storageConfig storage.Config
logLevel util.LogLevel
)
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &querierConfig,
&chunkStoreConfig, &schemaConfig, &storageConfig, &logLevel)
&chunkStoreConfig, &schemaConfig, &storageConfig)
flag.Parse()

// Setting the environment variable JAEGER_AGENT_HOST enables tracing
trace := tracing.NewFromEnv("querier")
defer trace.Close()

util.InitLogger(logLevel.AllowedLevel)
util.InitLogger(&serverConfig)

r, err := ring.New(ringConfig)
if err != nil {
Expand Down Expand Up @@ -81,16 +79,14 @@ func main() {
os.Exit(1)
}

chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient)
chunkStore, err := chunk.NewCompositeStore(chunkStoreConfig, schemaConfig, storageClient)
if err != nil {
level.Error(util.Logger).Log("err", err)
os.Exit(1)
}
defer chunkStore.Stop()

queryable := querier.NewQueryable(dist, chunkStore)

engine := promql.NewEngine(util.Logger, nil, querierConfig.MaxConcurrent, querierConfig.Timeout)
queryable, engine := querier.Make(querierConfig, dist, chunkStore)
api := v1.NewAPI(
engine,
queryable,
Expand All @@ -107,7 +103,7 @@ func main() {

subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter()
subrouter.PathPrefix("/api/v1").Handler(middleware.AuthenticateUser.Wrap(promRouter))
subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(queryable.RemoteReadHandler)))
subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(querier.RemoteReadHandler(queryable)))
subrouter.Path("/validate_expr").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.ValidateExprHandler)))
subrouter.Path("/user_stats").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.UserStatsHandler)))

Expand Down
11 changes: 6 additions & 5 deletions cmd/ruler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,27 @@ func main() {
schemaConfig chunk.SchemaConfig
storageConfig storage.Config
configStoreConfig ruler.ConfigStoreConfig
logLevel util.LogLevel
querierConfig querier.Config
)

// Setting the environment variable JAEGER_AGENT_HOST enables tracing
trace := tracing.NewFromEnv("ruler")
defer trace.Close()

util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig,
&rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig, &logLevel)
&rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig,
&querierConfig)
flag.Parse()

util.InitLogger(logLevel.AllowedLevel)
util.InitLogger(&serverConfig)

storageClient, err := storage.NewStorageClient(storageConfig, schemaConfig)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing storage client", "err", err)
os.Exit(1)
}

chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient)
chunkStore, err := chunk.NewCompositeStore(chunkStoreConfig, schemaConfig, storageClient)
if err != nil {
level.Error(util.Logger).Log("err", err)
os.Exit(1)
Expand All @@ -78,7 +79,7 @@ func main() {
prometheus.MustRegister(dist)

engine := promql.NewEngine(util.Logger, prometheus.DefaultRegisterer, rulerConfig.NumWorkers, rulerConfig.GroupTimeout)
queryable := querier.NewQueryable(dist, chunkStore)
queryable := querier.NewQueryable(dist, chunkStore, querierConfig.Iterators)

rlr, err := ruler.NewRuler(rulerConfig, engine, queryable, dist)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions cmd/table-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ func main() {
ingesterConfig ingester.Config
storageConfig storage.Config
schemaConfig chunk.SchemaConfig
logLevel util.LogLevel
)
util.RegisterFlags(&ingesterConfig, &serverConfig, &storageConfig, &schemaConfig, &logLevel)
util.RegisterFlags(&ingesterConfig, &serverConfig, &storageConfig, &schemaConfig)
flag.Parse()

util.InitLogger(logLevel.AllowedLevel)
util.InitLogger(&serverConfig)

if (schemaConfig.ChunkTables.WriteScale.Enabled ||
schemaConfig.IndexTables.WriteScale.Enabled ||
Expand Down
29 changes: 12 additions & 17 deletions pkg/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,36 +319,31 @@ func equalByKey(a, b Chunk) bool {
a.From == b.From && a.Through == b.Through && a.Checksum == b.Checksum
}

func chunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error) {
// ChunksToMatrix converts a set of chunks to a model.Matrix.
func ChunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error) {
sp, ctx := ot.StartSpanFromContext(ctx, "chunksToMatrix")
defer sp.Finish()
sp.LogFields(otlog.Int("chunks", len(chunks)))

// Group chunks by series, sort and dedupe samples.
sampleStreams := map[model.Fingerprint]*model.SampleStream{}
metrics := map[model.Fingerprint]model.Metric{}
samples := map[model.Fingerprint][][]model.SamplePair{}
for _, c := range chunks {
ss, ok := sampleStreams[c.Fingerprint]
if !ok {
ss = &model.SampleStream{
Metric: c.Metric,
}
sampleStreams[c.Fingerprint] = ss
}

samples, err := c.Samples(from, through)
ss, err := c.Samples(from, through)
if err != nil {
return nil, err
}

ss.Values = util.MergeSampleSets(ss.Values, samples)
metrics[c.Fingerprint] = c.Metric
samples[c.Fingerprint] = append(samples[c.Fingerprint], ss)
}
sp.LogFields(otlog.Int("sample streams", len(sampleStreams)))
sp.LogFields(otlog.Int("sample streams", len(samples)))

matrix := make(model.Matrix, 0, len(sampleStreams))
for _, ss := range sampleStreams {
matrix := make(model.Matrix, 0, len(samples))
for fp, ss := range samples {
matrix = append(matrix, &model.SampleStream{
Metric: ss.Metric,
Values: ss.Values,
Metric: metrics[fp],
Values: util.MergeNSampleSets(ss...),
})
}

Expand Down
Loading