1
1
package ruler
2
2
3
3
import (
4
+ native_ctx "context"
4
5
"flag"
5
6
"fmt"
6
7
"net/http"
@@ -12,11 +13,17 @@ import (
12
13
gklog "github.com/go-kit/kit/log"
13
14
"github.com/go-kit/kit/log/level"
14
15
"github.com/prometheus/client_golang/prometheus"
16
+ config_util "github.com/prometheus/common/config"
15
17
"github.com/prometheus/common/model"
16
18
"github.com/prometheus/prometheus/config"
19
+ "github.com/prometheus/prometheus/discovery"
20
+ sd_config "github.com/prometheus/prometheus/discovery/config"
21
+ "github.com/prometheus/prometheus/discovery/dns"
22
+ "github.com/prometheus/prometheus/discovery/targetgroup"
17
23
"github.com/prometheus/prometheus/notifier"
18
24
"github.com/prometheus/prometheus/promql"
19
25
"github.com/prometheus/prometheus/rules"
26
+ "github.com/prometheus/prometheus/util/strutil"
20
27
"golang.org/x/net/context"
21
28
"golang.org/x/net/context/ctxhttp"
22
29
@@ -97,7 +104,59 @@ type Ruler struct {
97
104
98
105
// Per-user notifiers with separate queues.
99
106
notifiersMtx sync.Mutex
100
- notifiers map [string ]* notifier.Notifier
107
+ notifiers map [string ]* rulerNotifier
108
+ }
109
+
110
+ type rulerNotifier struct {
111
+ notifier * notifier.Notifier
112
+ sdCtx context.Context
113
+ sdCancel context.CancelFunc
114
+ sdManager * discovery.Manager
115
+ wg sync.WaitGroup
116
+ logger gklog.Logger
117
+ }
118
+
119
+ func newRulerNotifier (o * notifier.Options , l gklog.Logger ) * rulerNotifier {
120
+ ctx , cancel := context .WithCancel (context .Background ())
121
+ return & rulerNotifier {
122
+ notifier : notifier .New (o , l ),
123
+ sdCtx : ctx ,
124
+ sdCancel : cancel ,
125
+ sdManager : discovery .NewManager (l ),
126
+ logger : l ,
127
+ }
128
+ }
129
+
130
+ func (rn * rulerNotifier ) run () {
131
+ rn .wg .Add (2 )
132
+ go func () {
133
+ if err := rn .sdManager .Run (rn .sdCtx ); err != nil {
134
+ level .Error (rn .logger ).Log ("msg" , "error starting notifier discovery manager" , "err" , err )
135
+ }
136
+ }()
137
+ go func () {
138
+ rn .notifier .Run (rn .sdManager .SyncCh ())
139
+ }()
140
+ }
141
+
142
+ func (rn * rulerNotifier ) applyConfig (cfg * config.Config ) error {
143
+ if err := rn .notifier .ApplyConfig (cfg ); err != nil {
144
+ return err
145
+ }
146
+
147
+ amConfigs := cfg .AlertingConfig .AlertmanagerConfigs
148
+ if len (amConfigs ) != 1 {
149
+ return fmt .Errorf ("ruler alerting config should have exactly one AlertmanagerConfig" )
150
+ }
151
+ return rn .sdManager .ApplyConfig (
152
+ map [string ]sd_config.ServiceDiscoveryConfig {"ruler" : amConfigs [0 ].ServiceDiscoveryConfig },
153
+ )
154
+ }
155
+
156
+ func (rn * rulerNotifier ) stop () {
157
+ rn .sdCancel ()
158
+ rn .notifier .Stop ()
159
+ rn .wg .Wait ()
101
160
}
102
161
103
162
// NewRuler creates a new ruler from a distributor and chunk store.
@@ -112,7 +171,7 @@ func NewRuler(cfg Config, d *distributor.Distributor, c *chunk.Store) (*Ruler, e
112
171
alertURL : cfg .ExternalURL .URL ,
113
172
notifierCfg : ncfg ,
114
173
queueCapacity : cfg .NotificationQueueCapacity ,
115
- notifiers : map [string ]* notifier. Notifier {},
174
+ notifiers : map [string ]* rulerNotifier {},
116
175
}, nil
117
176
}
118
177
@@ -124,23 +183,23 @@ func buildNotifierConfig(rulerConfig *Config) (*config.Config, error) {
124
183
}
125
184
126
185
u := rulerConfig .AlertmanagerURL
127
- var sdConfig config .ServiceDiscoveryConfig
186
+ var sdConfig sd_config .ServiceDiscoveryConfig
128
187
if rulerConfig .AlertmanagerDiscovery {
129
188
if ! strings .Contains (u .Host , "_tcp." ) {
130
189
return nil , fmt .Errorf ("When alertmanager-discovery is on, host name must be of the form _portname._tcp.service.fqdn (is %q)" , u .Host )
131
190
}
132
- dnsSDConfig := config. DNSSDConfig {
191
+ dnsSDConfig := dns. SDConfig {
133
192
Names : []string {u .Host },
134
193
RefreshInterval : model .Duration (rulerConfig .AlertmanagerRefreshInterval ),
135
194
Type : "SRV" ,
136
195
Port : 0 , // Ignored, because of SRV.
137
196
}
138
- sdConfig = config .ServiceDiscoveryConfig {
139
- DNSSDConfigs : []* config. DNSSDConfig {& dnsSDConfig },
197
+ sdConfig = sd_config .ServiceDiscoveryConfig {
198
+ DNSSDConfigs : []* dns. SDConfig {& dnsSDConfig },
140
199
}
141
200
} else {
142
- sdConfig = config .ServiceDiscoveryConfig {
143
- StaticConfigs : []* config. TargetGroup {
201
+ sdConfig = sd_config .ServiceDiscoveryConfig {
202
+ StaticConfigs : []* targetgroup. Group {
144
203
{
145
204
Targets : []model.LabelSet {
146
205
{
@@ -165,14 +224,14 @@ func buildNotifierConfig(rulerConfig *Config) (*config.Config, error) {
165
224
}
166
225
167
226
if u .User != nil {
168
- amConfig .HTTPClientConfig = config .HTTPClientConfig {
169
- BasicAuth : & config .BasicAuth {
227
+ amConfig .HTTPClientConfig = config_util .HTTPClientConfig {
228
+ BasicAuth : & config_util .BasicAuth {
170
229
Username : u .User .Username (),
171
230
},
172
231
}
173
232
174
233
if password , isSet := u .User .Password (); isSet {
175
- amConfig .HTTPClientConfig .BasicAuth .Password = config .Secret (password )
234
+ amConfig .HTTPClientConfig .BasicAuth .Password = config_util .Secret (password )
176
235
}
177
236
}
178
237
@@ -191,26 +250,59 @@ func (r *Ruler) newGroup(ctx context.Context, rs []rules.Rule) (*rules.Group, er
191
250
}
192
251
opts := & rules.ManagerOptions {
193
252
Appendable : appendable ,
194
- QueryEngine : r .engine ,
253
+ QueryFunc : rules . EngineQueryFunc ( r .engine ) ,
195
254
Context : ctx ,
196
255
ExternalURL : r .alertURL ,
197
- Notifier : notifier ,
256
+ NotifyFunc : sendAlerts ( notifier , r . alertURL . String ()) ,
198
257
Logger : gklog .NewNopLogger (),
258
+ Registerer : prometheus .DefaultRegisterer ,
199
259
}
200
260
delay := 0 * time .Second // Unused, so 0 value is fine.
201
261
return rules .NewGroup ("default" , "none" , delay , rs , opts ), nil
202
262
}
203
263
264
+ // sendAlerts implements a the rules.NotifyFunc for a Notifier.
265
+ // It filters any non-firing alerts from the input.
266
+ //
267
+ // Copied from Prometheus's main.go.
268
+ func sendAlerts (n * notifier.Notifier , externalURL string ) rules.NotifyFunc {
269
+ return func (ctx native_ctx.Context , expr string , alerts ... * rules.Alert ) error {
270
+ var res []* notifier.Alert
271
+
272
+ for _ , alert := range alerts {
273
+ // Only send actually firing alerts.
274
+ if alert .State == rules .StatePending {
275
+ continue
276
+ }
277
+ a := & notifier.Alert {
278
+ StartsAt : alert .FiredAt ,
279
+ Labels : alert .Labels ,
280
+ Annotations : alert .Annotations ,
281
+ GeneratorURL : externalURL + strutil .TableLinkForExpression (expr ),
282
+ }
283
+ if ! alert .ResolvedAt .IsZero () {
284
+ a .EndsAt = alert .ResolvedAt
285
+ }
286
+ res = append (res , a )
287
+ }
288
+
289
+ if len (alerts ) > 0 {
290
+ n .Send (res ... )
291
+ }
292
+ return nil
293
+ }
294
+ }
295
+
204
296
func (r * Ruler ) getOrCreateNotifier (userID string ) (* notifier.Notifier , error ) {
205
297
r .notifiersMtx .Lock ()
206
298
defer r .notifiersMtx .Unlock ()
207
299
208
300
n , ok := r .notifiers [userID ]
209
301
if ok {
210
- return n , nil
302
+ return n . notifier , nil
211
303
}
212
304
213
- n = notifier . New (& notifier.Options {
305
+ n = newRulerNotifier (& notifier.Options {
214
306
QueueCapacity : r .queueCapacity ,
215
307
Do : func (ctx context.Context , client * http.Client , req * http.Request ) (* http.Response , error ) {
216
308
// Note: The passed-in context comes from the Prometheus rule group code
@@ -222,17 +314,18 @@ func (r *Ruler) getOrCreateNotifier(userID string) (*notifier.Notifier, error) {
222
314
}
223
315
return ctxhttp .Do (ctx , client , req )
224
316
},
225
- }, gklog .NewNopLogger ())
317
+ }, util .Logger )
318
+
319
+ go n .run ()
226
320
227
321
// This should never fail, unless there's a programming mistake.
228
- if err := n .ApplyConfig (r .notifierCfg ); err != nil {
322
+ if err := n .applyConfig (r .notifierCfg ); err != nil {
229
323
return nil , err
230
324
}
231
- go n .Run ()
232
325
233
326
// TODO: Remove notifiers for stale users. Right now this is a slow leak.
234
327
r .notifiers [userID ] = n
235
- return n , nil
328
+ return n . notifier , nil
236
329
}
237
330
238
331
// Evaluate a list of rules in the given context.
@@ -245,7 +338,7 @@ func (r *Ruler) Evaluate(ctx context.Context, rs []rules.Rule) {
245
338
level .Error (logger ).Log ("msg" , "failed to create rule group" , "err" , err )
246
339
return
247
340
}
248
- g .Eval (start )
341
+ g .Eval (ctx , start )
249
342
250
343
// The prometheus routines we're calling have their own instrumentation
251
344
// but, a) it's rule-based, not group-based, b) it's a summary, not a
@@ -260,7 +353,7 @@ func (r *Ruler) Stop() {
260
353
defer r .notifiersMtx .Unlock ()
261
354
262
355
for _ , n := range r .notifiers {
263
- n .Stop ()
356
+ n .stop ()
264
357
}
265
358
}
266
359
0 commit comments