Skip to content

Commit 19db5f6

Browse files
thorfour authored and gouthamve committed
use tempdir when xfering tsdb (#1814)
* use tempdir when xfering tsdb Signed-off-by: Thor <[email protected]> * review fixes Signed-off-by: Thor <[email protected]> * TransferTSDB: added back in files xfer in addition to bytes transfer Signed-off-by: Thor <[email protected]> * unit tests: assert the directory was created where expected Signed-off-by: Thor <[email protected]> * remove empty dirs, but don't remove non-empty dirs Signed-off-by: Thor <[email protected]>
1 parent 4eb9373 commit 19db5f6

File tree

3 files changed

+74
-7
lines changed

3 files changed

+74
-7
lines changed

pkg/ingester/ingester_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ package ingester
33
import (
44
"context"
55
"fmt"
6+
"io/ioutil"
67
"math"
8+
"math/rand"
79
"net/http"
10+
"path/filepath"
811
"sort"
912
"strconv"
1013
"sync"
@@ -572,3 +575,21 @@ func BenchmarkIngesterPush(b *testing.B) {
572575
}
573576

574577
}
578+
579+
func TestRemoveEmptyDir(t *testing.T) {
580+
581+
// remove dir that dne
582+
require.NoError(t, removeEmptyDir(fmt.Sprintf("%v", rand.Int63())))
583+
584+
// remove empty dir
585+
dir, err := ioutil.TempDir("", "TestRemoveEmptyDir")
586+
require.NoError(t, err)
587+
require.NoError(t, removeEmptyDir(dir))
588+
589+
// remove non-empty dir
590+
dir, err = ioutil.TempDir("", "TestRemoveEmptyDir")
591+
require.NoError(t, err)
592+
593+
ioutil.WriteFile(filepath.Join(dir, "tempfile"), []byte("hello world"), 0777)
594+
require.NotNil(t, removeEmptyDir(dir))
595+
}

pkg/ingester/lifecycle_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"io"
55
"io/ioutil"
66
"math"
7+
"os"
78
"testing"
89
"time"
910

@@ -423,6 +424,7 @@ func TestV2IngesterTransfer(t *testing.T) {
423424
require.NoError(t, err)
424425
dir2, err := ioutil.TempDir("", "tsdb")
425426
require.NoError(t, err)
427+
require.NoError(t, os.Remove(dir2)) // remove the destination dir so there isn't a move conflict
426428

427429
// Start the first ingester, and get it into ACTIVE state.
428430
cfg1 := defaultIngesterTestConfig()
@@ -521,4 +523,10 @@ func TestV2IngesterTransfer(t *testing.T) {
521523
response, err = ing2.Query(ctx, request)
522524
require.NoError(t, err)
523525
assert.Equal(t, expectedResponse, response)
526+
527+
// Assert the data is in the expected location of dir2
528+
files, err := ioutil.ReadDir(dir2)
529+
require.NoError(t, err)
530+
require.Equal(t, 1, len(files))
531+
require.Equal(t, "1", files[0].Name())
524532
}

pkg/ingester/transfer.go

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@ var (
4242
Name: "cortex_ingester_received_files",
4343
Help: "The total number of files received by this ingester whilst joining",
4444
})
45+
receivedBytes = prometheus.NewCounter(prometheus.CounterOpts{
46+
Name: "cortex_ingester_received_bytes_total",
47+
Help: "The total number of bytes received by this ingester whilst joining",
48+
})
49+
sentBytes = prometheus.NewCounter(prometheus.CounterOpts{
50+
Name: "cortex_ingester_sent_bytes_total",
51+
Help: "The total number of bytes sent by this ingester whilst leaving",
52+
})
4553

4654
once *sync.Once
4755
)
@@ -51,7 +59,9 @@ func init() {
5159
prometheus.MustRegister(sentChunks)
5260
prometheus.MustRegister(receivedChunks)
5361
prometheus.MustRegister(sentFiles)
62+
prometheus.MustRegister(receivedBytes)
5463
prometheus.MustRegister(receivedFiles)
64+
prometheus.MustRegister(sentBytes)
5565
}
5666

