Skip to content

Commit c47661d

Browse files
authored
Merge pull request #237 from weaveworks/grpc-cleanup
Clean up stale ingester connections
2 parents 9764257 + 5a86699 commit c47661d

File tree

4 files changed

+70
-7
lines changed

4 files changed

+70
-7
lines changed

cmd/distributor/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func main() {
4848
if err != nil {
4949
log.Fatalf("Error initializing distributor: %v", err)
5050
}
51+
defer dist.Stop()
5152

5253
server := server.New(serverConfig, r)
5354
server.HTTP.Handle("/api/prom/push", http.HandlerFunc(dist.PushHandler))

cmd/querier/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func main() {
4242
if err != nil {
4343
log.Fatalf("Error initializing distributor: %v", err)
4444
}
45+
defer dist.Stop()
4546

4647
server := server.New(serverConfig, r)
4748
defer server.Stop()

cmd/ruler/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func main() {
3636
if err != nil {
3737
log.Fatalf("Error initializing distributor: %v", err)
3838
}
39+
defer dist.Stop()
3940

4041
rulerServer, err := ruler.NewServer(rulerConfig, ruler.NewRuler(rulerConfig, dist, chunkStore))
4142
if err != nil {

distributor/distributor.go

Lines changed: 67 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"google.golang.org/grpc"
1717

1818
"github.com/prometheus/client_golang/prometheus"
19+
"github.com/prometheus/common/log"
1920
"github.com/prometheus/common/model"
2021
"github.com/prometheus/prometheus/storage/metric"
2122
"github.com/prometheus/prometheus/storage/remote"
@@ -41,7 +42,9 @@ type Distributor struct {
4142
cfg Config
4243
ring ReadRing
4344
clientsMtx sync.RWMutex
44-
clients map[string]cortex.IngesterClient
45+
clients map[string]ingesterClient
46+
quit chan struct{}
47+
done chan struct{}
4548

4649
queryDuration *prometheus.HistogramVec
4750
receivedSamples prometheus.Counter
@@ -52,6 +55,11 @@ type Distributor struct {
5255
ingesterQueryFailures *prometheus.CounterVec
5356
}
5457

58+
type ingesterClient struct {
59+
cortex.IngesterClient
60+
conn *grpc.ClientConn
61+
}
62+
5563
// ReadRing represents the read interface to the ring.
5664
type ReadRing interface {
5765
prometheus.Collector
@@ -64,10 +72,11 @@ type ReadRing interface {
6472
// Config contains the configuration required to
6573
// create a Distributor
6674
type Config struct {
67-
ReplicationFactor int
68-
MinReadSuccesses int
69-
HeartbeatTimeout time.Duration
70-
RemoteTimeout time.Duration
75+
ReplicationFactor int
76+
MinReadSuccesses int
77+
HeartbeatTimeout time.Duration
78+
RemoteTimeout time.Duration
79+
ClientCleanupPeriod time.Duration
7180
}
7281

7382
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -76,6 +85,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
7685
flag.IntVar(&cfg.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.")
7786
flag.DurationVar(&cfg.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
7887
flag.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 5*time.Second, "Timeout for downstream ingesters.")
88+
flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
7989
}
8090

8191
// New constructs a new Distributor
@@ -89,7 +99,9 @@ func New(cfg Config, ring ReadRing) (*Distributor, error) {
8999
d := &Distributor{
90100
cfg: cfg,
91101
ring: ring,
92-
clients: map[string]cortex.IngesterClient{},
102+
clients: map[string]ingesterClient{},
103+
quit: make(chan struct{}),
104+
done: make(chan struct{}),
93105
queryDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
94106
Namespace: "cortex",
95107
Name: "distributor_query_duration_seconds",
@@ -129,9 +141,54 @@ func New(cfg Config, ring ReadRing) (*Distributor, error) {
129141
}, []string{"ingester"}),
130142
}
131143
prometheus.MustRegister(d)
144+
go d.Run()
132145
return d, nil
133146
}
134147

148+
// Run starts the distributor's maintenance loop.
149+
func (d *Distributor) Run() {
150+
cleanupClients := time.NewTicker(d.cfg.ClientCleanupPeriod)
151+
for {
152+
select {
153+
case <-cleanupClients.C:
154+
d.removeStaleIngesterClients()
155+
case <-d.quit:
156+
close(d.done)
157+
return
158+
}
159+
}
160+
}
161+
162+
// Stop stops the distributor's maintenance loop.
163+
func (d *Distributor) Stop() {
164+
close(d.quit)
165+
<-d.done
166+
}
167+
168+
func (d *Distributor) removeStaleIngesterClients() {
169+
d.clientsMtx.Lock()
170+
defer d.clientsMtx.Unlock()
171+
172+
ingesters := map[string]struct{}{}
173+
for _, ing := range d.ring.GetAll() {
174+
ingesters[ing.Addr] = struct{}{}
175+
}
176+
177+
for addr, client := range d.clients {
178+
if _, ok := ingesters[addr]; !ok {
179+
log.Info("Removing stale ingester client for ", addr)
180+
delete(d.clients, addr)
181+
// Do the gRPC closing in the background since it might take a while and
182+
// we're holding a mutex.
183+
go func() {
184+
if err := client.conn.Close(); err != nil {
185+
log.Errorf("Error closing connection to ingester %q: %v", addr, err)
186+
}
187+
}()
188+
}
189+
}
190+
}
191+
135192
func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (cortex.IngesterClient, error) {
136193
d.clientsMtx.RLock()
137194
client, ok := d.clients[ingester.Addr]
@@ -159,7 +216,10 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (cortex.Ingester
159216
return nil, err
160217
}
161218

162-
client = cortex.NewIngesterClient(conn)
219+
client = ingesterClient{
220+
IngesterClient: cortex.NewIngesterClient(conn),
221+
conn: conn,
222+
}
163223
d.clients[ingester.Addr] = client
164224
return client, nil
165225
}

0 commit comments

Comments
 (0)