Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
name = "github.com/prometheus/prometheus"
branch = "master"

[[constraint]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't we want to have comments for newly added constraints?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good spot

name = "github.com/prometheus/tsdb"
branch = "master"

[[override]]
name = "k8s.io/client-go"
revision = "3627aeb7d4f6ade38f995d2c923e459146493c7e"
Expand Down
15 changes: 8 additions & 7 deletions cmd/lite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/web/api/v1"
"github.com/prometheus/tsdb"
"google.golang.org/grpc"
Expand Down Expand Up @@ -42,6 +41,7 @@ func main() {

chunkStoreConfig chunk.StoreConfig
distributorConfig distributor.Config
querierConfig querier.Config
ingesterConfig ingester.Config
configStoreConfig ruler.ConfigStoreConfig
rulerConfig ruler.Config
Expand All @@ -53,7 +53,7 @@ func main() {
)
// Ingester needs to know our gRPC listen port.
ingesterConfig.ListenPort = &serverConfig.GRPCListenPort
util.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig,
util.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig, &querierConfig,
&ingesterConfig, &configStoreConfig, &rulerConfig, &storageConfig, &schemaConfig, &logLevel)
flag.BoolVar(&unauthenticated, "unauthenticated", false, "Set to true to disable multitenancy.")
flag.Parse()
Expand Down Expand Up @@ -123,13 +123,15 @@ func main() {
tableManager.Start()
defer tableManager.Stop()

engine, queryable := querier.NewEngine(dist, chunkStore, nil, querierConfig.MaxConcurrent, querierConfig.Timeout)

if configStoreConfig.ConfigsAPIURL.String() != "" || configStoreConfig.DBConfig.URI != "" {
rulesAPI, err := ruler.NewRulesAPI(configStoreConfig)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing ruler config store", "err", err)
os.Exit(1)
}
rlr, err := ruler.NewRuler(rulerConfig, dist, chunkStore)
rlr, err := ruler.NewRuler(rulerConfig, engine, queryable, dist)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing ruler", "err", err)
os.Exit(1)
Expand All @@ -144,16 +146,15 @@ func main() {
defer rulerServer.Stop()
}

sampleQueryable := querier.NewQueryable(dist, chunkStore, false)
metadataQueryable := querier.NewQueryable(dist, chunkStore, true)
sampleQueryable := querier.NewQueryable(dist, chunkStore)

engine := promql.NewEngine(sampleQueryable, nil)
api := v1.NewAPI(
engine,
metadataQueryable,
sampleQueryable,
querier.DummyTargetRetriever{},
querier.DummyAlertmanagerRetriever{},
func() config.Config { return config.Config{} },
map[string]string{}, // TODO: include configuration flags
func(f http.HandlerFunc) http.HandlerFunc { return f },
func() *tsdb.DB { return nil }, // Only needed for admin APIs.
false, // Disable admin APIs.
Expand Down
13 changes: 7 additions & 6 deletions cmd/querier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ func main() {
}
ringConfig ring.Config
distributorConfig distributor.Config
querierConfig querier.Config
chunkStoreConfig chunk.StoreConfig
schemaConfig chunk.SchemaConfig
storageConfig storage.Config
logLevel util.LogLevel
)
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig,
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &querierConfig,
&chunkStoreConfig, &schemaConfig, &storageConfig, &logLevel)
flag.Parse()

Expand Down Expand Up @@ -87,16 +88,16 @@ func main() {
}
defer chunkStore.Stop()

sampleQueryable := querier.NewQueryable(dist, chunkStore, false)
metadataQueryable := querier.NewQueryable(dist, chunkStore, true)
queryable := querier.NewQueryable(dist, chunkStore)

engine := promql.NewEngine(sampleQueryable, nil)
engine := promql.NewEngine(util.Logger, nil, querierConfig.MaxConcurrent, querierConfig.Timeout)
api := v1.NewAPI(
engine,
metadataQueryable,
queryable,
querier.DummyTargetRetriever{},
querier.DummyAlertmanagerRetriever{},
func() config.Config { return config.Config{} },
map[string]string{}, // TODO: include configuration flags
func(f http.HandlerFunc) http.HandlerFunc { return f },
func() *tsdb.DB { return nil }, // Only needed for admin APIs.
false, // Disable admin APIs.
Expand All @@ -106,7 +107,7 @@ func main() {

subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter()
subrouter.PathPrefix("/api/v1").Handler(middleware.AuthenticateUser.Wrap(promRouter))
subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(sampleQueryable.RemoteReadHandler)))
subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(queryable.RemoteReadHandler)))
subrouter.Path("/validate_expr").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.ValidateExprHandler)))
subrouter.Path("/user_stats").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.UserStatsHandler)))

