Skip to content

Commit dda18ca

Browse files
committed
Make Cortex work with updated Prometheus packages
The most noticeable change is around the lifecycle manager of the Alertmanager notifier and its associated service discovery manager. This has now been moved from the notifier package into main.go in Prometheus and thus we also need to duplicate that lifecycle management ourselves.
1 parent def3489 commit dda18ca

File tree

7 files changed

+142
-34
lines changed

7 files changed

+142
-34
lines changed

Gopkg.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@
2222
name = "github.com/Azure/azure-sdk-for-go"
2323
revision = "bd73d950fa4440dae889bd9917bff7cef539f86e"
2424

25-
#[[override]]
26-
# name = "github.com/prometheus/common"
27-
# revision = "1bab55dd05dbff384524a6a1c99006d9eb5f139b"
28-
2925
[[override]]
3026
name = "github.com/weaveworks/mesh"
3127
revision = "5015f896ab62d3e9fe757456c757521ce0c3faff"

cmd/lite/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/prometheus/prometheus/config"
1414
"github.com/prometheus/prometheus/promql"
1515
"github.com/prometheus/prometheus/web/api/v1"
16+
"github.com/prometheus/tsdb"
1617
"google.golang.org/grpc"
1718

1819
"github.com/weaveworks/common/middleware"
@@ -155,6 +156,8 @@ func main() {
155156
querier.DummyAlertmanagerRetriever{},
156157
func() config.Config { return config.Config{} },
157158
func(f http.HandlerFunc) http.HandlerFunc { return f },
159+
func() *tsdb.DB { return nil }, // Only needed for admin APIs.
160+
false, // Disable admin APIs.
158161
)
159162
promRouter := route.New().WithPrefix("/api/prom/api/v1")
160163
api.Register(promRouter)

cmd/querier/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/prometheus/prometheus/config"
1414
"github.com/prometheus/prometheus/promql"
1515
"github.com/prometheus/prometheus/web/api/v1"
16+
"github.com/prometheus/tsdb"
1617

1718
"github.com/weaveworks/common/middleware"
1819
"github.com/weaveworks/common/server"
@@ -98,6 +99,8 @@ func main() {
9899
querier.DummyAlertmanagerRetriever{},
99100
func() config.Config { return config.Config{} },
100101
func(f http.HandlerFunc) http.HandlerFunc { return f },
102+
func() *tsdb.DB { return nil }, // Only needed for admin APIs.
103+
false, // Disable admin APIs.
101104
)
102105
promRouter := route.New().WithPrefix("/api/prom/api/v1")
103106
api.Register(promRouter)

pkg/querier/querier.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -195,27 +195,28 @@ type mergeQuerier struct {
195195
metadataOnly bool
196196
}
197197

