Skip to content

Commit 3fc8e31

Browse files
committed
Fix possible data corruption, goroutine deadlock and memory leak
Signed-off-by: Ganesh Vernekar <[email protected]>
1 parent ce6a244 commit 3fc8e31

File tree

2 files changed

+50
-31
lines changed

2 files changed

+50
-31
lines changed

pkg/ingester/ingester.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ func (i *Ingester) StopIncomingRequests() {
334334

335335
// Push implements client.IngesterServer
336336
func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
337+
337338
if i.cfg.TSDBEnabled {
338339
return i.v2Push(ctx, req)
339340
}
@@ -374,7 +375,7 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
374375
return nil, err
375376
}
376377
}
377-
client.ReuseSlice(req.Timeseries)
378+
defer client.ReuseSlice(req.Timeseries)
378379

379380
if lastPartialErr != nil {
380381
return &client.WriteResponse{}, lastPartialErr.WrappedError()

pkg/ingester/wal.go

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ type WALConfig struct {
3939

4040
// RegisterFlags adds the flags required to config this to the given FlagSet
4141
func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) {
42-
f.BoolVar(&cfg.walEnabled, "ingester.wal-enable", false, "Enable the WAL.")
43-
f.BoolVar(&cfg.checkpointEnabled, "ingester.checkpoint-enable", false, "Enable checkpointing.")
42+
f.BoolVar(&cfg.walEnabled, "ingester.wal-enabled", false, "Enable the WAL.")
43+
f.BoolVar(&cfg.checkpointEnabled, "ingester.checkpoint-enabled", false, "Enable checkpointing.")
4444
f.BoolVar(&cfg.recover, "ingester.recover-from-wal", false, "Recover data from existing WAL.")
4545
f.StringVar(&cfg.dir, "ingester.wal-dir", "wal", "Directory to store the WAL.")
4646
f.DurationVar(&cfg.checkpointDuration, "ingester.checkpoint-duration", 1*time.Hour, "Duration over which to checkpoint.")
@@ -437,10 +437,17 @@ func segmentsExist(dir string) (bool, error) {
437437
// processCheckpoint loads the chunks of the series present in the last checkpoint.
438438
func processCheckpoint(name string, userStates *userStates, nWorkers int,
439439
stateCache []map[string]*userState, seriesCache []map[string]map[uint64]*memorySeries) error {
440+
441+
reader, closer, err := newWalReader(name, -1)
442+
if err != nil {
443+
return err
444+
}
445+
defer closer.Close()
446+
440447
var (
441448
inputs = make([]chan *Series, nWorkers)
442449
// errChan is to capture the errors from goroutine.
443-
// The channel size is nWorkers to not block any worker if all of them error out.
450+
// The channel size is nWorkers so as to not block any worker if all of them error out.
444451
errChan = make(chan error, nWorkers)
445452
wg = sync.WaitGroup{}
446453
seriesPool = &sync.Pool{
@@ -450,12 +457,6 @@ func processCheckpoint(name string, userStates *userStates, nWorkers int,
450457
}
451458
)
452459

453-
reader, closer, err := newWalReader(name, -1)
454-
if err != nil {
455-
return err
456-
}
457-
defer closer.Close()
458-
459460
wg.Add(nWorkers)
460461
for i := 0; i < nWorkers; i++ {
461462
inputs[i] = make(chan *Series, 300)
@@ -465,12 +466,15 @@ func processCheckpoint(name string, userStates *userStates, nWorkers int,
465466
}(inputs[i], stateCache[i], seriesCache[i])
466467
}
467468

468-
var errFromChan error
469+
var capturedErr error
469470
Loop:
470471
for reader.Next() {
471472
s := seriesPool.Get().(*Series)
472473
if err := proto.Unmarshal(reader.Record(), s); err != nil {
473-
return err
474+
// We don't return here in order to close/drain all the channels and
475+
// make sure all goroutines exit.
476+
capturedErr = err
477+
break Loop
474478
}
475479
// The yoloString from the unmarshal of LabelAdapter gets corrupted
476480
// when travelling through the channel. Hence making a copy of that.
@@ -479,7 +483,7 @@ Loop:
479483
s.Labels = copyLabelAdapters(s.Labels)
480484

481485
select {
482-
case errFromChan = <-errChan:
486+
case capturedErr = <-errChan:
483487
// Exit early on an error.
484488
// Only acts upon the first error received.
485489
break Loop
@@ -488,17 +492,24 @@ Loop:
488492
inputs[mod] <- s
489493
}
490494
}
495+
491496
for i := 0; i < nWorkers; i++ {
492497
close(inputs[i])
493498
}
494499
wg.Wait()
500+
// If any worker errored out, some input channels might not be empty.
501+
// Hence drain them.
502+
for i := 0; i < nWorkers; i++ {
503+
for range inputs[i] {
504+
}
505+
}
495506

496-
if errFromChan != nil {
497-
return errFromChan
507+
if capturedErr != nil {
508+
return capturedErr
498509
}
499510
select {
500-
case errFromChan = <-errChan:
501-
return errFromChan
511+
case capturedErr = <-errChan:
512+
return capturedErr
502513
default:
503514
if err := reader.Err(); err != nil {
504515
return err
@@ -566,6 +577,13 @@ type samplesWithUserID struct {
566577
// processWAL processes the records in the WAL concurrently.
567578
func processWAL(name string, startSegment int, userStates *userStates, nWorkers int,
568579
stateCache []map[string]*userState, seriesCache []map[string]map[uint64]*memorySeries) error {
580+
581+
reader, closer, err := newWalReader(name, startSegment)
582+
if err != nil {
583+
return err
584+
}
585+
defer closer.Close()
586+
569587
var (
570588
wg sync.WaitGroup
571589
inputs = make([]chan *samplesWithUserID, nWorkers)
@@ -589,27 +607,24 @@ func processWAL(name string, startSegment int, userStates *userStates, nWorkers
589607
}(inputs[i], outputs[i], stateCache[i], seriesCache[i])
590608
}
591609

592-
reader, closer, err := newWalReader(name, startSegment)
593-
if err != nil {
594-
return err
595-
}
596-
defer closer.Close()
597-
598610
var (
599-
errFromChan error
611+
capturedErr error
600612
record = &Record{}
601613
)
602614
Loop:
603615
for reader.Next() {
604616
select {
605-
case errFromChan = <-errChan:
617+
case capturedErr = <-errChan:
606618
// Exit early on an error.
607619
// Only acts upon the first error received.
608620
break Loop
609621
default:
610622
}
611623
if err := proto.Unmarshal(reader.Record(), record); err != nil {
612-
return err
624+
// We don't return here in order to close/drain all the channels and
625+
// make sure all goroutines exit.
626+
capturedErr = err
627+
break Loop
613628
}
614629

615630
if len(record.Labels) > 0 {
@@ -622,7 +637,10 @@ Loop:
622637
}
623638
_, err := state.createSeriesWithFingerprint(model.Fingerprint(labels.Fingerprint), labels.Labels, nil, true)
624639
if err != nil {
625-
return err
640+
// We don't return here in order to close/drain all the channels and
641+
// make sure all goroutines exit.
642+
capturedErr = err
643+
break Loop
626644
}
627645
}
628646
}
@@ -680,12 +698,12 @@ Loop:
680698
}
681699
}
682700

683-
if errFromChan != nil {
684-
return errFromChan
701+
if capturedErr != nil {
702+
return capturedErr
685703
}
686704
select {
687-
case errFromChan = <-errChan:
688-
return errFromChan
705+
case capturedErr = <-errChan:
706+
return capturedErr
689707
default:
690708
if err := reader.Err(); err != nil {
691709
return err

0 commit comments

Comments (0)