Skip to content

Commit f21f814

Browse files
committed
Initial attempt for flushing chunks from WAL [WIP]
Signed-off-by: Ganesh Vernekar <[email protected]>
1 parent 5bb950f commit f21f814

File tree

1 file changed

+99
-4
lines changed

1 file changed

+99
-4
lines changed

pkg/ingester/wal.go

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ingester
22

33
import (
4+
"context"
45
"flag"
56
"fmt"
67
"io/ioutil"
@@ -17,10 +18,12 @@ import (
1718
"github.com/pkg/errors"
1819
"github.com/prometheus/client_golang/prometheus"
1920
"github.com/prometheus/common/model"
21+
"github.com/prometheus/prometheus/pkg/labels"
2022
tsdb_errors "github.com/prometheus/tsdb/errors"
2123
"github.com/prometheus/tsdb/fileutil"
2224
"github.com/prometheus/tsdb/wal"
2325

26+
"github.com/cortexproject/cortex/pkg/ingester/client"
2427
"github.com/cortexproject/cortex/pkg/util"
2528
)
2629

@@ -172,7 +175,7 @@ func (w *walWrapper) checkpoint() (err error) {
172175
w.checkpointCreationFail.Inc()
173176
}
174177
}()
175-
_, last, err := w.lastCheckpoint()
178+
_, last, err := lastCheckpoint(w.wal.Dir())
176179
if err != nil {
177180
return err
178181
}
@@ -238,8 +241,8 @@ func (w *walWrapper) checkpoint() (err error) {
238241

239242
// lastCheckpoint returns the directory name and index of the most recent checkpoint.
240243
// If dir does not contain any checkpoints, -1 is returned as index.
241-
func (w *walWrapper) lastCheckpoint() (string, int, error) {
242-
files, err := ioutil.ReadDir(w.wal.Dir())
244+
func lastCheckpoint(dir string) (string, int, error) {
245+
files, err := ioutil.ReadDir(dir)
243246
if err != nil {
244247
return "", -1, err
245248
}
@@ -257,7 +260,7 @@ func (w *walWrapper) lastCheckpoint() (string, int, error) {
257260
if err != nil {
258261
continue
259262
}
260-
return filepath.Join(w.wal.Dir(), fi.Name()), idx, nil
263+
return filepath.Join(dir, fi.Name()), idx, nil
261264
}
262265
return "", -1, nil
263266
}
@@ -325,3 +328,95 @@ func (w *walWrapper) truncateSamples() error {
325328
w.lastWalSegment = last
326329
return nil
327330
}
331+
332+
func FlushFromWAL(ingester *Ingester, dir string) error {
333+
lastCheckpointDir, _, err := lastCheckpoint(dir)
334+
if err != nil {
335+
return err
336+
}
337+
338+
sr, err := wal.NewSegmentsReader(lastCheckpointDir)
339+
if err != nil {
340+
return err
341+
}
342+
343+
series, err := loadCheckpoint(wal.NewReader(sr))
344+
if err != nil {
345+
return err
346+
}
347+
348+
// Either use ingester series like normal reading and then flush once per series
349+
// or like below - flush once for the checkpoint and 1 more time from the WAL segments
350+
351+
// TODO: check about adding index entries.
352+
353+
for _, s := range series {
354+
chunkDesc, err := fromWireChunks(s.Chunks)
355+
if err != nil {
356+
return err
357+
}
358+
359+
// TODO: user id in the context.
360+
if err := ingester.flushChunks(
361+
context.Background(),
362+
model.Fingerprint(s.Fingerprint),
363+
pbLabelPairToLabels(s.Labels),
364+
chunkDesc,
365+
); err != nil {
366+
return nil
367+
}
368+
}
369+
370+
// TODO: Read segments from WAL which start after the checkpoint.
371+
// Need to do some alignment with segment and checkpoint numbers for that.
372+
sr, err = wal.NewSegmentsReader(dir)
373+
if err != nil {
374+
return nil
375+
}
376+
377+
records, err := loadWAL(wal.NewReader(sr))
378+
if err != nil {
379+
return nil
380+
}
381+
382+
for range records {
383+
384+
}
385+
386+
return nil
387+
}
388+
389+
func loadCheckpoint(r *wal.Reader) (series []*Series, err error) {
390+
for r.Next() {
391+
rec := r.Record()
392+
s := &Series{}
393+
if err := proto.Unmarshal(rec, s); err != nil {
394+
return nil, err
395+
}
396+
series = append(series, s)
397+
398+
}
399+
return series, r.Err()
400+
}
401+
402+
func loadWAL(r *wal.Reader) (records []*Record, err error) {
403+
for r.Next() {
404+
rec := r.Record()
405+
record := &Record{}
406+
if err := proto.Unmarshal(rec, record); err != nil {
407+
return nil, err
408+
}
409+
records = append(records, record)
410+
411+
}
412+
return records, r.Err()
413+
}
414+
415+
func pbLabelPairToLabels(lps []client.LabelPair) labels.Labels {
416+
lbls := make(labels.Labels, len(lps))
417+
for i := range lps {
418+
lbls[i].Name = string(lps[i].Name)
419+
lbls[i].Value = string(lps[i].Value)
420+
}
421+
return lbls
422+
}

0 commit comments

Comments
 (0)