198-
func (mq mergeQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
198+
func (mq mergeQuerier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error) {
199+
// TODO: Update underlying selectors to return errors directly.
199200
if mq.metadataOnly {
200-
return mq.selectMetadata(matchers...)
201+
return mq.selectMetadata(matchers...), nil
201202
}
202-
return mq.selectSamples(matchers...)
203+
return mq.selectSamples(matchers...), nil
203204
}
204205

205206
func (mq mergeQuerier) selectMetadata(matchers ...*labels.Matcher) storage.SeriesSet {
206207
// NB that we don't do this in parallel, as in practice we only have two queriers,
207208
// one of which is the chunk store, which doesn't implement this yet.
208-
var set storage.SeriesSet
209+
seriesSets := make([]storage.SeriesSet, 0, len(mq.queriers))
209210
for _, q := range mq.queriers {
210211
ms, err := q.MetricsForLabelMatchers(mq.ctx, model.Time(mq.mint), model.Time(mq.maxt), matchers...)
211212
if err != nil {
212213
return errSeriesSet{err: err}
213214
}
214215
ss := metricsToSeriesSet(ms)
215-
set = storage.DeduplicateSeriesSet(set, ss)
216+
seriesSets = append(seriesSets, ss)
216217
}
217218

218-
return set
219+
return storage.NewMergeSeriesSet(seriesSets)
219220
}
220221

221222
func (mq mergeQuerier) selectSamples(matchers ...*labels.Matcher) storage.SeriesSet {

pkg/querier/querier_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ func TestMergeQuerierSortsMetricLabels(t *testing.T) {
110110
}
111111
m, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "testmetric")
112112
require.NoError(t, err)
113-
ss := mq.Select(m)
113+
ss, err := mq.Select(m)
114+
require.NoError(t, err)
114115
require.NoError(t, ss.Err())
115116
ss.Next()
116117
require.NoError(t, ss.Err())

pkg/ruler/ruler.go

Lines changed: 124 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package ruler
22

33
import (
4+
native_ctx "context"
5+
"crypto/md5"
6+
"encoding/json"
47
"flag"
58
"fmt"
69
"net/http"
@@ -12,11 +15,17 @@ import (
1215
gklog "github.com/go-kit/kit/log"
1316
"github.com/go-kit/kit/log/level"
1417
"github.com/prometheus/client_golang/prometheus"
18+
config_util "github.com/prometheus/common/config"
1519
"github.com/prometheus/common/model"
1620
"github.com/prometheus/prometheus/config"
21+
"github.com/prometheus/prometheus/discovery"
22+
sd_config "github.com/prometheus/prometheus/discovery/config"
23+
"github.com/prometheus/prometheus/discovery/dns"
24+
"github.com/prometheus/prometheus/discovery/targetgroup"
1725
"github.com/prometheus/prometheus/notifier"
1826
"github.com/prometheus/prometheus/promql"
1927
"github.com/prometheus/prometheus/rules"
28+
"github.com/prometheus/prometheus/util/strutil"
2029
"golang.org/x/net/context"
2130
"golang.org/x/net/context/ctxhttp"
2231

@@ -97,7 +106,67 @@ type Ruler struct {
97106

98107
// Per-user notifiers with separate queues.
99108
notifiersMtx sync.Mutex
100-
notifiers map[string]*notifier.Notifier
109+
notifiers map[string]*rulerNotifier
110+
}
111+
112+
type rulerNotifier struct {
113+
notifier *notifier.Notifier
114+
sdCtx context.Context
115+
sdCancel context.CancelFunc
116+
sdManager *discovery.Manager
117+
wg sync.WaitGroup
118+
logger gklog.Logger
119+
}
120+
121+
func newRulerNotifier(o *notifier.Options, l gklog.Logger) *rulerNotifier {
122+
ctx, cancel := context.WithCancel(context.Background())
123+
return &rulerNotifier{
124+
notifier: notifier.New(o, l),
125+
sdCtx: ctx,
126+
sdCancel: cancel,
127+
sdManager: discovery.NewManager(l),
128+
logger: l,
129+
}
130+
}
131+
132+
func (rn *rulerNotifier) run() {
133+
rn.wg.Add(2)
134+
go func() {
135+
if err := rn.sdManager.Run(rn.sdCtx); err != nil {
136+
level.Error(rn.logger).Log("msg", "error starting notifier discovery manager", "err", err)
137+
}
138+
rn.wg.Done()
139+
}()
140+
go func() {
141+
rn.notifier.Run(rn.sdManager.SyncCh())
142+
rn.wg.Done()
143+
}()
144+
}
145+
146+
func (rn *rulerNotifier) applyConfig(cfg *config.Config) error {
147+
if err := rn.notifier.ApplyConfig(cfg); err != nil {
148+
return err
149+
}
150+
151+
sdCfgs := make(map[string]sd_config.ServiceDiscoveryConfig)
152+
for _, v := range cfg.AlertingConfig.AlertmanagerConfigs {
153+
// AlertmanagerConfigs doesn't hold a unique identifier so we use the config hash as the identifier.
154+
b, err := json.Marshal(v)
155+
if err != nil {
156+
return err
157+
}
158+
// This hash needs to be identical to the one computed in the notifier in
159+
// https://github.com/prometheus/prometheus/blob/719c579f7b917b384c3d629752dea026513317dc/notifier/notifier.go#L265
160+
// This kind of sucks, but it's done in Prometheus in main.go in the same way.
161+
sdCfgs[fmt.Sprintf("%x", md5.Sum(b))] = v.ServiceDiscoveryConfig
162+
}
163+
return rn.sdManager.ApplyConfig(sdCfgs)
164+
}
165+
166+
func (rn *rulerNotifier) stop() {
167+
rn.sdCancel()
168+
rn.notifier.Stop()
169+
rn.wg.Wait()
101170
}
102171

103172
// NewRuler creates a new ruler from a distributor and chunk store.
@@ -112,7 +181,7 @@ func NewRuler(cfg Config, d *distributor.Distributor, c *chunk.Store) (*Ruler, e
112181
alertURL: cfg.ExternalURL.URL,
113182
notifierCfg: ncfg,
114183
queueCapacity: cfg.NotificationQueueCapacity,
115-
notifiers: map[string]*notifier.Notifier{},
184+
notifiers: map[string]*rulerNotifier{},
116185
}, nil
117186
}
118187

@@ -124,23 +193,23 @@ func buildNotifierConfig(rulerConfig *Config) (*config.Config, error) {
124193
}
125194

126195
u := rulerConfig.AlertmanagerURL
127-
var sdConfig config.ServiceDiscoveryConfig
196+
var sdConfig sd_config.ServiceDiscoveryConfig
128197
if rulerConfig.AlertmanagerDiscovery {
129198
if !strings.Contains(u.Host, "_tcp.") {
130199
return nil, fmt.Errorf("When alertmanager-discovery is on, host name must be of the form _portname._tcp.service.fqdn (is %q)", u.Host)
131200
}
132-
dnsSDConfig := config.DNSSDConfig{
201+
dnsSDConfig := dns.SDConfig{
133202
Names: []string{u.Host},
134203
RefreshInterval: model.Duration(rulerConfig.AlertmanagerRefreshInterval),
135204
Type: "SRV",
136205
Port: 0, // Ignored, because of SRV.
137206
}
138-
sdConfig = config.ServiceDiscoveryConfig{
139-
DNSSDConfigs: []*config.DNSSDConfig{&dnsSDConfig},
207+
sdConfig = sd_config.ServiceDiscoveryConfig{
208+
DNSSDConfigs: []*dns.SDConfig{&dnsSDConfig},
140209
}
141210
} else {
142-
sdConfig = config.ServiceDiscoveryConfig{
143-
StaticConfigs: []*config.TargetGroup{
211+
sdConfig = sd_config.ServiceDiscoveryConfig{
212+
StaticConfigs: []*targetgroup.Group{
144213
{
145214
Targets: []model.LabelSet{
146215
{
@@ -165,14 +234,14 @@ func buildNotifierConfig(rulerConfig *Config) (*config.Config, error) {
165234
}
166235

167236
if u.User != nil {
168-
amConfig.HTTPClientConfig = config.HTTPClientConfig{
169-
BasicAuth: &config.BasicAuth{
237+
amConfig.HTTPClientConfig = config_util.HTTPClientConfig{
238+
BasicAuth: &config_util.BasicAuth{
170239
Username: u.User.Username(),
171240
},
172241
}
173242

174243
if password, isSet := u.User.Password(); isSet {
175-
amConfig.HTTPClientConfig.BasicAuth.Password = config.Secret(password)
244+
amConfig.HTTPClientConfig.BasicAuth.Password = config_util.Secret(password)
176245
}
177246
}
178247

@@ -191,26 +260,59 @@ func (r *Ruler) newGroup(ctx context.Context, rs []rules.Rule) (*rules.Group, er
191260
}
192261
opts := &rules.ManagerOptions{
193262
Appendable: appendable,
194-
QueryEngine: r.engine,
263+
QueryFunc: rules.EngineQueryFunc(r.engine),
195264
Context: ctx,
196265
ExternalURL: r.alertURL,
197-
Notifier: notifier,
266+
NotifyFunc: sendAlerts(notifier, r.alertURL.String()),
198267
Logger: gklog.NewNopLogger(),
268+
Registerer: prometheus.DefaultRegisterer,
199269
}
200270
delay := 0 * time.Second // Unused, so 0 value is fine.
201271
return rules.NewGroup("default", "none", delay, rs, opts), nil
202272
}
203273

274+
// sendAlerts implements the rules.NotifyFunc for a Notifier.
275+
// It filters any non-firing alerts from the input.
276+
//
277+
// Copied from Prometheus's main.go.
278+
func sendAlerts(n *notifier.Notifier, externalURL string) rules.NotifyFunc {
279+
return func(ctx native_ctx.Context, expr string, alerts ...*rules.Alert) error {
280+
var res []*notifier.Alert
281+
282+
for _, alert := range alerts {
283+
// Only send actually firing alerts.
284+
if alert.State == rules.StatePending {
285+
continue
286+
}
287+
a := &notifier.Alert{
288+
StartsAt: alert.FiredAt,
289+
Labels: alert.Labels,
290+
Annotations: alert.Annotations,
291+
GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
292+
}
293+
if !alert.ResolvedAt.IsZero() {
294+
a.EndsAt = alert.ResolvedAt
295+
}
296+
res = append(res, a)
297+
}
298+
299+
if len(alerts) > 0 {
300+
n.Send(res...)
301+
}
302+
return nil
303+
}
304+
}
305+
204306
func (r *Ruler) getOrCreateNotifier(userID string) (*notifier.Notifier, error) {
205307
r.notifiersMtx.Lock()
206308
defer r.notifiersMtx.Unlock()
207309

208310
n, ok := r.notifiers[userID]
209311
if ok {
210-
return n, nil
312+
return n.notifier, nil
211313
}
212314

213-
n = notifier.New(&notifier.Options{
315+
n = newRulerNotifier(&notifier.Options{
214316
QueueCapacity: r.queueCapacity,
215317
Do: func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
216318
// Note: The passed-in context comes from the Prometheus rule group code
@@ -222,17 +324,18 @@ func (r *Ruler) getOrCreateNotifier(userID string) (*notifier.Notifier, error) {
222324
}
223325
return ctxhttp.Do(ctx, client, req)
224326
},
225-
}, gklog.NewNopLogger())
327+
}, util.Logger)
328+
329+
go n.run()
226330

227331
// This should never fail, unless there's a programming mistake.
228-
if err := n.ApplyConfig(r.notifierCfg); err != nil {
332+
if err := n.applyConfig(r.notifierCfg); err != nil {
229333
return nil, err
230334
}
231-
go n.Run()
232335

233336
// TODO: Remove notifiers for stale users. Right now this is a slow leak.
234337
r.notifiers[userID] = n
235-
return n, nil
338+
return n.notifier, nil
236339
}
237340

238341
// Evaluate a list of rules in the given context.
@@ -245,7 +348,7 @@ func (r *Ruler) Evaluate(ctx context.Context, rs []rules.Rule) {
245348
level.Error(logger).Log("msg", "failed to create rule group", "err", err)
246349
return
247350
}
248-
g.Eval(start)
351+
g.Eval(ctx, start)
249352

250353
// The prometheus routines we're calling have their own instrumentation
251354
// but, a) it's rule-based, not group-based, b) it's a summary, not a
@@ -260,7 +363,7 @@ func (r *Ruler) Stop() {
260363
defer r.notifiersMtx.Unlock()
261364

262365
for _, n := range r.notifiers {
263-
n.Stop()
366+
n.stop()
264367
}
265368
}
266369

pkg/ruler/ruler_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,13 @@ func TestNotifierSendsUserIDHeader(t *testing.T) {
4848
if err != nil {
4949
t.Fatal(err)
5050
}
51-
defer n.Stop()
51+
for _, not := range r.notifiers {
52+
defer not.stop()
53+
}
5254
// Loop until notifier discovery syncs up
5355
for len(n.Alertmanagers()) == 0 {
5456
time.Sleep(10 * time.Millisecond)
5557
}
56-
5758
n.Send(&notifier.Alert{
5859
Labels: labels.Labels{labels.Label{Name: "alertname", Value: "testalert"}},
5960
})

0 commit comments

Comments
 (0)