Expand Down
7 changes: 5 additions & 2 deletions cmd/ruler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/chunk/storage"
"github.com/weaveworks/cortex/pkg/distributor"
"github.com/weaveworks/cortex/pkg/querier"
"github.com/weaveworks/cortex/pkg/ring"
"github.com/weaveworks/cortex/pkg/ruler"
"github.com/weaveworks/cortex/pkg/util"
Expand All @@ -29,6 +30,7 @@ func main() {
}
ringConfig ring.Config
distributorConfig distributor.Config
querierConfig querier.Config
rulerConfig ruler.Config
chunkStoreConfig chunk.StoreConfig
schemaConfig chunk.SchemaConfig
Expand All @@ -41,7 +43,7 @@ func main() {
trace := tracing.NewFromEnv("ruler")
defer trace.Close()

util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig,
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &querierConfig,
&rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig, &logLevel)
flag.Parse()

Expand Down Expand Up @@ -75,7 +77,8 @@ func main() {
defer dist.Stop()
prometheus.MustRegister(dist)

rlr, err := ruler.NewRuler(rulerConfig, dist, chunkStore)
engine, queryable := querier.NewEngine(dist, chunkStore, prometheus.DefaultRegisterer, querierConfig.MaxConcurrent, querierConfig.Timeout)
rlr, err := ruler.NewRuler(rulerConfig, engine, queryable, dist)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing ruler", "err", err)
os.Exit(1)
Expand Down
6 changes: 6 additions & 0 deletions pkg/querier/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@ type DummyTargetRetriever struct{}
// Targets implements TargetRetriever.
func (r DummyTargetRetriever) Targets() []*scrape.Target { return nil }

// DroppedTargets implements TargetRetriever.
func (r DummyTargetRetriever) DroppedTargets() []*scrape.Target { return nil }

// DummyAlertmanagerRetriever implements AlertmanagerRetriever.
type DummyAlertmanagerRetriever struct{}

// Alertmanagers implements AlertmanagerRetriever.
func (r DummyAlertmanagerRetriever) Alertmanagers() []*url.URL { return nil }

// DroppedAlertmanagers implements AlertmanagerRetriever.
func (r DummyAlertmanagerRetriever) DroppedAlertmanagers() []*url.URL { return nil }
48 changes: 29 additions & 19 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,56 @@ package querier

import (
"context"
"flag"
"net/http"
"time"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"

"github.com/weaveworks/cortex/pkg/ingester/client"
"github.com/weaveworks/cortex/pkg/prom1/storage/metric"

"github.com/weaveworks/cortex/pkg/util"
)

// Config contains the configuration require to create a querier
type Config struct {
MaxConcurrent int
Timeout time.Duration
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
flag.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
flag.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.")
}

// ChunkStore is the interface we need to get chunks
type ChunkStore interface {
Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) (model.Matrix, error)
}

