Skip to content

Update prometheus #802

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: 2
# https://circleci.com/blog/circleci-hacks-reuse-yaml-in-your-circleci-config-with-yaml/
defaults: &defaults
docker:
- image: weaveworks/cortex-build-image:circleci-2.0-3d3c3d3a
- image: weaveworks/cortex-build-image:master-af2a0261
working_directory: /go/src/github.com/weaveworks/cortex

workflows:
Expand Down Expand Up @@ -44,7 +44,7 @@ jobs:

integration:
docker:
- image: weaveworks/cortex-build-image:circleci-2.0-3d3c3d3a
- image: weaveworks/cortex-build-image:master-af2a0261
- image: circleci/postgres:9.6.2-alpine
environment:
POSTGRES_DB: configs_test
Expand Down
12 changes: 6 additions & 6 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
name = "github.com/prometheus/prometheus"
branch = "master"

# Similar to prometheus/prometheus, we want to track upstream master.
[[constraint]]
name = "github.com/prometheus/tsdb"
branch = "master"

[[override]]
name = "k8s.io/client-go"
revision = "3627aeb7d4f6ade38f995d2c923e459146493c7e"
Expand Down
17 changes: 9 additions & 8 deletions cmd/lite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func main() {

chunkStoreConfig chunk.StoreConfig
distributorConfig distributor.Config
querierConfig querier.Config
ingesterConfig ingester.Config
configStoreConfig ruler.ConfigStoreConfig
rulerConfig ruler.Config
Expand All @@ -53,7 +54,7 @@ func main() {
)
// Ingester needs to know our gRPC listen port.
ingesterConfig.ListenPort = &serverConfig.GRPCListenPort
util.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig,
util.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig, &querierConfig,
&ingesterConfig, &configStoreConfig, &rulerConfig, &storageConfig, &schemaConfig, &logLevel)
flag.BoolVar(&unauthenticated, "unauthenticated", false, "Set to true to disable multitenancy.")
flag.Parse()
Expand Down Expand Up @@ -123,13 +124,16 @@ func main() {
tableManager.Start()
defer tableManager.Stop()

engine := promql.NewEngine(util.Logger, nil, querierConfig.MaxConcurrent, querierConfig.Timeout)
queryable := querier.NewQueryable(dist, chunkStore)

if configStoreConfig.ConfigsAPIURL.String() != "" || configStoreConfig.DBConfig.URI != "" {
rulesAPI, err := ruler.NewRulesAPI(configStoreConfig)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing ruler config store", "err", err)
os.Exit(1)
}
rlr, err := ruler.NewRuler(rulerConfig, dist, chunkStore)
rlr, err := ruler.NewRuler(rulerConfig, engine, queryable, dist)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing ruler", "err", err)
os.Exit(1)
Expand All @@ -144,16 +148,13 @@ func main() {
defer rulerServer.Stop()
}

sampleQueryable := querier.NewQueryable(dist, chunkStore, false)
metadataQueryable := querier.NewQueryable(dist, chunkStore, true)

engine := promql.NewEngine(sampleQueryable, nil)
api := v1.NewAPI(
engine,
metadataQueryable,
queryable,
querier.DummyTargetRetriever{},
querier.DummyAlertmanagerRetriever{},
func() config.Config { return config.Config{} },
map[string]string{}, // TODO: include configuration flags
func(f http.HandlerFunc) http.HandlerFunc { return f },
func() *tsdb.DB { return nil }, // Only needed for admin APIs.
false, // Disable admin APIs.
Expand Down Expand Up @@ -185,7 +186,7 @@ func main() {

subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter()
subrouter.PathPrefix("/api/v1").Handler(activeMiddleware.Wrap(promRouter))
subrouter.Path("/read").Handler(activeMiddleware.Wrap(http.HandlerFunc(sampleQueryable.RemoteReadHandler)))
subrouter.Path("/read").Handler(activeMiddleware.Wrap(http.HandlerFunc(queryable.RemoteReadHandler)))
subrouter.Path("/validate_expr").Handler(activeMiddleware.Wrap(http.HandlerFunc(dist.ValidateExprHandler)))
subrouter.Path("/user_stats").Handler(activeMiddleware.Wrap(http.HandlerFunc(dist.UserStatsHandler)))

Expand Down
13 changes: 7 additions & 6 deletions cmd/querier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ func main() {
}
ringConfig ring.Config
distributorConfig distributor.Config
querierConfig querier.Config
chunkStoreConfig chunk.StoreConfig
schemaConfig chunk.SchemaConfig
storageConfig storage.Config
logLevel util.LogLevel
)
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig,
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &querierConfig,
&chunkStoreConfig, &schemaConfig, &storageConfig, &logLevel)
flag.Parse()

Expand Down Expand Up @@ -87,16 +88,16 @@ func main() {
}
defer chunkStore.Stop()

sampleQueryable := querier.NewQueryable(dist, chunkStore, false)
metadataQueryable := querier.NewQueryable(dist, chunkStore, true)
queryable := querier.NewQueryable(dist, chunkStore)

engine := promql.NewEngine(sampleQueryable, nil)
engine := promql.NewEngine(util.Logger, nil, querierConfig.MaxConcurrent, querierConfig.Timeout)
api := v1.NewAPI(
engine,
metadataQueryable,
queryable,
querier.DummyTargetRetriever{},
querier.DummyAlertmanagerRetriever{},
func() config.Config { return config.Config{} },
map[string]string{}, // TODO: include configuration flags
func(f http.HandlerFunc) http.HandlerFunc { return f },
func() *tsdb.DB { return nil }, // Only needed for admin APIs.
false, // Disable admin APIs.
Expand All @@ -106,7 +107,7 @@ func main() {

subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter()
subrouter.PathPrefix("/api/v1").Handler(middleware.AuthenticateUser.Wrap(promRouter))
subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(sampleQueryable.RemoteReadHandler)))
subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(queryable.RemoteReadHandler)))
subrouter.Path("/validate_expr").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.ValidateExprHandler)))
subrouter.Path("/user_stats").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.UserStatsHandler)))

