Skip to content

Commit d7aa495

Browse files
authored
Update prometheus (#802)
* Update prometheus to latest and fix up for API changes * Create querier.Config * Use selectParams in Select() to tell if we need samples or metadata - Move from one brittle assumption about Prometheus to another. * Refactor: remove NewEngine() helper
1 parent ae03d21 commit d7aa495

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+1082
-634
lines changed

.circleci/config.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ version: 2
33
# https://circleci.com/blog/circleci-hacks-reuse-yaml-in-your-circleci-config-with-yaml/
44
defaults: &defaults
55
docker:
6-
- image: weaveworks/cortex-build-image:circleci-2.0-3d3c3d3a
6+
- image: weaveworks/cortex-build-image:master-af2a0261
77
working_directory: /go/src/github.com/weaveworks/cortex
88

99
workflows:
@@ -47,7 +47,7 @@ jobs:
4747
4848
integration:
4949
docker:
50-
- image: weaveworks/cortex-build-image:circleci-2.0-3d3c3d3a
50+
- image: weaveworks/cortex-build-image:master-af2a0261
5151
- image: circleci/postgres:9.6.2-alpine
5252
environment:
5353
POSTGRES_DB: configs_test

Gopkg.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919
name = "github.com/prometheus/prometheus"
2020
branch = "master"
2121

22+
# Similar to prometheus/prometheus, we want to track upstream master.
23+
[[constraint]]
24+
name = "github.com/prometheus/tsdb"
25+
branch = "master"
26+
2227
[[override]]
2328
name = "k8s.io/client-go"
2429
revision = "3627aeb7d4f6ade38f995d2c923e459146493c7e"

cmd/lite/main.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func main() {
4242

4343
chunkStoreConfig chunk.StoreConfig
4444
distributorConfig distributor.Config
45+
querierConfig querier.Config
4546
ingesterConfig ingester.Config
4647
configStoreConfig ruler.ConfigStoreConfig
4748
rulerConfig ruler.Config
@@ -53,7 +54,7 @@ func main() {
5354
)
5455
// Ingester needs to know our gRPC listen port.
5556
ingesterConfig.ListenPort = &serverConfig.GRPCListenPort
56-
util.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig,
57+
util.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig, &querierConfig,
5758
&ingesterConfig, &configStoreConfig, &rulerConfig, &storageConfig, &schemaConfig, &logLevel)
5859
flag.BoolVar(&unauthenticated, "unauthenticated", false, "Set to true to disable multitenancy.")
5960
flag.Parse()
@@ -123,13 +124,16 @@ func main() {
123124
tableManager.Start()
124125
defer tableManager.Stop()
125126

127+
engine := promql.NewEngine(util.Logger, nil, querierConfig.MaxConcurrent, querierConfig.Timeout)
128+
queryable := querier.NewQueryable(dist, chunkStore)
129+
126130
if configStoreConfig.ConfigsAPIURL.String() != "" || configStoreConfig.DBConfig.URI != "" {
127131
rulesAPI, err := ruler.NewRulesAPI(configStoreConfig)
128132
if err != nil {
129133
level.Error(util.Logger).Log("msg", "error initializing ruler config store", "err", err)
130134
os.Exit(1)
131135
}
132-
rlr, err := ruler.NewRuler(rulerConfig, dist, chunkStore)
136+
rlr, err := ruler.NewRuler(rulerConfig, engine, queryable, dist)
133137
if err != nil {
134138
level.Error(util.Logger).Log("msg", "error initializing ruler", "err", err)
135139
os.Exit(1)
@@ -144,16 +148,13 @@ func main() {
144148
defer rulerServer.Stop()
145149
}
146150

147-
sampleQueryable := querier.NewQueryable(dist, chunkStore, false)
148-
metadataQueryable := querier.NewQueryable(dist, chunkStore, true)
149-
150-
engine := promql.NewEngine(sampleQueryable, nil)
151151
api := v1.NewAPI(
152152
engine,
153-
metadataQueryable,
153+
queryable,
154154
querier.DummyTargetRetriever{},
155155
querier.DummyAlertmanagerRetriever{},
156156
func() config.Config { return config.Config{} },
157+
map[string]string{}, // TODO: include configuration flags
157158
func(f http.HandlerFunc) http.HandlerFunc { return f },
158159
func() *tsdb.DB { return nil }, // Only needed for admin APIs.
159160
false, // Disable admin APIs.
@@ -185,7 +186,7 @@ func main() {
185186

186187
subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter()
187188
subrouter.PathPrefix("/api/v1").Handler(activeMiddleware.Wrap(promRouter))
188-
subrouter.Path("/read").Handler(activeMiddleware.Wrap(http.HandlerFunc(sampleQueryable.RemoteReadHandler)))
189+
subrouter.Path("/read").Handler(activeMiddleware.Wrap(http.HandlerFunc(queryable.RemoteReadHandler)))
189190
subrouter.Path("/validate_expr").Handler(activeMiddleware.Wrap(http.HandlerFunc(dist.ValidateExprHandler)))
190191
subrouter.Path("/user_stats").Handler(activeMiddleware.Wrap(http.HandlerFunc(dist.UserStatsHandler)))
191192

cmd/querier/main.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,13 @@ func main() {
3636
}
3737
ringConfig ring.Config
3838
distributorConfig distributor.Config
39+
querierConfig querier.Config
3940
chunkStoreConfig chunk.StoreConfig
4041
schemaConfig chunk.SchemaConfig
4142
storageConfig storage.Config
4243
logLevel util.LogLevel
4344
)
44-
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig,
45+
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &querierConfig,
4546
&chunkStoreConfig, &schemaConfig, &storageConfig, &logLevel)
4647
flag.Parse()
4748

@@ -87,16 +88,16 @@ func main() {
8788
}
8889
defer chunkStore.Stop()
8990

90-
sampleQueryable := querier.NewQueryable(dist, chunkStore, false)
91-
metadataQueryable := querier.NewQueryable(dist, chunkStore, true)
91+
queryable := querier.NewQueryable(dist, chunkStore)
9292

93-
engine := promql.NewEngine(sampleQueryable, nil)
93+
engine := promql.NewEngine(util.Logger, nil, querierConfig.MaxConcurrent, querierConfig.Timeout)
9494
api := v1.NewAPI(
9595
engine,
96-
metadataQueryable,
96+
queryable,
9797
querier.DummyTargetRetriever{},
9898
querier.DummyAlertmanagerRetriever{},
9999
func() config.Config { return config.Config{} },
100+
map[string]string{}, // TODO: include configuration flags
100101
func(f http.HandlerFunc) http.HandlerFunc { return f },
101102
func() *tsdb.DB { return nil }, // Only needed for admin APIs.
102103
false, // Disable admin APIs.
@@ -106,7 +107,7 @@ func main() {
106107

107108
subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter()
108109
subrouter.PathPrefix("/api/v1").Handler(middleware.AuthenticateUser.Wrap(promRouter))
109-
subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(sampleQueryable.RemoteReadHandler)))
110+
subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(queryable.RemoteReadHandler)))
110111
subrouter.Path("/validate_expr").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.ValidateExprHandler)))
111112
subrouter.Path("/user_stats").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.UserStatsHandler)))
112113