// NewEngine creates a new promql.Engine for cortex.
func NewEngine(distributor Querier, chunkStore ChunkStore) *promql.Engine {
queryable := NewQueryable(distributor, chunkStore, false)
return promql.NewEngine(queryable, nil)
func NewEngine(distributor Querier, chunkStore ChunkStore, reg prometheus.Registerer, maxConcurrent int, timeout time.Duration) (*promql.Engine, storage.Queryable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having NewEngine() return both an engine and a queryable seems messy, or at least not appropriate naming anymore. Maybe just get rid of this tiny wrapper and have the caller call both NewQueryable() and promql.NewEngine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed; I've done this.

queryable := NewQueryable(distributor, chunkStore)
engine := promql.NewEngine(util.Logger, reg, maxConcurrent, timeout)
return engine, queryable
}

// NewQueryable creates a new Queryable for cortex.
func NewQueryable(distributor Querier, chunkStore ChunkStore, mo bool) MergeQueryable {
func NewQueryable(distributor Querier, chunkStore ChunkStore) MergeQueryable {
return MergeQueryable{
queriers: []Querier{
distributor,
&chunkQuerier{
store: chunkStore,
},
},
metadataOnly: mo,
}
}

Expand Down Expand Up @@ -108,18 +124,16 @@ func mergeMatrices(matrices chan model.Matrix, errors chan error, n int) (model.
// A MergeQueryable is a storage.Queryable that produces a storage.Querier which merges
// results from multiple underlying Queriers.
type MergeQueryable struct {
queriers []Querier
metadataOnly bool
queriers []Querier
}

// Querier implements storage.Queryable.
func (q MergeQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return mergeQuerier{
ctx: ctx,
queriers: q.queriers,
mint: mint,
maxt: maxt,
metadataOnly: q.metadataOnly,
ctx: ctx,
queriers: q.queriers,
mint: mint,
maxt: maxt,
}, nil
}

Expand Down Expand Up @@ -188,16 +202,12 @@ type mergeQuerier struct {
queriers []Querier
mint int64
maxt int64
// Whether this querier should only load series metadata in Select().
// Necessary for remote storage implementations of the storage.Querier
// interface because both metadata and bulk data loading happens via
// the Select() method.
metadataOnly bool
}

func (mq mergeQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
func (mq mergeQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) {
// TODO: Update underlying selectors to return errors directly.
if mq.metadataOnly {
// Kludge: Prometheus passes nil SelectParams if it is doing a 'series' operation, which needs only metadata
if sp == nil {
return mq.selectMetadata(matchers...), nil
}
return mq.selectSamples(matchers...), nil
Expand Down
9 changes: 5 additions & 4 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/golang/snappy"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"
"github.com/weaveworks/cortex/pkg/ingester/client"
"github.com/weaveworks/cortex/pkg/prom1/storage/metric"
Expand Down Expand Up @@ -104,13 +105,13 @@ func TestMergeQuerierSortsMetricLabels(t *testing.T) {
},
},
},
mint: 0,
maxt: 0,
metadataOnly: false,
mint: 0,
maxt: 0,
}
m, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "testmetric")
require.NoError(t, err)
ss, err := mq.Select(m)
dummyParams := storage.SelectParams{}
ss, err := mq.Select(&dummyParams, m)
require.NoError(t, err)
require.NoError(t, ss.Err())
ss.Next()
Expand Down
11 changes: 6 additions & 5 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ import (
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"

"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/user"
"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/distributor"
"github.com/weaveworks/cortex/pkg/querier"
"github.com/weaveworks/cortex/pkg/util"
)

Expand Down Expand Up @@ -118,6 +117,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// Ruler evaluates rules.
type Ruler struct {
engine *promql.Engine
queryable storage.Queryable
pusher Pusher
alertURL *url.URL
notifierCfg *config.Config
Expand Down Expand Up @@ -191,13 +191,14 @@ func (rn *rulerNotifier) stop() {
}

// NewRuler creates a new ruler from a distributor and chunk store.
func NewRuler(cfg Config, d *distributor.Distributor, c *chunk.Store) (*Ruler, error) {
func NewRuler(cfg Config, engine *promql.Engine, queryable storage.Queryable, d *distributor.Distributor) (*Ruler, error) {
ncfg, err := buildNotifierConfig(&cfg)
if err != nil {
return nil, err
}
return &Ruler{
engine: querier.NewEngine(d, c),
engine: engine,
queryable: queryable,
pusher: d,
alertURL: cfg.ExternalURL.URL,
notifierCfg: ncfg,
Expand Down Expand Up @@ -278,7 +279,7 @@ func (r *Ruler) newGroup(ctx context.Context, userID string, item *workItem) (*r
}
opts := &rules.ManagerOptions{
Appendable: appendable,
QueryFunc: rules.EngineQueryFunc(r.engine),
QueryFunc: rules.EngineQueryFunc(r.engine, r.queryable),
Context: ctx,
ExternalURL: r.alertURL,
NotifyFunc: sendAlerts(notifier, r.alertURL.String()),
Expand Down
Loading