Expand Down
7 changes: 6 additions & 1 deletion cmd/ruler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql"
"google.golang.org/grpc"

"github.com/weaveworks/common/middleware"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/chunk/storage"
"github.com/weaveworks/cortex/pkg/distributor"
"github.com/weaveworks/cortex/pkg/querier"
"github.com/weaveworks/cortex/pkg/ring"
"github.com/weaveworks/cortex/pkg/ruler"
"github.com/weaveworks/cortex/pkg/util"
Expand Down Expand Up @@ -75,7 +77,10 @@ func main() {
defer dist.Stop()
prometheus.MustRegister(dist)

rlr, err := ruler.NewRuler(rulerConfig, dist, chunkStore)
engine := promql.NewEngine(util.Logger, prometheus.DefaultRegisterer, rulerConfig.NumWorkers, rulerConfig.GroupTimeout)
queryable := querier.NewQueryable(dist, chunkStore)

rlr, err := ruler.NewRuler(rulerConfig, engine, queryable, dist)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing ruler", "err", err)
os.Exit(1)
Expand Down
6 changes: 6 additions & 0 deletions pkg/querier/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@ type DummyTargetRetriever struct{}
// Targets implements TargetRetriever.
func (r DummyTargetRetriever) Targets() []*scrape.Target { return nil }

// DroppedTargets implements TargetRetriever.
func (r DummyTargetRetriever) DroppedTargets() []*scrape.Target { return nil }

// DummyAlertmanagerRetriever implements AlertmanagerRetriever.
type DummyAlertmanagerRetriever struct{}

// Alertmanagers implements AlertmanagerRetriever.
func (r DummyAlertmanagerRetriever) Alertmanagers() []*url.URL { return nil }

// DroppedAlertmanagers implements AlertmanagerRetriever.
func (r DummyAlertmanagerRetriever) DroppedAlertmanagers() []*url.URL { return nil }
46 changes: 24 additions & 22 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,48 @@ package querier

import (
"context"
"flag"
"net/http"
"time"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"

"github.com/weaveworks/cortex/pkg/ingester/client"
"github.com/weaveworks/cortex/pkg/prom1/storage/metric"

"github.com/weaveworks/cortex/pkg/util"
)

// Config contains the configuration require to create a querier
type Config struct {
MaxConcurrent int
Timeout time.Duration
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
flag.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
flag.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.")
}

// ChunkStore is the interface we need to get chunks
type ChunkStore interface {
Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) (model.Matrix, error)
}

// NewEngine creates a new promql.Engine for cortex.
func NewEngine(distributor Querier, chunkStore ChunkStore) *promql.Engine {
queryable := NewQueryable(distributor, chunkStore, false)
return promql.NewEngine(queryable, nil)
}