5767
// TransferChunks receives all the chunks from another ingester.
@@ -208,6 +218,20 @@ func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error
208218
fromIngesterID := ""
209219

210220
xfer := func() error {
221+
222+
// Validate the final directory is empty, if it exists and is empty delete it so a move can succeed
223+
err := removeEmptyDir(i.cfg.TSDBConfig.Dir)
224+
if err != nil {
225+
return errors.Wrap(err, "remove existing TSDB directory")
226+
}
227+
228+
tmpDir, err := ioutil.TempDir("", "tsdb_xfer")
229+
if err != nil {
230+
return err
231+
}
232+
defer os.RemoveAll(tmpDir)
233+
234+
bytesXfer := 0
211235
filesXfer := 0
212236

213237
files := make(map[string]*os.File)
@@ -236,15 +260,14 @@ func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error
236260
return errors.Wrap(err, "TransferTSDB: checkFromIngesterIsInLeavingState")
237261
}
238262
}
239-
filesXfer++
263+
bytesXfer += len(f.Data)
240264

241-
// TODO(thor) To avoid corruption from errors, it's probably best to write to a temp dir, and then move that to the final location
242265
createfile := func(f *client.TimeSeriesFile) (*os.File, error) {
243-
dir := filepath.Join(i.cfg.TSDBConfig.Dir, filepath.Dir(f.Filename))
266+
dir := filepath.Join(tmpDir, filepath.Dir(f.Filename))
244267
if err := os.MkdirAll(dir, 0777); err != nil {
245268
return nil, errors.Wrap(err, "TransferTSDB: MkdirAll")
246269
}
247-
file, err := os.Create(filepath.Join(i.cfg.TSDBConfig.Dir, f.Filename))
270+
file, err := os.Create(filepath.Join(tmpDir, f.Filename))
248271
if err != nil {
249272
return nil, errors.Wrap(err, "TransferTSDB: Create")
250273
}
@@ -260,7 +283,7 @@ func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error
260283
if err != nil {
261284
return err
262285
}
263-
286+
filesXfer++
264287
files[f.Filename] = file
265288
} else {
266289

@@ -275,10 +298,12 @@ func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error
275298
return errors.Wrap(err, "TransferTSDB: ClaimTokensFor")
276299
}
277300

301+
receivedBytes.Add(float64(bytesXfer))
278302
receivedFiles.Add(float64(filesXfer))
279-
level.Error(util.Logger).Log("msg", "Total files xfer", "from_ingester", fromIngesterID, "num", filesXfer)
303+
level.Info(util.Logger).Log("msg", "Total xfer", "from_ingester", fromIngesterID, "files", filesXfer, "bytes", bytesXfer)
280304

281-
return nil
305+
// Move the tmpdir to the final location
306+
return os.Rename(tmpDir, i.cfg.TSDBConfig.Dir)
282307
}
283308

284309
if err := i.transfer(stream.Context(), xfer); err != nil {
@@ -644,6 +669,7 @@ func batchSend(batch int, b []byte, stream client.Ingester_TransferTSDBClient, t
644669
if err != nil {
645670
return err
646671
}
672+
sentBytes.Add(float64(len(tsfile.Data)))
647673
}
648674

649675
// Send final data
@@ -653,7 +679,19 @@ func batchSend(batch int, b []byte, stream client.Ingester_TransferTSDBClient, t
653679
if err != nil {
654680
return err
655681
}
682+
sentBytes.Add(float64(len(tsfile.Data)))
656683
}
657684

658685
return nil
659686
}
687+
688+
// removeEmptyDir removes dir if it exists. A dir that does not exist is not
// an error. os.Remove only deletes directories that are empty, so a non-empty
// dir is left untouched and the resulting error is returned to the caller.
func removeEmptyDir(dir string) error {
	// Call Remove directly instead of Stat followed by Remove: a separate
	// existence check is racy (the path can vanish between the two calls and
	// surface a not-exist error) and costs an extra syscall.
	err := os.Remove(dir)
	if err == nil || os.IsNotExist(err) {
		return nil
	}
	return err
}

0 commit comments

Comments
 (0)