54
54
Help : "Time (in seconds) spent serving HTTP requests." ,
55
55
Buckets : prometheus .DefBuckets ,
56
56
}, []string {"method" , "route" , "status_code" , "ws" })
57
- instr = middleware .Merge (
58
- middleware .Logging ,
59
- middleware.Instrument {
60
- Duration : requestDuration ,
61
- },
62
- ).Wrap
63
57
)
64
58
65
59
func init () {
@@ -82,6 +76,7 @@ type cfg struct {
82
76
flushPeriod time.Duration
83
77
maxChunkAge time.Duration
84
78
numTokens int
79
+ logSuccess bool
85
80
86
81
distributorConfig prism.DistributorConfig
87
82
}
@@ -107,6 +102,7 @@ func main() {
107
102
flag .IntVar (& cfg .distributorConfig .MinReadSuccesses , "distributor.min-read-successes" , 2 , "The minimum number of ingesters from which a read must succeed." )
108
103
flag .IntVar (& cfg .distributorConfig .MinWriteSuccesses , "distributor.min-write-successes" , 2 , "The minimum number of ingesters to which a write must succeed." )
109
104
flag .DurationVar (& cfg .distributorConfig .HeartbeatTimeout , "distributor.heartbeat-timeout" , time .Minute , "The heartbeat timeout after which ingesters are skipped for reads/writes." )
105
+ flag .BoolVar (& cfg .logSuccess , "log.success" , false , "Log successful requests" )
110
106
flag .Parse ()
111
107
112
108
chunkStore , err := setupChunkStore (cfg )
@@ -128,7 +124,7 @@ func main() {
128
124
return prism .NewIngesterClient (address , cfg .remoteTimeout )
129
125
}
130
126
defer ring .Stop ()
131
- setupDistributor (cfg .distributorConfig , chunkStore )
127
+ setupDistributor (cfg .distributorConfig , chunkStore , cfg . logSuccess )
132
128
if err != nil {
133
129
log .Fatalf ("Error initializing distributor: %v" , err )
134
130
}
@@ -141,11 +137,11 @@ func main() {
141
137
log .Fatalf ("Could not register ingester: %v" , err )
142
138
}
143
139
defer registration .Unregister ()
144
- cfg := ingester.Config {
140
+ ingesterCfg := ingester.Config {
145
141
FlushCheckPeriod : cfg .flushPeriod ,
146
142
MaxChunkAge : cfg .maxChunkAge ,
147
143
}
148
- ing := setupIngester (chunkStore , cfg )
144
+ ing := setupIngester (chunkStore , ingesterCfg , cfg . logSuccess )
149
145
defer ing .Stop ()
150
146
default :
151
147
log .Fatalf ("Mode %s not supported!" , cfg .mode )
@@ -192,15 +188,16 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) {
192
188
func setupDistributor (
193
189
cfg prism.DistributorConfig ,
194
190
chunkStore chunk.Store ,
191
+ logSuccess bool ,
195
192
) {
196
193
distributor := prism .NewDistributor (cfg )
197
194
prometheus .MustRegister (distributor )
198
195
199
196
prefix := "/api/prom"
200
- http .Handle (prefix + "/push" , instr ( prism .AppenderHandler (distributor )))
197
+ http .Handle (prefix + "/push" , instrument ( logSuccess , prism .AppenderHandler (distributor )))
201
198
202
199
// TODO: Move querier to separate binary.
203
- setupQuerier (distributor , chunkStore , prefix )
200
+ setupQuerier (distributor , chunkStore , prefix , logSuccess )
204
201
}
205
202
206
203
// setupQuerier sets up a complete querying pipeline:
@@ -212,6 +209,7 @@ func setupQuerier(
212
209
distributor * prism.Distributor ,
213
210
chunkStore chunk.Store ,
214
211
prefix string ,
212
+ logSuccess bool ,
215
213
) {
216
214
querier := prism.MergeQuerier {
217
215
Queriers : []prism.Querier {
@@ -240,22 +238,35 @@ func setupQuerier(
240
238
api .Register (router .WithPrefix (prefix + "/api/v1" ))
241
239
http .Handle ("/" , router )
242
240
243
- http .Handle (prefix + "/graph" , instr ( ui .GraphHandler ()))
244
- http .Handle (prefix + "/static/" , instr ( ui .StaticAssetsHandler (prefix + "/static/" )))
241
+ http .Handle (prefix + "/graph" , instrument ( logSuccess , ui .GraphHandler ()))
242
+ http .Handle (prefix + "/static/" , instrument ( logSuccess , ui .StaticAssetsHandler (prefix + "/static/" )))
245
243
}
246
244
247
245
func setupIngester (
248
246
chunkStore chunk.Store ,
249
247
cfg ingester.Config ,
248
+ logSuccess bool ,
250
249
) * ingester.Ingester {
251
250
ingester , err := ingester .New (cfg , chunkStore )
252
251
if err != nil {
253
252
log .Fatal (err )
254
253
}
255
254
prometheus .MustRegister (ingester )
256
255
257
- http .Handle ("/push" , instr ( prism .AppenderHandler (ingester )))
258
- http .Handle ("/query" , instr ( prism .QueryHandler (ingester )))
259
- http .Handle ("/label_values" , instr ( prism .LabelValuesHandler (ingester )))
256
+ http .Handle ("/push" , instrument ( logSuccess , prism .AppenderHandler (ingester )))
257
+ http .Handle ("/query" , instrument ( logSuccess , prism .QueryHandler (ingester )))
258
+ http .Handle ("/label_values" , instrument ( logSuccess , prism .LabelValuesHandler (ingester )))
260
259
return ingester
261
260
}
261
+
262
+ // instrument instruments a handler.
263
+ func instrument (logSuccess bool , handler http.Handler ) http.Handler {
264
+ return middleware .Merge (
265
+ middleware.Log {
266
+ LogSuccess : logSuccess ,
267
+ },
268
+ middleware.Instrument {
269
+ Duration : requestDuration ,
270
+ },
271
+ ).Wrap (handler )
272
+ }
0 commit comments