6
6
"flag"
7
7
"fmt"
8
8
"hash/fnv"
9
- "io"
10
9
"sync"
11
10
"sync/atomic"
12
11
"time"
@@ -44,12 +43,12 @@ var (
44
43
// Distributor is a storage.SampleAppender and a client.Querier which
45
44
// forwards appends and queries to individual ingesters.
46
45
type Distributor struct {
47
- cfg Config
48
- ring ring.ReadRing
49
- clientsMtx sync.RWMutex
50
- clients map [ string ]client. IngesterClient
51
- quit chan struct {}
52
- done chan struct {}
46
+ cfg Config
47
+ ring ring.ReadRing
48
+ clientsMtx sync.RWMutex
49
+ ingesterPool * ingester_client. IngesterPool
50
+ quit chan struct {}
51
+ done chan struct {}
53
52
54
53
billingClient * billing.Client
55
54
@@ -73,11 +72,12 @@ type Config struct {
73
72
BillingConfig billing.Config
74
73
IngesterClientConfig ingester_client.Config
75
74
76
- ReplicationFactor int
77
- RemoteTimeout time.Duration
78
- ClientCleanupPeriod time.Duration
79
- IngestionRateLimit float64
80
- IngestionBurstSize int
75
+ ReplicationFactor int
76
+ RemoteTimeout time.Duration
77
+ ClientCleanupPeriod time.Duration
78
+ IngestionRateLimit float64
79
+ IngestionBurstSize int
80
+ HealthCheckIngesters bool
81
81
82
82
// for testing
83
83
ingesterClientFactory func (addr string , cfg ingester_client.Config ) (client.IngesterClient , error )
@@ -93,6 +93,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
93
93
flag .DurationVar (& cfg .ClientCleanupPeriod , "distributor.client-cleanup-period" , 15 * time .Second , "How frequently to clean up clients for ingesters that have gone away." )
94
94
flag .Float64Var (& cfg .IngestionRateLimit , "distributor.ingestion-rate-limit" , 25000 , "Per-user ingestion rate limit in samples per second." )
95
95
flag .IntVar (& cfg .IngestionBurstSize , "distributor.ingestion-burst-size" , 50000 , "Per-user allowed ingestion burst size (in number of samples)." )
96
+ flag .BoolVar (& cfg .HealthCheckIngesters , "distributor.health-check-ingesters" , false , "Run a health check on each ingester client during periodic cleanup." )
96
97
}
97
98
98
99
// New constructs a new Distributor
@@ -116,7 +117,7 @@ func New(cfg Config, ring ring.ReadRing) (*Distributor, error) {
116
117
d := & Distributor {
117
118
cfg : cfg ,
118
119
ring : ring ,
119
- clients : map [ string ]client. IngesterClient {} ,
120
+ ingesterPool : ingester_client . NewIngesterPool ( cfg . ingesterClientFactory , cfg . IngesterClientConfig , cfg . RemoteTimeout ) ,
120
121
quit : make (chan struct {}),
121
122
done : make (chan struct {}),
122
123
billingClient : billingClient ,
@@ -170,6 +171,9 @@ func (d *Distributor) Run() {
170
171
select {
171
172
case <- cleanupClients .C :
172
173
d .removeStaleIngesterClients ()
174
+ if d .cfg .HealthCheckIngesters {
175
+ d .ingesterPool .CleanUnhealthy ()
176
+ }
173
177
case <- d .quit :
174
178
close (d .done )
175
179
return
@@ -184,52 +188,18 @@ func (d *Distributor) Stop() {
184
188
}
185
189
186
190
func (d * Distributor ) removeStaleIngesterClients () {
187
- d .clientsMtx .Lock ()
188
- defer d .clientsMtx .Unlock ()
189
-
190
191
ingesters := map [string ]struct {}{}
191
192
for _ , ing := range d .ring .GetAll () {
192
193
ingesters [ing .Addr ] = struct {}{}
193
194
}
194
195
195
- for addr , client := range d .clients {
196
+ for _ , addr := range d .ingesterPool . RegisteredAddresses () {
196
197
if _ , ok := ingesters [addr ]; ok {
197
198
continue
198
199
}
199
200
level .Info (util .Logger ).Log ("msg" , "removing stale ingester client" , "addr" , addr )
200
- delete (d .clients , addr )
201
-
202
- // Do the gRPC closing in the background since it might take a while and
203
- // we're holding a mutex.
204
- go func (addr string , closer io.Closer ) {
205
- if err := closer .Close (); err != nil {
206
- level .Error (util .Logger ).Log ("msg" , "error closing connection to ingester" , "ingester" , addr , "err" , err )
207
- }
208
- }(addr , client .(io.Closer ))
209
- }
210
- }
211
-
212
- func (d * Distributor ) getClientFor (ingester * ring.IngesterDesc ) (client.IngesterClient , error ) {
213
- d .clientsMtx .RLock ()
214
- client , ok := d .clients [ingester .Addr ]
215
- d .clientsMtx .RUnlock ()
216
- if ok {
217
- return client , nil
218
- }
219
-
220
- d .clientsMtx .Lock ()
221
- defer d .clientsMtx .Unlock ()
222
- client , ok = d .clients [ingester .Addr ]
223
- if ok {
224
- return client , nil
225
- }
226
-
227
- client , err := d .cfg .ingesterClientFactory (ingester .Addr , d .cfg .IngesterClientConfig )
228
- if err != nil {
229
- return nil , err
201
+ d .ingesterPool .RemoveClientFor (addr )
230
202
}
231
- d .clients [ingester .Addr ] = client
232
- return client , nil
233
203
}
234
204
235
205
func tokenForLabels (userID string , labels []client.LabelPair ) (uint32 , error ) {
@@ -412,7 +382,7 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe
412
382
}
413
383
414
384
func (d * Distributor ) sendSamplesErr (ctx context.Context , ingester * ring.IngesterDesc , samples []* sampleTracker ) error {
415
- c , err := d .getClientFor (ingester )
385
+ c , err := d .ingesterPool . GetClientFor (ingester . Addr )
416
386
if err != nil {
417
387
return err
418
388
}
@@ -449,7 +419,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
449
419
450
420
metricNameMatcher , _ , ok := util .ExtractMetricNameMatcherFromMatchers (matchers )
451
421
452
- req , err := util .ToQueryRequest (from , to , matchers )
422
+ req , err := ingester_client .ToQueryRequest (from , to , matchers )
453
423
if err != nil {
454
424
return err
455
425
}
@@ -529,7 +499,7 @@ func (d *Distributor) queryIngesters(ctx context.Context, ingesters []*ring.Inge
529
499
}
530
500
531
501
func (d * Distributor ) queryIngester (ctx context.Context , ing * ring.IngesterDesc , req * client.QueryRequest ) (model.Matrix , error ) {
532
- client , err := d .getClientFor (ing )
502
+ client , err := d .ingesterPool . GetClientFor (ing . Addr )
533
503
if err != nil {
534
504
return nil , err
535
505
}
@@ -541,7 +511,7 @@ func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc,
541
511
return nil , err
542
512
}
543
513
544
- return util .FromQueryResponse (resp ), nil
514
+ return ingester_client .FromQueryResponse (resp ), nil
545
515
}
546
516
547
517
// forAllIngesters runs f, in parallel, for all ingesters
@@ -550,7 +520,7 @@ func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{}
550
520
ingesters := d .ring .GetAll ()
551
521
for _ , ingester := range ingesters {
552
522
go func (ingester * ring.IngesterDesc ) {
553
- client , err := d .getClientFor (ingester )
523
+ client , err := d .ingesterPool . GetClientFor (ingester . Addr )
554
524
if err != nil {
555
525
errs <- err
556
526
return
@@ -609,7 +579,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName mod
609
579
610
580
// MetricsForLabelMatchers gets the metrics that match said matchers
611
581
func (d * Distributor ) MetricsForLabelMatchers (ctx context.Context , from , through model.Time , matchers ... * labels.Matcher ) ([]metric.Metric , error ) {
612
- req , err := util .ToMetricsForLabelMatchersRequest (from , through , matchers )
582
+ req , err := ingester_client .ToMetricsForLabelMatchersRequest (from , through , matchers )
613
583
if err != nil {
614
584
return nil , err
615
585
}
@@ -623,7 +593,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
623
593
624
594
metrics := map [model.Fingerprint ]model.Metric {}
625
595
for _ , resp := range resps {
626
- ms := util .FromMetricsForLabelMatchersResponse (resp .(* client.MetricsForLabelMatchersResponse ))
596
+ ms := ingester_client .FromMetricsForLabelMatchersResponse (resp .(* client.MetricsForLabelMatchersResponse ))
627
597
for _ , m := range ms {
628
598
metrics [m .Fingerprint ()] = m
629
599
}
@@ -677,7 +647,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
677
647
// Not using d.forAllIngesters(), so we can fail after first error.
678
648
ingesters := d .ring .GetAll ()
679
649
for _ , ingester := range ingesters {
680
- client , err := d .getClientFor (ingester )
650
+ client , err := d .ingesterPool . GetClientFor (ingester . Addr )
681
651
if err != nil {
682
652
return nil , err
683
653
}
@@ -736,6 +706,6 @@ func (d *Distributor) Collect(ch chan<- prometheus.Metric) {
736
706
ch <- prometheus .MustNewConstMetric (
737
707
numClientsDesc ,
738
708
prometheus .GaugeValue ,
739
- float64 (len ( d . clients )),
709
+ float64 (d . ingesterPool . Count ( )),
740
710
)
741
711
}
0 commit comments