cmd/ruler/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/go-kit/kit/log/level"
88
"github.com/prometheus/client_golang/prometheus"
9+
"github.com/prometheus/prometheus/promql"
910
"google.golang.org/grpc"
1011

1112
"github.com/weaveworks/common/middleware"
@@ -14,6 +15,7 @@ import (
1415
"github.com/weaveworks/cortex/pkg/chunk"
1516
"github.com/weaveworks/cortex/pkg/chunk/storage"
1617
"github.com/weaveworks/cortex/pkg/distributor"
18+
"github.com/weaveworks/cortex/pkg/querier"
1719
"github.com/weaveworks/cortex/pkg/ring"
1820
"github.com/weaveworks/cortex/pkg/ruler"
1921
"github.com/weaveworks/cortex/pkg/util"
@@ -75,7 +77,10 @@ func main() {
7577
defer dist.Stop()
7678
prometheus.MustRegister(dist)
7779

78-
rlr, err := ruler.NewRuler(rulerConfig, dist, chunkStore)
80+
engine := promql.NewEngine(util.Logger, prometheus.DefaultRegisterer, rulerConfig.NumWorkers, rulerConfig.GroupTimeout)
81+
queryable := querier.NewQueryable(dist, chunkStore)
82+
83+
rlr, err := ruler.NewRuler(rulerConfig, engine, queryable, dist)
7984
if err != nil {
8085
level.Error(util.Logger).Log("msg", "error initializing ruler", "err", err)
8186
os.Exit(1)

pkg/querier/dummy.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,14 @@ type DummyTargetRetriever struct{}
1212
// Targets implements TargetRetriever.
1313
func (r DummyTargetRetriever) Targets() []*scrape.Target { return nil }
1414

15+
// DroppedTargets implements TargetRetriever.
16+
func (r DummyTargetRetriever) DroppedTargets() []*scrape.Target { return nil }
17+
1518
// DummyAlertmanagerRetriever implements AlertmanagerRetriever.
1619
type DummyAlertmanagerRetriever struct{}
1720

1821
// Alertmanagers implements AlertmanagerRetriever.
1922
func (r DummyAlertmanagerRetriever) Alertmanagers() []*url.URL { return nil }
23+
24+
// DroppedAlertmanagers implements AlertmanagerRetriever.
25+
func (r DummyAlertmanagerRetriever) DroppedAlertmanagers() []*url.URL { return nil }

pkg/querier/querier.go

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,40 +2,48 @@ package querier
22

33
import (
44
"context"
5+
"flag"
56
"net/http"
7+
"time"
68

79
"github.com/go-kit/kit/log/level"
810
"github.com/prometheus/common/model"
911
"github.com/prometheus/prometheus/pkg/labels"
1012
"github.com/prometheus/prometheus/promql"
1113
"github.com/prometheus/prometheus/storage"
14+
1215
"github.com/weaveworks/cortex/pkg/ingester/client"
1316
"github.com/weaveworks/cortex/pkg/prom1/storage/metric"
1417

1518
"github.com/weaveworks/cortex/pkg/util"
1619
)
1720

21+
// Config contains the configuration require to create a querier
22+
type Config struct {
23+
MaxConcurrent int
24+
Timeout time.Duration
25+
}
26+
27+
// RegisterFlags adds the flags required to config this to the given FlagSet
28+
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
29+
flag.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
30+
flag.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.")
31+
}
32+
1833
// ChunkStore is the interface we need to get chunks
1934
type ChunkStore interface {
2035
Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) (model.Matrix, error)
2136
}
2237

23-
// NewEngine creates a new promql.Engine for cortex.
24-
func NewEngine(distributor Querier, chunkStore ChunkStore) *promql.Engine {
25-
queryable := NewQueryable(distributor, chunkStore, false)
26-
return promql.NewEngine(queryable, nil)
27-
}
28-
2938
// NewQueryable creates a new Queryable for cortex.
30-
func NewQueryable(distributor Querier, chunkStore ChunkStore, mo bool) MergeQueryable {
39+
func NewQueryable(distributor Querier, chunkStore ChunkStore) MergeQueryable {
3140
return MergeQueryable{
3241
queriers: []Querier{
3342
distributor,
3443
&chunkQuerier{
3544
store: chunkStore,
3645
},
3746
},
38-
metadataOnly: mo,
3947
}
4048
}
4149

@@ -108,18 +116,16 @@ func mergeMatrices(matrices chan model.Matrix, errors chan error, n int) (model.
108116
// A MergeQueryable is a storage.Queryable that produces a storage.Querier which merges
109117
// results from multiple underlying Queriers.
110118
type MergeQueryable struct {
111-
queriers []Querier
112-
metadataOnly bool
119+
queriers []Querier
113120
}
114121

115122
// Querier implements storage.Queryable.
116123
func (q MergeQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
117124
return mergeQuerier{
118-
ctx: ctx,
119-
queriers: q.queriers,
120-
mint: mint,
121-
maxt: maxt,
122-
metadataOnly: q.metadataOnly,
125+
ctx: ctx,
126+
queriers: q.queriers,
127+
mint: mint,
128+
maxt: maxt,
123129
}, nil
124130
}
125131

@@ -188,16 +194,12 @@ type mergeQuerier struct {
188194
queriers []Querier
189195
mint int64
190196
maxt int64
191-
// Whether this querier should only load series metadata in Select().
192-
// Necessary for remote storage implementations of the storage.Querier
193-
// interface because both metadata and bulk data loading happens via
194-
// the Select() method.
195-
metadataOnly bool
196197
}
197198

198-
func (mq mergeQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
199+
func (mq mergeQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, error) {
199200
// TODO: Update underlying selectors to return errors directly.
200-
if mq.metadataOnly {
201+
// Kludge: Prometheus passes nil SelectParams if it is doing a 'series' operation, which needs only metadata
202+
if sp == nil {
201203
return mq.selectMetadata(matchers...), nil
202204
}
203205
return mq.selectSamples(matchers...), nil

pkg/querier/querier_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/golang/snappy"
1313
"github.com/prometheus/common/model"
1414
"github.com/prometheus/prometheus/pkg/labels"
15+
"github.com/prometheus/prometheus/storage"
1516
"github.com/stretchr/testify/require"
1617
"github.com/weaveworks/cortex/pkg/ingester/client"
1718
"github.com/weaveworks/cortex/pkg/prom1/storage/metric"
@@ -104,13 +105,13 @@ func TestMergeQuerierSortsMetricLabels(t *testing.T) {
104105
},
105106
},
106107
},
107-
mint: 0,
108-
maxt: 0,
109-
metadataOnly: false,
108+
mint: 0,
109+
maxt: 0,
110110
}
111111
m, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "testmetric")
112112
require.NoError(t, err)
113-
ss, err := mq.Select(m)
113+
dummyParams := storage.SelectParams{}
114+
ss, err := mq.Select(&dummyParams, m)
114115
require.NoError(t, err)
115116
require.NoError(t, ss.Err())
116117
ss.Next()

pkg/ruler/ruler.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,14 @@ import (
2626
"github.com/prometheus/prometheus/notifier"
2727
"github.com/prometheus/prometheus/promql"
2828
"github.com/prometheus/prometheus/rules"
29+
"github.com/prometheus/prometheus/storage"
2930
"github.com/prometheus/prometheus/util/strutil"
3031
"golang.org/x/net/context"
3132
"golang.org/x/net/context/ctxhttp"
3233

3334
"github.com/weaveworks/common/instrument"
3435
"github.com/weaveworks/common/user"
35-
"github.com/weaveworks/cortex/pkg/chunk"
3636
"github.com/weaveworks/cortex/pkg/distributor"
37-
"github.com/weaveworks/cortex/pkg/querier"
3837
"github.com/weaveworks/cortex/pkg/util"
3938
)
4039

@@ -118,6 +117,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
118117
// Ruler evaluates rules.
119118
type Ruler struct {
120119
engine *promql.Engine
120+
queryable storage.Queryable
121121
pusher Pusher
122122
alertURL *url.URL
123123
notifierCfg *config.Config
@@ -191,13 +191,14 @@ func (rn *rulerNotifier) stop() {
191191
}
192192

193193
// NewRuler creates a new ruler from a distributor and chunk store.
194-
func NewRuler(cfg Config, d *distributor.Distributor, c *chunk.Store) (*Ruler, error) {
194+
func NewRuler(cfg Config, engine *promql.Engine, queryable storage.Queryable, d *distributor.Distributor) (*Ruler, error) {
195195
ncfg, err := buildNotifierConfig(&cfg)
196196
if err != nil {
197197
return nil, err
198198
}
199199
return &Ruler{
200-
engine: querier.NewEngine(d, c),
200+
engine: engine,
201+
queryable: queryable,
201202
pusher: d,
202203
alertURL: cfg.ExternalURL.URL,
203204
notifierCfg: ncfg,
@@ -278,7 +279,7 @@ func (r *Ruler) newGroup(ctx context.Context, userID string, item *workItem) (*r
278279
}
279280
opts := &rules.ManagerOptions{
280281
Appendable: appendable,
281-
QueryFunc: rules.EngineQueryFunc(r.engine),
282+
QueryFunc: rules.EngineQueryFunc(r.engine, r.queryable),
282283
Context: ctx,
283284
ExternalURL: r.alertURL,
284285
NotifyFunc: sendAlerts(notifier, r.alertURL.String()),

0 commit comments

Comments
 (0)