Use TSDB's WAL for writes. #1103

Merged 79 commits on Jan 21, 2020
Commits
7228c1c
Update vendored libraries:
tomwilkie Oct 30, 2018
831080f
Use TSDB's WAL for writes.
tomwilkie Oct 30, 2018
8710a22
Merge remote-tracking branch 'upstream/master' into wal
codesome Jul 10, 2019
0802aa6
Merge pull request #23 from codesome/wal
codesome Jul 10, 2019
2a6fc2d
Fix merge conflicts
codesome Jul 10, 2019
bc25534
Merge remote-tracking branch 'grafana/wal' into wal
codesome Jul 10, 2019
f023ddb
Fix creation of user states in WAL recover
codesome Jul 11, 2019
5c022da
Merge remote-tracking branch 'upstream/master' into wal
codesome Jul 12, 2019
ac98b13
Remove WAL recovery on start-up and allow nil Record
codesome Aug 6, 2019
b84b02e
Fix types
codesome Aug 6, 2019
587b8ad
Merge remote-tracking branch 'upstream/master' into wal
codesome Aug 12, 2019
05127b0
WAL compression always enabled
codesome Aug 12, 2019
787fbb8
Change checkpoint logic to be more like prometheus's tsdb
codesome Aug 13, 2019
9845a5e
Add metrics for checkpoint and name changes
codesome Aug 13, 2019
5bb950f
Merge remote-tracking branch 'upstream/master' into wal
codesome Aug 13, 2019
f21f814
Initial attempt for flushing chunks from WAL [WIP]
codesome Aug 14, 2019
46eaf36
Combine checkpoint and WAL chunks before flushing
codesome Aug 14, 2019
2b17103
Bring back recovery and tests
codesome Aug 14, 2019
935b73e
Fix race in the test
codesome Aug 14, 2019
a7844c7
Merge remote-tracking branch 'upstream/master' into wal
codesome Aug 16, 2019
538f407
Recover on startup
codesome Aug 19, 2019
ff35948
Dont remove the last segment in truncation
codesome Aug 30, 2019
2913b99
Always read WAL and remove recover-only mode
codesome Sep 3, 2019
e691e01
Timer for WAL recovery time
codesome Sep 6, 2019
dbd336e
More rigorous test for WAL
codesome Sep 11, 2019
0e1577a
More profiling in debug mode
codesome Sep 11, 2019
be2ccbf
Merge remote-tracking branch 'upstream/master' into wal
codesome Sep 11, 2019
eeaae15
Fix race in test
codesome Sep 11, 2019
214b32e
No limits on number of series during recovery
codesome Sep 12, 2019
43dbc9e
No rate limiting of checkpoint
codesome Sep 13, 2019
7a9fec0
Change segment deletion logic
codesome Sep 14, 2019
2d85a98
Process WAL records in parallel.
codesome Sep 17, 2019
50d22c8
Added comments and some refactoring and not returning on no-series fo…
codesome Sep 17, 2019
7282ee8
Process checkpoint series in parallel
codesome Sep 18, 2019
6cd9940
Merge remote-tracking branch 'upstream/master' into wal
codesome Sep 18, 2019
b8a19d2
Merge remote-tracking branch 'upstream/master' into wal
codesome Sep 19, 2019
370dcea
Merge remote-tracking branch 'upstream/master' into wal
codesome Sep 20, 2019
959cf20
Fix race in processing WAL
codesome Sep 24, 2019
8eb90c9
Merge remote-tracking branch 'upstream/master' into wal
codesome Sep 24, 2019
d954865
Small enhancements
codesome Sep 27, 2019
fb75c9c
Cache the user states and series when processing samples
codesome Sep 30, 2019
51beb93
Enhancement in the user state cache and fix in samples buffer
codesome Sep 30, 2019
47a8434
Cache user states and series right from the checkpoint
codesome Oct 1, 2019
248566a
Small enhancements
codesome Oct 2, 2019
f147570
Fix the flag
codesome Oct 3, 2019
bc8a194
Remove test files
codesome Oct 10, 2019
f6ddbdf
Use tsdb from prometheus/prometheus repo
codesome Oct 10, 2019
ad3476e
Avoid flushing on shutdown
codesome Oct 25, 2019
737c415
Merge remote-tracking branch 'upstream/master' into wal
codesome Nov 10, 2019
251bde5
Fix after rebase
codesome Nov 11, 2019
88009b9
Fix bug of resetting userStates
codesome Nov 13, 2019
5dfe853
Fix review comments
codesome Nov 14, 2019
d661843
Merge remote-tracking branch 'upstream/master' into wal
codesome Nov 14, 2019
c55cedc
Small enhancements
codesome Nov 18, 2019
02c3fd6
Merge remote-tracking branch 'upstream/master' into wal
codesome Dec 5, 2019
902917f
Update comments
codesome Dec 5, 2019
4b0d578
Remove ingester<->WAL circular dependancy
codesome Dec 5, 2019
d2fb739
Merge remote-tracking branch 'upstream/master' into wal
codesome Dec 6, 2019
d799951
Change segment size of the WAL
codesome Dec 6, 2019
69e0a48
Use same directory for temporary tokens file
codesome Dec 6, 2019
9758838
Merge branch 'fix-tokens-on-file' into wal
codesome Dec 6, 2019
14218bf
Disble transfer out when WAL is enabled
codesome Dec 11, 2019
3f31526
Flush on shutdown endpoint irrespective of WAL
codesome Dec 11, 2019
db679b1
Merge remote-tracking branch 'upstream/master' into wal
codesome Dec 16, 2019
7e6f5de
Use sync.Pool for the records
codesome Dec 16, 2019
60637c1
Merge remote-tracking branch 'upstream/master' into wal
codesome Dec 17, 2019
f2d0b29
Fix Goutham's comments
codesome Dec 18, 2019
4dd4ce6
Merge remote-tracking branch 'upstream/master' into wal
codesome Dec 19, 2019
ce6a244
Fix Goutham's comments
codesome Dec 19, 2019
3fc8e31
Fix possible data corruption, goroutine deadlock and memory leak
codesome Dec 20, 2019
e618463
Fix review comments
codesome Jan 3, 2020
037d4b3
memoryChunks counter fix, metics updated, small cleanup
codesome Jan 8, 2020
27c6f85
Merge remote-tracking branch 'upstream/master' into wal
codesome Jan 20, 2020
3fb38ca
Update config file and argument doc
codesome Jan 20, 2020
92c1149
Add guide to run/migrate-to WAL in ingesters
codesome Jan 20, 2020
30c71a1
Merge remote-tracking branch 'upstream/master' into wal
codesome Jan 20, 2020
8b55cdb
Fix review comments
codesome Jan 20, 2020
aaa2aa2
Merge remote-tracking branch 'upstream/master' into wal
codesome Jan 20, 2020
fe3cc08
Fix review comments
codesome Jan 21, 2020
2 changes: 2 additions & 0 deletions Makefile
@@ -49,12 +49,14 @@ $(foreach exe, $(EXES), $(eval $(call dep_exe, $(exe))))

# Manually declared dependencies And what goes into each exe
pkg/ingester/client/cortex.pb.go: pkg/ingester/client/cortex.proto
pkg/ingester/wal.pb.go: pkg/ingester/wal.proto
pkg/ring/ring.pb.go: pkg/ring/ring.proto
pkg/querier/frontend/frontend.pb.go: pkg/querier/frontend/frontend.proto
pkg/querier/queryrange/queryrange.pb.go: pkg/querier/queryrange/queryrange.proto
pkg/chunk/storage/caching_index_client.pb.go: pkg/chunk/storage/caching_index_client.proto
pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto

all: $(UPTODATE_FILES)
test: protos
mod-check: protos
18 changes: 18 additions & 0 deletions docs/configuration/arguments.md
@@ -305,6 +305,24 @@ It also talks to a KVStore and has it's own copies of the same flags used by the
Where you don't want to cache every chunk written by ingesters, but you do want to take advantage of chunk write deduplication, this option will make ingesters write a placeholder to the cache for each chunk.
Make sure you configure ingesters with a different cache to queriers, which need the whole value.

#### WAL

- `--ingester.wal-dir`
  Directory where the WAL data should be stored and/or recovered from.

- `--ingester.wal-enabled`

Setting this to `true` enables writing to WAL during ingestion.

- `--ingester.checkpoint-enabled`
  Set this to `true` to enable checkpointing of in-memory chunks to disk. This is optional, but it helps speed up the replay process.

- `--ingester.checkpoint-duration`
This is the interval at which checkpoints should be created.

- `--ingester.recover-from-wal`
  Set this to `true` to recover data from an existing WAL. The data is recovered even if writing to the WAL is disabled, as long as this flag is set to `true`. The WAL directory needs to be set for this. A minimal example of the equivalent YAML configuration follows.
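
For reference, the flags above map to the `walconfig` block of the ingester section in the YAML config file (see the config file reference). The sketch below is illustrative only; the `/data/wal` path and the `15m` interval are assumptions, not recommendations.

```yaml
walconfig:
  # Write ingested data to the WAL and checkpoint in-memory chunks.
  wal_enabled: true
  checkpoint_enabled: true
  checkpoint_duration: 15m
  # Replay an existing WAL on startup.
  recover_from_wal: true
  # Must be on a persistent (mounted) volume.
  wal_dir: /data/wal
```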

## Runtime Configuration file

Cortex has a concept of "runtime config" file, which is simply a file that is reloaded while Cortex is running. It is used by some Cortex components to allow operator to change some aspects of Cortex configuration without restarting it. File is specified by using `-runtime-config.file=<filename>` flag and reload period (which defaults to 10 seconds) can be changed by `-runtime-config.reload-period=<duration>` flag. Previously this mechanism was only used by limits overrides, and flags were called `-limits.per-user-override-config=<filename>` and `-limits.per-user-override-period=10s` respectively. These are still used, if `-runtime-config.file=<filename>` is not specified.
21 changes: 21 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -323,6 +323,27 @@ ring:
The `ingester_config` configures the Cortex ingester.

```yaml
walconfig:
# Enable writing of ingested data into WAL.
# CLI flag: -ingester.wal-enabled
[wal_enabled: <boolean> | default = false]

# Enable checkpointing of in-memory chunks.
# CLI flag: -ingester.checkpoint-enabled
[checkpoint_enabled: <boolean> | default = false]

# Recover data from existing WAL irrespective of WAL enabled/disabled.
# CLI flag: -ingester.recover-from-wal
[recover_from_wal: <boolean> | default = false]

# Directory to store the WAL and/or recover from WAL.
# CLI flag: -ingester.wal-dir
[wal_dir: <string> | default = "wal"]

# Interval at which checkpoints should be created.
# CLI flag: -ingester.checkpoint-duration
[checkpoint_duration: <duration> | default = 30m0s]

lifecycler:
ring:
kvstore:
76 changes: 76 additions & 0 deletions docs/guides/ingesters-with-wal.md
@@ -0,0 +1,76 @@
---
title: "Ingesters with WAL"
linkTitle: "Ingesters with WAL"
weight: 5
slug: ingesters-with-wal
---

Currently, ingesters running in the chunks storage mode store all their data in memory. If there is a crash, that data can be lost. The WAL (write-ahead log) helps fill this gap in reliability.

To use the WAL, some changes need to be made to the deployment.

## Changes to deployment

1. Since ingesters need to keep the same persistent volume across restarts/rollouts, all the ingesters should be run as a [StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/) with fixed volumes.

2. The following flags need to be set (a minimal StatefulSet sketch wiring them up follows this list):
* `--ingester.wal-dir` to the directory where the WAL data should be stored and/or recovered from. Note that this should be on the mounted volume.
* `--ingester.wal-enabled` to `true`, which enables writing to the WAL during ingestion.
* `--ingester.checkpoint-enabled` to `true` to enable checkpointing of in-memory chunks to disk. This is optional, but it helps speed up the replay process.
* `--ingester.checkpoint-duration` to the interval at which checkpoints should be created. The default is `30m`; depending on the number of series, it can be brought down to `15m` if there are fewer series per ingester (say 1M).
* `--ingester.recover-from-wal` to `true` to recover data from an existing WAL. The data is recovered even if writing to the WAL is disabled, as long as this flag is set to `true`. The WAL directory needs to be set for this.
* If you are going to enable the WAL, it is advisable to always set this to `true`.
* `--ingester.tokens-file-path` should be set to the file path where the tokens should be stored. Note that this should be on the mounted volume. Why this is required is described below.
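
Below is a minimal StatefulSet sketch wiring these flags to a persistent volume. It is illustrative only, not a complete deployment: the image and tag, the `/data` mount path, the `50Gi` volume size, and the `--target=ingester` single-binary target flag are assumptions and need to be adapted to your setup (which will typically also carry the rest of your Cortex configuration).

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: ingester
spec:
  serviceName: ingester
  replicas: 3
  selector:
    matchLabels:
      name: ingester
  template:
    metadata:
      labels:
        name: ingester
    spec:
      containers:
        - name: ingester
          image: quay.io/cortexproject/cortex:v0.6.0   # assumed image and tag
          args:
            - --target=ingester                        # assumed single-binary target flag
            - --ingester.wal-enabled=true
            - --ingester.checkpoint-enabled=true
            - --ingester.checkpoint-duration=15m
            - --ingester.recover-from-wal=true
            - --ingester.wal-dir=/data/wal             # on the mounted volume
            - --ingester.tokens-file-path=/data/tokens # on the mounted volume
          volumeMounts:
            - name: ingester-data
              mountPath: /data
  volumeClaimTemplates:
    - metadata:
        name: ingester-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 50Gi                              # assumed size
```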

## Changes in lifecycle when WAL is enabled

1. Flushing of data to the chunk store during rollouts or scale-downs is disabled. This is because, during a rollout of a StatefulSet, no ingesters are simultaneously leaving and joining; rather, the same ingester is shut down and brought back again with an updated config. Hence flushing is skipped and the data is recovered from the WAL.

2. As there are no transfers between ingesters, the tokens are stored on and recovered from disk between rollouts/restarts. This is [not a new thing](https://github.com/cortexproject/cortex/pull/1750), but it is effective when using StatefulSets.

## Migrating from stateless deployments

The ingester _Deployment without WAL_ and the _StatefulSet with WAL_ should be scaled down and up respectively, in sync and without transfer of data between them, to ensure that any ingestion after the migration is immediately reliable.

Let's take an example of 4 ingesters. The migration would look something like this (an illustrative snapshot of the replica counts follows the list):

1. Bring up one stateful ingester `ingester-0` and wait till it's ready (accepting read and write requests).
2. Scale down the old ingester Deployment to 3 and wait till the leaving ingester flushes all of its data to the chunk store.
3. Once that ingester has disappeared from `kc get pods ...`, add another stateful ingester and wait till it's ready. This ensures no transfer takes place. Now you have `ingester-0 ingester-1`.
4. Repeat step 2 to remove another ingester from the old Deployment.
5. Repeat step 3 to add another stateful ingester. Now you have `ingester-0 ingester-1 ingester-2`.
6. Repeat steps 4 and 5 until the old Deployment is scaled down to 0, at which point you will finally have `ingester-0 ingester-1 ingester-2 ingester-3`.
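
As a concrete illustration of the intermediate state, after step 3 the two workloads would look roughly like this. The workload names `ingester` and `ingester-statefulset` are assumptions, and only the relevant fields are shown:

```yaml
# Old, stateless ingesters: scaled down from 4 to 3 in step 2.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ingester
spec:
  replicas: 3
---
# New, WAL-enabled ingesters: ingester-0 and ingester-1 are ready.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: ingester-statefulset
spec:
  replicas: 2
```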

## How to scale up/down

### Scale up

Scaling up is the same as what you would do without the WAL or StatefulSets. Nothing changes here.

### Scale down

Since Kubernetes doesn't differentiate between a rollout and a scale-down when sending a signal, the flushing of chunks on shutdown is disabled by default. Hence the only thing to take care of during a scale-down is flushing the chunks.

There are 2 ways to do it, with the latter being a fallback option.

**First option**

Say you have 4 ingesters `ingester-0 ingester-1 ingester-2 ingester-3` and you want to scale down to 2 ingesters. According to StatefulSet rules, the ingesters that will be shut down are `ingester-3` and then `ingester-2`.

Hence, before actually scaling down in Kubernetes, port-forward those ingesters and hit their [`/shutdown`](https://github.com/cortexproject/cortex/pull/1746) endpoint. This will flush the chunks and shut down the ingester (while also removing it from the ring).

After hitting the endpoint for `ingester-2 ingester-3`, scale down the ingesters to 2.

PS: Given that you have to scale down 1 ingester at a time, you can pipeline the shutdown and scale-down process instead of hitting the shutdown endpoint for all to-be-scaled-down ingesters at the same time.

**Fallback option**

There is a [flush mode ingester](https://github.com/cortexproject/cortex/pull/1747) in progress, and following recent discussions there will be a separate target called `flusher` in its place.

You can run it as a Kubernetes Job which will:
* Attach to the volume of the scaled down ingester
* Recover from the WAL
* And flush all the chunks.

This Job should be run for every ingester for which you missed hitting the shutdown endpoint in the first option.

More info about the flusher target will be added once it's upstream.
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
@@ -188,7 +188,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
if !canJoinDistributorsRing {
ingestionRateStrategy = newInfiniteIngestionRateStrategy()
} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey)
distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, true)
if err != nil {
return nil, err
}
99 changes: 91 additions & 8 deletions pkg/ingester/ingester.go
@@ -35,8 +35,14 @@ const (
queryStreamBatchSize = 128
)

var (
// This is initialised if the WAL is enabled and the records are fetched from this pool.
recordPool sync.Pool
)

// Config for an Ingester.
type Config struct {
WALConfig WALConfig
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`

// Config for transferring chunks. Zero or negative = no retries.
@@ -70,6 +76,7 @@ type Config struct {
// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.LifecyclerConfig.RegisterFlags(f)
cfg.WALConfig.RegisterFlags(f)

f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing. Negative value or zero disables hand-over.")
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
@@ -109,6 +116,9 @@ type Ingester struct {
flushQueues []*util.PriorityQueue
flushQueuesDone sync.WaitGroup

// This should never be nil.
wal WAL

// Hook for injecting behaviour from tests.
preFlushUserSeries func()

@@ -131,6 +141,19 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
return NewV2(cfg, clientConfig, limits, registerer)
}

if cfg.WALConfig.walEnabled {
// If WAL is enabled, we don't transfer out the data to any ingester.
// Either the next ingester which takes its place should recover from the WAL
// or the data has to be flushed during scaledown.
cfg.MaxTransferRetries = 0

recordPool = sync.Pool{
New: func() interface{} {
return &Record{}
},
}
}

i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
@@ -142,14 +165,36 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
}

var err error
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey)
// During WAL recovery, it will create new user states which requires the limiter.
// Hence initialise the limiter before creating the WAL.
// The '!cfg.WALConfig.walEnabled' argument says don't flush on shutdown if the WAL is enabled.
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.walEnabled)
if err != nil {
return nil, err
}

// Init the limter and instantiate the user states which depend on it
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
i.userStates = newUserStates(i.limiter, cfg, i.metrics)

if cfg.WALConfig.recover {
level.Info(util.Logger).Log("msg", "recovering from WAL")
start := time.Now()
if err := recoverFromWAL(i); err != nil {
level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String())
return nil, err
}
elapsed := time.Since(start)
level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String())
Review comment (Contributor): Can we also make this a metric? So that we can compare the duration changes over releases and also correlate it with the number of series, etc.

i.metrics.walReplayDuration.Set(elapsed.Seconds())
}

// If the WAL recovery happened, then the userStates would already be set.
if i.userStates == nil {
i.userStates = newUserStates(i.limiter, cfg, i.metrics)
}

i.wal, err = newWAL(cfg.WALConfig, i.userStates.cp)
if err != nil {
return nil, err
}

// Now that user states have been created, we can start the lifecycler
i.lifecycler.Start()
@@ -200,6 +245,8 @@ func (i *Ingester) Shutdown() {
close(i.quit)
i.done.Wait()

i.wal.Stop()

// Next initiate our graceful exit from the ring.
i.lifecycler.Shutdown()
}
@@ -209,7 +256,11 @@ func (i *Ingester) Shutdown() {
// * Change the state of ring to stop accepting writes.
// * Flush all the chunks.
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
originalState := i.lifecycler.FlushOnShutdown()
// We want to flush the chunks if transfer fails irrespective of original flag.
i.lifecycler.SetFlushOnShutdown(true)
i.Shutdown()
i.lifecycler.SetFlushOnShutdown(originalState)
w.WriteHeader(http.StatusNoContent)
}

@@ -232,11 +283,25 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
if err != nil {
return nil, fmt.Errorf("no user id")
}

var lastPartialErr *validationError
var record *Record
if i.cfg.WALConfig.walEnabled {
record = recordPool.Get().(*Record)
record.UserId = userID
// Assuming there is not much churn in most cases, there is no use
// keeping the record.Labels slice hanging around.
record.Labels = nil
if cap(record.Samples) < len(req.Timeseries) {
record.Samples = make([]Sample, 0, len(req.Timeseries))
} else {
record.Samples = record.Samples[:0]
}
}

for _, ts := range req.Timeseries {
for _, s := range ts.Samples {
err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source)
err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source, record)
if err == nil {
continue
}
@@ -254,10 +319,19 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
if lastPartialErr != nil {
return &client.WriteResponse{}, lastPartialErr.WrapWithUser(userID).WrappedError()
}

if record != nil {
// Log the record only if there was no error in ingestion.
if err := i.wal.Log(record); err != nil {
return nil, err
}
recordPool.Put(record)
}

return &client.WriteResponse{}, nil
}

func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum) error {
func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum, record *Record) error {
labels.removeBlanks()

var (
@@ -274,7 +348,8 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
if i.stopped {
return fmt.Errorf("ingester stopping")
}
state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels)

state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels, record)
if err != nil {
if ve, ok := err.(*validationError); ok {
state.discardedSamples.WithLabelValues(ve.errorType).Inc()
@@ -310,6 +385,14 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
return err
}

if record != nil {
record.Samples = append(record.Samples, Sample{
Fingerprint: uint64(fp),
Timestamp: uint64(timestamp),
Value: float64(value),
})
}

memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks))
i.metrics.ingestedSamples.Inc()
switch source {
@@ -430,7 +513,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
}

numSeries++
wireChunks, err := toWireChunks(chunks)
wireChunks, err := toWireChunks(chunks, nil)
if err != nil {
return err
}