Skip to content

Commit 105841e

Browse files
authored
Don't flush chunks after a successful transfer (#449)
* Don't flush chunks after a successful transfer * Add log message for successful chunk transfer. * Remove old log line.
1 parent 0a92a54 commit 105841e

File tree

3 files changed

+27
-5
lines changed

3 files changed

+27
-5
lines changed

pkg/ingester/ingester_claim.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"io"
55

66
"github.com/prometheus/client_golang/prometheus"
7+
"github.com/prometheus/common/log"
78
"github.com/prometheus/common/model"
89
"github.com/prometheus/prometheus/storage/local/chunk"
910

@@ -52,7 +53,10 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e
5253
// We can't send "extra" fields with a streaming call, so we repeat
5354
// wireSeries.FromIngesterId and assume it is the same every time
5455
// round this loop.
55-
fromIngesterID = wireSeries.FromIngesterId
56+
if fromIngesterID == "" {
57+
fromIngesterID = wireSeries.FromIngesterId
58+
log.Infof("Processing TransferChunks request from ingester '%s'.")
59+
}
5660
metric := util.FromLabelPairs(wireSeries.Labels)
5761
userCtx := user.Inject(stream.Context(), wireSeries.UserId)
5862
descs, err := fromWireChunks(wireSeries.Chunks)

pkg/ingester/ingester_lifecycle.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -325,13 +325,20 @@ func (i *Ingester) processShutdown() {
325325
flushRequired = false
326326
}
327327
}
328+
328329
if flushRequired {
329330
i.flushAllChunks()
330-
}
331331

332-
// Close the flush queues, to unblock waiting workers.
333-
for _, flushQueue := range i.flushQueues {
334-
flushQueue.Close()
332+
// Close the flush queues, to unblock waiting workers.
333+
for _, flushQueue := range i.flushQueues {
334+
flushQueue.Close()
335+
}
336+
} else {
337+
338+
// Close & empty all the flush queues, to unblock waiting workers.
339+
for _, flushQueue := range i.flushQueues {
340+
flushQueue.DrainAndClose()
341+
}
335342
}
336343

337344
// Wait for chunks to be flushed.
@@ -389,6 +396,7 @@ func (i *Ingester) transferChunks() error {
389396
return err
390397
}
391398

399+
log.Infof("Successfully sent chunks to '%s'", targetIngester.Addr)
392400
return nil
393401
}
394402

pkg/util/priority_queue.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,16 @@ func (pq *PriorityQueue) Close() {
6666
pq.cond.Broadcast()
6767
}
6868

69+
// DrainAndClose closed the queue and removes all the items from it.
70+
func (pq *PriorityQueue) DrainAndClose() {
71+
pq.lock.Lock()
72+
defer pq.lock.Unlock()
73+
pq.closed = true
74+
pq.queue = nil
75+
pq.hit = map[string]struct{}{}
76+
pq.cond.Broadcast()
77+
}
78+
6979
// Enqueue adds an operation to the queue in priority order. If the operation
7080
// is already on the queue, it will be ignored.
7181
func (pq *PriorityQueue) Enqueue(op Op) {

0 commit comments

Comments
 (0)