@@ -10,6 +10,7 @@ import (
10
10
"syscall"
11
11
"time"
12
12
13
+ "github.com/gorilla/mux"
13
14
"github.com/prometheus/client_golang/prometheus"
14
15
"github.com/prometheus/common/log"
15
16
"github.com/prometheus/common/route"
@@ -128,13 +129,15 @@ func main() {
128
129
r := ring .New (consul , cfg .distributorConfig .HeartbeatTimeout )
129
130
defer r .Stop ()
130
131
132
+ router := mux .NewRouter ()
131
133
switch cfg .mode {
132
134
case modeDistributor :
133
135
cfg .distributorConfig .Ring = r
134
136
cfg .distributorConfig .ClientFactory = func (address string ) (* distributor.IngesterClient , error ) {
135
137
return distributor .NewIngesterClient (address , cfg .remoteTimeout )
136
138
}
137
- setupDistributor (cfg .distributorConfig , chunkStore , cfg .logSuccess )
139
+ setupDistributor (cfg .distributorConfig , chunkStore , router .Path ("/api/prom" ))
140
+
138
141
case modeIngester :
139
142
cfg .ingesterConfig .Ring = r
140
143
registration , err := ring .RegisterIngester (consul , cfg .listenPort , cfg .numTokens )
@@ -143,7 +146,7 @@ func main() {
143
146
// network errors.
144
147
log .Fatalf ("Could not register ingester: %v" , err )
145
148
}
146
- ing := setupIngester (chunkStore , cfg .ingesterConfig , cfg . logSuccess )
149
+ ing := setupIngester (chunkStore , cfg .ingesterConfig , router . NewRoute () )
147
150
148
151
// Deferring a func to make ordering obvious
149
152
defer func () {
@@ -157,8 +160,16 @@ func main() {
157
160
log .Fatalf ("Mode %s not supported!" , cfg .mode )
158
161
}
159
162
160
- http .Handle ("/metrics" , prometheus .Handler ())
161
- go http .ListenAndServe (fmt .Sprintf (":%d" , cfg .listenPort ), nil )
163
+ router .Handle ("/metrics" , prometheus .Handler ())
164
+ instrumented := middleware .Merge (
165
+ middleware.Log {
166
+ LogSuccess : cfg .logSuccess ,
167
+ },
168
+ middleware.Instrument {
169
+ Duration : requestDuration ,
170
+ },
171
+ ).Wrap (router )
172
+ go http .ListenAndServe (fmt .Sprintf (":%d" , cfg .listenPort ), instrumented )
162
173
163
174
term := make (chan os.Signal )
164
175
signal .Notify (term , os .Interrupt , syscall .SIGTERM )
@@ -198,19 +209,18 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) {
198
209
func setupDistributor (
199
210
cfg distributor.Config ,
200
211
chunkStore chunk.Store ,
201
- logSuccess bool ,
212
+ router * mux. Route ,
202
213
) {
203
214
dist , err := distributor .New (cfg )
204
215
if err != nil {
205
216
log .Fatal (err )
206
217
}
207
218
prometheus .MustRegister (dist )
208
219
209
- prefix := "/api/prom"
210
- http .Handle (prefix + "/push" , instrument (logSuccess , cortex .AppenderHandler (dist , handleDistributorError )))
220
+ router .Path ("/push" ).Handler (cortex .AppenderHandler (dist , handleDistributorError ))
211
221
212
222
// TODO: Move querier to separate binary.
213
- setupQuerier (dist , chunkStore , prefix , logSuccess )
223
+ setupQuerier (dist , chunkStore , router )
214
224
}
215
225
216
226
func handleDistributorError (w http.ResponseWriter , err error ) {
@@ -235,8 +245,7 @@ func handleDistributorError(w http.ResponseWriter, err error) {
235
245
func setupQuerier (
236
246
distributor * distributor.Distributor ,
237
247
chunkStore chunk.Store ,
238
- prefix string ,
239
- logSuccess bool ,
248
+ router * mux.Route ,
240
249
) {
241
250
queryable := querier.Queryable {
242
251
Q : querier.MergeQuerier {
@@ -248,48 +257,35 @@ func setupQuerier(
248
257
},
249
258
},
250
259
}
251
-
252
260
engine := promql .NewEngine (queryable , nil )
253
-
254
261
api := v1 .NewAPI (engine , querier.DummyStorage {Queryable : queryable })
255
- router := route .New (func (r * http.Request ) (context.Context , error ) {
262
+ promRouter := route .New (func (r * http.Request ) (context.Context , error ) {
256
263
userID := r .Header .Get (userIDHeaderName )
257
- if r .Method != "OPTIONS" && userID == "" {
258
- // For now, getting the user ID from basic auth allows for easy testing
259
- // with Grafana.
260
- // TODO: Remove basic auth support.
261
- userID , _ , _ = r .BasicAuth ()
262
- if userID == "" {
263
- return nil , fmt .Errorf ("missing user ID" )
264
- }
265
- }
266
264
return user .WithID (context .Background (), userID ), nil
267
265
})
268
- api .Register (router .WithPrefix (prefix + "/api/v1" ))
269
- http .Handle ("/" , router )
270
-
271
- http .Handle (prefix + "/user_stats" , instrument (logSuccess , cortex .DistributorUserStatsHandler (distributor .UserStats )))
272
-
273
- http .Handle (prefix + "/graph" , instrument (logSuccess , ui .GraphHandler ()))
274
- http .Handle (prefix + "/static/" , instrument (logSuccess , ui .StaticAssetsHandler (prefix + "/static/" )))
266
+ api .Register (promRouter )
267
+ router .Path ("/api/v1" ).Handler (promRouter )
268
+ router .Path ("/user_stats" ).Handler (cortex .DistributorUserStatsHandler (distributor .UserStats ))
269
+ router .Path ("/graph" ).Handler (ui .GraphHandler ())
270
+ router .Path ("/static/" ).Handler (ui .StaticAssetsHandler ("/api/prom/static/" ))
275
271
}
276
272
277
273
func setupIngester (
278
274
chunkStore chunk.Store ,
279
275
cfg ingester.Config ,
280
- logSuccess bool ,
276
+ router * mux. Route ,
281
277
) * ingester.Ingester {
282
278
ingester , err := ingester .New (cfg , chunkStore )
283
279
if err != nil {
284
280
log .Fatal (err )
285
281
}
286
282
prometheus .MustRegister (ingester )
287
283
288
- http . Handle ("/push" , instrument ( logSuccess , cortex .AppenderHandler (ingester , handleIngesterError ) ))
289
- http . Handle ("/query" , instrument ( logSuccess , cortex .QueryHandler (ingester ) ))
290
- http . Handle ("/label_values" , instrument ( logSuccess , cortex .LabelValuesHandler (ingester ) ))
291
- http . Handle ("/user_stats" , instrument ( logSuccess , cortex .IngesterUserStatsHandler (ingester .UserStats ) ))
292
- http . Handle ("/ready" , instrument ( logSuccess , cortex .IngesterReadinessHandler (ingester ) ))
284
+ router . Path ("/push" ). Handler ( cortex .AppenderHandler (ingester , handleIngesterError ))
285
+ router . Path ("/query" ). Handler ( cortex .QueryHandler (ingester ))
286
+ router . Path ("/label_values" ). Handler ( cortex .LabelValuesHandler (ingester ))
287
+ router . Path ("/user_stats" ). Handler ( cortex .IngesterUserStatsHandler (ingester .UserStats ))
288
+ router . Path ("/ready" ). Handler ( cortex .IngesterReadinessHandler (ingester ))
293
289
return ingester
294
290
}
295
291
@@ -303,15 +299,3 @@ func handleIngesterError(w http.ResponseWriter, err error) {
303
299
http .Error (w , err .Error (), http .StatusInternalServerError )
304
300
}
305
301
}
306
-
307
- // instrument instruments a handler.
308
- func instrument (logSuccess bool , handler http.Handler ) http.Handler {
309
- return middleware .Merge (
310
- middleware.Log {
311
- LogSuccess : logSuccess ,
312
- },
313
- middleware.Instrument {
314
- Duration : requestDuration ,
315
- },
316
- ).Wrap (handler )
317
- }
0 commit comments