@@ -54,9 +54,9 @@ type Distributor struct {
54
54
type ReadRing interface {
55
55
prometheus.Collector
56
56
57
- Get (key uint32 , n int , op ring.Operation ) ([]* ring.IngesterDesc , error )
58
- BatchGet (keys []uint32 , n int , op ring.Operation ) ([][]* ring.IngesterDesc , error )
59
- GetAll () []* ring.IngesterDesc
57
+ Get (key uint32 , n int , op ring.Operation ) ([]ring.IngesterDesc , error )
58
+ BatchGet (keys []uint32 , n int , op ring.Operation ) ([][]ring.IngesterDesc , error )
59
+ GetAll () []ring.IngesterDesc
60
60
}
61
61
62
62
// Config contains the configuration require to
@@ -121,7 +121,7 @@ func New(cfg Config) (*Distributor, error) {
121
121
}, nil
122
122
}
123
123
124
- func (d * Distributor ) getClientFor (ingester * ring.IngesterDesc ) (cortex.IngesterClient , error ) {
124
+ func (d * Distributor ) getClientFor (ingester ring.IngesterDesc ) (cortex.IngesterClient , error ) {
125
125
d .clientsMtx .RLock ()
126
126
client , ok := d .clients [ingester .Hostname ]
127
127
d .clientsMtx .RUnlock ()
@@ -137,7 +137,7 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (cortex.Ingester
137
137
}
138
138
139
139
conn , err := grpc .Dial (
140
- ingester .Hostname ,
140
+ ingester .GRPCHostname ,
141
141
grpc .WithInsecure (),
142
142
grpc .WithUnaryInterceptor (grpc_middleware .ChainUnaryClient (
143
143
otgrpc .OpenTracingClientInterceptor (opentracing .GlobalTracer ()),
@@ -192,7 +192,7 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
192
192
}
193
193
194
194
sampleTrackers := make ([]sampleTracker , len (samples ), len (samples ))
195
- samplesByIngester := map [* ring.IngesterDesc ][]* sampleTracker {}
195
+ samplesByIngester := map [ring.IngesterDesc ][]* sampleTracker {}
196
196
for i := range samples {
197
197
sampleTrackers [i ] = sampleTracker {
198
198
sample : samples [i ],
@@ -204,9 +204,9 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
204
204
// Skip those that have not heartbeated in a while. NB these are still
205
205
// included in the calculation of minSuccess, so if too many failed ingesters
206
206
// will cause the whole write to fail.
207
- liveIngesters := make ([]* ring.IngesterDesc , 0 , len (ingesters [i ]))
207
+ liveIngesters := make ([]ring.IngesterDesc , 0 , len (ingesters [i ]))
208
208
for _ , ingester := range ingesters [i ] {
209
- if time .Now ().Sub (time . Unix ( ingester .Timestamp , 0 ) ) <= d .cfg .HeartbeatTimeout {
209
+ if time .Now ().Sub (ingester .Timestamp ) <= d .cfg .HeartbeatTimeout {
210
210
liveIngesters = append (liveIngesters , ingester )
211
211
}
212
212
}
@@ -226,7 +226,7 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
226
226
227
227
errs := make (chan error )
228
228
for hostname , samples := range samplesByIngester {
229
- go func (ingester * ring.IngesterDesc , samples []* sampleTracker ) {
229
+ go func (ingester ring.IngesterDesc , samples []* sampleTracker ) {
230
230
errs <- d .sendSamples (ctx , ingester , samples )
231
231
}(hostname , samples )
232
232
}
@@ -246,7 +246,7 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
246
246
return & cortex.WriteResponse {}, nil
247
247
}
248
248
249
- func (d * Distributor ) sendSamples (ctx context.Context , ingester * ring.IngesterDesc , sampleTrackers []* sampleTracker ) error {
249
+ func (d * Distributor ) sendSamples (ctx context.Context , ingester ring.IngesterDesc , sampleTrackers []* sampleTracker ) error {
250
250
client , err := d .getClientFor (ingester )
251
251
if err != nil {
252
252
return err
0 commit comments