// NewQueryable creates a new Queryable for cortex.
func NewQueryable(distributor Querier, chunkStore ChunkStore, mo bool) MergeQueryable {
func NewQueryable(distributor Querier, chunkStore ChunkStore) MergeQueryable {
return MergeQueryable{
queriers: []Querier{
distributor,
&chunkQuerier{
store: chunkStore,
},
},
metadataOnly: mo,
}
}

Expand Down Expand Up @@ -108,18 +116,16 @@ func mergeMatrices(matrices chan model.Matrix, errors chan error, n int) (model.
// A MergeQueryable is a storage.Queryable that produces a storage.Querier which merges
// results from multiple underlying Queriers.
type MergeQueryable struct {
queriers []Querier
metadataOnly bool
queriers []Querier
}

// Querier implements storage.Queryable.
func (q MergeQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return mergeQuerier{
ctx: ctx,
queriers: q.queriers,
mint: mint,
maxt: maxt,
metadataOnly: q.metadataOnly,
ctx: ctx,
queriers: q.queriers,
mint: mint,
maxt: maxt,
}, nil
}

Expand Down Expand Up @@ -188,16 +194,12 @@ type mergeQuerier struct {
queriers []Querier
mint int64
maxt int64
// Whether this querier should only load series metadata in Select().
// Necessary for remote storage implementations of the storage.Querier
// interface because both metadata and bulk data loading happens via
// the Select() method.
metadataOnly bool
}

func (mq mergeQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
func (mq mergeQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) {
// TODO: Update underlying selectors to return errors directly.
if mq.metadataOnly {
// Kludge: Prometheus passes nil SelectParams if it is doing a 'series' operation, which needs only metadata
if sp == nil {
return mq.selectMetadata(matchers...), nil
}
return mq.selectSamples(matchers...), nil
Expand Down
9 changes: 5 additions & 4 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/golang/snappy"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"
"github.com/weaveworks/cortex/pkg/ingester/client"
"github.com/weaveworks/cortex/pkg/prom1/storage/metric"
Expand Down Expand Up @@ -104,13 +105,13 @@ func TestMergeQuerierSortsMetricLabels(t *testing.T) {
},
},
},
mint: 0,
maxt: 0,
metadataOnly: false,
mint: 0,
maxt: 0,
}
m, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "testmetric")
require.NoError(t, err)
ss, err := mq.Select(m)
dummyParams := storage.SelectParams{}
ss, err := mq.Select(&dummyParams, m)
require.NoError(t, err)
require.NoError(t, ss.Err())
ss.Next()
Expand Down
11 changes: 6 additions & 5 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ import (
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/strutil"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"

"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/user"
"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/distributor"
"github.com/weaveworks/cortex/pkg/querier"
"github.com/weaveworks/cortex/pkg/util"
)

Expand Down Expand Up @@ -118,6 +117,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// Ruler evaluates rules.
type Ruler struct {
engine *promql.Engine
queryable storage.Queryable
pusher Pusher
alertURL *url.URL
notifierCfg *config.Config
Expand Down Expand Up @@ -191,13 +191,14 @@ func (rn *rulerNotifier) stop() {
}

// NewRuler creates a new ruler from a distributor and chunk store.
func NewRuler(cfg Config, d *distributor.Distributor, c *chunk.Store) (*Ruler, error) {
func NewRuler(cfg Config, engine *promql.Engine, queryable storage.Queryable, d *distributor.Distributor) (*Ruler, error) {
ncfg, err := buildNotifierConfig(&cfg)
if err != nil {
return nil, err
}
return &Ruler{
engine: querier.NewEngine(d, c),
engine: engine,
queryable: queryable,
pusher: d,
alertURL: cfg.ExternalURL.URL,
notifierCfg: ncfg,
Expand Down Expand Up @@ -278,7 +279,7 @@ func (r *Ruler) newGroup(ctx context.Context, userID string, item *workItem) (*r
}
opts := &rules.ManagerOptions{
Appendable: appendable,
QueryFunc: rules.EngineQueryFunc(r.engine),
QueryFunc: rules.EngineQueryFunc(r.engine, r.queryable),
Context: ctx,
ExternalURL: r.alertURL,
NotifyFunc: sendAlerts(notifier, r.alertURL.String()),
Expand Down
Loading