Skip to content

Commit 831080f

Browse files
committed
Use TSDB's WAL for writes.
Signed-off-by: Tom Wilkie <[email protected]>
1 parent 7228c1c commit 831080f

File tree

10 files changed

+507
-58
lines changed

10 files changed

+507
-58
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ cmd/test-exporter/test-exporter
1212
.pkg
1313
.cache
1414
pkg/ingester/client/cortex.pb.go
15+
pkg/ingester/wal.pb.go
1516
pkg/querier/frontend/frontend.pb.go
1617
pkg/ring/ring.pb.go
1718
pkg/chunk/storage/caching_storage_client.pb.go

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ $(foreach exe, $(EXES), $(eval $(call dep_exe, $(exe))))
4747

4848
# Manually declared dependancies And what goes into each exe
4949
pkg/ingester/client/cortex.pb.go: pkg/ingester/client/cortex.proto
50+
pkg/ingester/wal.pb.go: pkg/ingester/wal.proto
5051
pkg/ring/ring.pb.go: pkg/ring/ring.proto
52+
pkg/querier/frontend/frontend.pb.go: pkg/querier/frontend/frontend.proto
5153
all: $(UPTODATE_FILES)
5254
test: $(PROTO_GOS)
5355
protos: $(PROTO_GOS)
@@ -113,7 +115,7 @@ $(EXES):
113115
$(NETGO_CHECK)
114116

115117
%.pb.go:
116-
protoc -I ./vendor:./$(@D) --gogoslick_out=plugins=grpc:./$(@D) ./$(patsubst %.pb.go,%.proto,$@)
118+
protoc -I $(GOPATH)/src:./vendor:./$(@D) --gogoslick_out=plugins=grpc:./$(@D) ./$(patsubst %.pb.go,%.proto,$@)
117119

118120
lint:
119121
./tools/lint -notestpackage -ignorespelling queriers -ignorespelling Queriers .

pkg/ingester/ingester.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ var (
7575
// Config for an Ingester.
7676
type Config struct {
7777
LifecyclerConfig ring.LifecyclerConfig
78+
WALConfig WALConfig
7879

7980
// Config for transferring chunks.
8081
SearchPendingFor time.Duration
@@ -98,6 +99,7 @@ type Config struct {
9899
// RegisterFlags adds the flags required to config this to the given FlagSet
99100
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
100101
cfg.LifecyclerConfig.RegisterFlags(f)
102+
cfg.WALConfig.RegisterFlags(f)
101103

102104
f.DurationVar(&cfg.SearchPendingFor, "ingester.search-pending-for", 30*time.Second, "Time to spend searching for a pending ingester when shutting down.")
103105
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
@@ -141,6 +143,8 @@ type Ingester struct {
141143
flushQueues []*util.PriorityQueue
142144
flushQueuesDone sync.WaitGroup
143145

146+
wal WAL
147+
144148
// Hook for injecting behaviour from tests.
145149
preFlushUserSeries func()
146150
}
@@ -173,6 +177,11 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
173177
}
174178

175179
var err error
180+
i.wal, err = newWAL(cfg.WALConfig, i)
181+
if err != nil {
182+
return nil, err
183+
}
184+
176185
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i)
177186
if err != nil {
178187
return nil, err
@@ -221,6 +230,8 @@ func (i *Ingester) Shutdown() {
221230
close(i.quit)
222231
i.done.Wait()
223232

233+
i.wal.Stop()
234+
224235
// Next initiate our graceful exit from the ring.
225236
i.lifecycler.Shutdown()
226237
}
@@ -234,11 +245,20 @@ func (i *Ingester) StopIncomingRequests() {
234245

235246
// Push implements client.IngesterServer
236247
func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
237-
var lastPartialErr error
248+
userID, err := user.ExtractOrgID(ctx)
249+
if err != nil {
250+
return nil, err
251+
}
252+
253+
record := Record{
254+
UserId: userID,
255+
Samples: make([]Sample, 0, len(req.Timeseries)),
256+
}
238257

258+
var lastPartialErr error
239259
for _, ts := range req.Timeseries {
240260
for _, s := range ts.Samples {
241-
err := i.append(ctx, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source)
261+
err := i.append(ctx, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source, &record)
242262
if err == nil {
243263
continue
244264
}
@@ -256,10 +276,14 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
256276
}
257277
}
258278

279+
if err := i.wal.Log(&record); err != nil {
280+
return nil, err
281+
}
282+
259283
return &client.WriteResponse{}, lastPartialErr
260284
}
261285

262-
func (i *Ingester) append(ctx context.Context, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum) error {
286+
func (i *Ingester) append(ctx context.Context, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum, record *Record) error {
263287
labels.removeBlanks()
264288

265289
i.stopLock.RLock()
@@ -270,7 +294,7 @@ func (i *Ingester) append(ctx context.Context, labels labelPairs, timestamp mode
270294

271295
i.userStatesMtx.RLock()
272296
defer i.userStatesMtx.RUnlock()
273-
state, fp, series, err := i.userStates.getOrCreateSeries(ctx, labels)
297+
state, fp, series, err := i.userStates.getOrCreateSeries(ctx, labels, record)
274298
if err != nil {
275299
return err
276300
}
@@ -291,6 +315,12 @@ func (i *Ingester) append(ctx context.Context, labels labelPairs, timestamp mode
291315
return err
292316
}
293317

318+
record.Samples = append(record.Samples, Sample{
319+
Fingerprint: int64(fp),
320+
Timestamp: int64(timestamp),
321+
Value: float64(value),
322+
})
323+
294324
memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks))
295325
ingestedSamples.Inc()
296326
switch source {

pkg/ingester/ingester_test.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,7 @@ func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries in
159159
return userIDs, testData
160160
}
161161

162-
func TestIngesterAppend(t *testing.T) {
163-
store, ing := newDefaultTestStore(t)
164-
165-
userIDs, testData := pushTestSamples(t, ing, 10, 1000)
166-
162+
func retrieveTestSamples(t *testing.T, ing *Ingester, userIDs []string, testData map[string]model.Matrix) {
167163
// Read samples back via ingester queries.
168164
for _, userID := range userIDs {
169165
ctx := user.InjectOrgID(context.Background(), userID)
@@ -181,6 +177,12 @@ func TestIngesterAppend(t *testing.T) {
181177
require.NoError(t, err)
182178
assert.Equal(t, testData[userID].String(), res.String())
183179
}
180+
}
181+
182+
func TestIngesterAppend(t *testing.T) {
183+
store, ing := newDefaultTestStore(t)
184+
userIDs, testData := pushTestSamples(t, ing, 10, 1000)
185+
retrieveTestSamples(t, ing, userIDs, testData)
184186

185187
// Read samples back via chunk store.
186188
ing.Shutdown()
@@ -245,22 +247,22 @@ func TestIngesterAppendOutOfOrderAndDuplicate(t *testing.T) {
245247
{Name: []byte(model.MetricNameLabel), Value: []byte("testmetric")},
246248
}
247249
ctx := user.InjectOrgID(context.Background(), userID)
248-
err := ing.append(ctx, m, 1, 0, client.API)
250+
err := ing.append(ctx, m, 1, 0, client.API, &Record{})
249251
require.NoError(t, err)
250252

251253
// Two times exactly the same sample (noop).
252-
err = ing.append(ctx, m, 1, 0, client.API)
254+
err = ing.append(ctx, m, 1, 0, client.API, &Record{})
253255
require.NoError(t, err)
254256

255257
// Earlier sample than previous one.
256-
err = ing.append(ctx, m, 0, 0, client.API)
258+
err = ing.append(ctx, m, 0, 0, client.API, &Record{})
257259
require.Contains(t, err.Error(), "sample timestamp out of order")
258260
errResp, ok := httpgrpc.HTTPResponseFromError(err)
259261
require.True(t, ok)
260262
require.Equal(t, errResp.Code, int32(400))
261263

262264
// Same timestamp as previous sample, but different value.
263-
err = ing.append(ctx, m, 1, 1, client.API)
265+
err = ing.append(ctx, m, 1, 1, client.API, &Record{})
264266
require.Contains(t, err.Error(), "sample with repeated timestamp but different value")
265267
errResp, ok = httpgrpc.HTTPResponseFromError(err)
266268
require.True(t, ok)
@@ -278,7 +280,7 @@ func TestIngesterAppendBlankLabel(t *testing.T) {
278280
{Name: []byte("bar"), Value: []byte("")},
279281
}
280282
ctx := user.InjectOrgID(context.Background(), userID)
281-
err := ing.append(ctx, lp, 1, 0, client.API)
283+
err := ing.append(ctx, lp, 1, 0, client.API, &Record{})
282284
require.NoError(t, err)
283285

284286
res, _, err := runTestQuery(ctx, t, ing, labels.MatchEqual, model.MetricNameLabel, "testmetric")

pkg/ingester/series.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ func (s *memorySeries) setChunks(descs []*desc) error {
191191
if len(descs) > 0 {
192192
s.lastTime = descs[len(descs)-1].LastTime
193193
}
194+
memoryChunks.Add(float64(len(descs)))
195+
194196
return nil
195197
}
196198

pkg/ingester/transfer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,16 +85,17 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e
8585
level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)
8686
}
8787
userCtx := user.InjectOrgID(stream.Context(), wireSeries.UserId)
88+
8889
descs, err := fromWireChunks(wireSeries.Chunks)
8990
if err != nil {
9091
return err
9192
}
9293

93-
state, fp, series, err := userStates.getOrCreateSeries(userCtx, wireSeries.Labels)
94+
var record Record // for the WAL, not used.
95+
state, fp, series, err := userStates.getOrCreateSeries(userCtx, wireSeries.Labels, &record)
9496
if err != nil {
9597
return err
9698
}
97-
prevNumChunks := len(series.chunkDescs)
9899

99100
err = series.setChunks(descs)
100101
state.fpLocker.Unlock(fp) // acquired in getOrCreateSeries
@@ -103,7 +104,6 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e
103104
}
104105

105106
seriesReceived++
106-
memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks))
107107
receivedChunks.Add(float64(len(descs)))
108108
}
109109

pkg/ingester/user_state.go

Lines changed: 62 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -120,51 +120,61 @@ func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, erro
120120
return state, ok, nil
121121
}
122122

123-
func (us *userStates) getOrCreateSeries(ctx context.Context, labels labelPairs) (*userState, model.Fingerprint, *memorySeries, error) {
123+
func (us *userStates) getOrCreate(ctx context.Context) (*userState, error) {
124124
userID, err := user.ExtractOrgID(ctx)
125125
if err != nil {
126-
return nil, 0, nil, fmt.Errorf("no user id")
126+
return nil, fmt.Errorf("no user id")
127127
}
128128

129129
state, ok := us.get(userID)
130+
if ok {
131+
return state, nil
132+
}
133+
134+
seriesInMetric := make([]metricCounterShard, 0, metricCounterShards)
135+
for i := 0; i < metricCounterShards; i++ {
136+
seriesInMetric = append(seriesInMetric, metricCounterShard{
137+
m: map[string]int{},
138+
})
139+
}
140+
141+
// Speculatively create a userState object and try to store it
142+
// in the map. Another goroutine may have got there before
143+
// us, in which case this userState will be discarded
144+
state = &userState{
145+
userID: userID,
146+
limits: us.limits,
147+
fpToSeries: newSeriesMap(),
148+
fpLocker: newFingerprintLocker(16 * 1024),
149+
index: newInvertedIndex(),
150+
ingestedAPISamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod),
151+
ingestedRuleSamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod),
152+
seriesInMetric: seriesInMetric,
153+
154+
memSeriesCreatedTotal: memSeriesCreatedTotal.WithLabelValues(userID),
155+
memSeriesRemovedTotal: memSeriesRemovedTotal.WithLabelValues(userID),
156+
}
157+
state.mapper = newFPMapper(state.fpToSeries)
158+
stored, ok := us.states.LoadOrStore(userID, state)
130159
if !ok {
160+
memUsers.Inc()
161+
}
162+
state = stored.(*userState)
131163

132-
seriesInMetric := make([]metricCounterShard, 0, metricCounterShards)
133-
for i := 0; i < metricCounterShards; i++ {
134-
seriesInMetric = append(seriesInMetric, metricCounterShard{
135-
m: map[string]int{},
136-
})
137-
}
164+
return state, nil
165+
}
138166

139-
// Speculatively create a userState object and try to store it
140-
// in the map. Another goroutine may have got there before
141-
// us, in which case this userState will be discarded
142-
state = &userState{
143-
userID: userID,
144-
limits: us.limits,
145-
fpToSeries: newSeriesMap(),
146-
fpLocker: newFingerprintLocker(16 * 1024),
147-
index: newInvertedIndex(),
148-
ingestedAPISamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod),
149-
ingestedRuleSamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod),
150-
seriesInMetric: seriesInMetric,
151-
152-
memSeriesCreatedTotal: memSeriesCreatedTotal.WithLabelValues(userID),
153-
memSeriesRemovedTotal: memSeriesRemovedTotal.WithLabelValues(userID),
154-
}
155-
state.mapper = newFPMapper(state.fpToSeries)
156-
stored, ok := us.states.LoadOrStore(userID, state)
157-
if !ok {
158-
memUsers.Inc()
159-
}
160-
state = stored.(*userState)
167+
func (us *userStates) getOrCreateSeries(ctx context.Context, labels labelPairs, record *Record) (*userState, model.Fingerprint, *memorySeries, error) {
168+
state, err := us.getOrCreate(ctx)
169+
if err != nil {
170+
return nil, 0, nil, err
161171
}
162172

163-
fp, series, err := state.getSeries(labels)
173+
fp, series, err := state.getSeries(labels, record)
164174
return state, fp, series, err
165175
}
166176

167-
func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeries, error) {
177+
func (u *userState) getSeries(metric labelPairs, record *Record) (model.Fingerprint, *memorySeries, error) {
168178
rawFP := client.FastFingerprint(metric)
169179
u.fpLocker.Lock(rawFP)
170180
fp := u.mapper.mapFP(rawFP, metric)
@@ -178,36 +188,48 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri
178188
return fp, series, nil
179189
}
180190

191+
series, err := u.createSeriesWithFingerprint(fp, metric, record)
192+
if err != nil {
193+
u.fpLocker.Unlock(fp)
194+
return 0, nil, err
195+
}
196+
197+
return fp, series, nil
198+
}
199+
200+
func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric labelPairs, record *Record) (*memorySeries, error) {
181201
// There's theoretically a relatively harmless race here if multiple
182202
// goroutines get the length of the series map at the same time, then
183203
// all proceed to add a new series. This is likely not worth addressing,
184204
// as this should happen rarely (all samples from one push are added
185205
// serially), and the overshoot in allowed series would be minimal.
186206
if u.fpToSeries.length() >= u.limits.MaxSeriesPerUser(u.userID) {
187-
u.fpLocker.Unlock(fp)
188-
return fp, nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user series limit (%d) exceeded", u.limits.MaxSeriesPerUser(u.userID))
207+
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user series limit (%d) exceeded", u.limits.MaxSeriesPerUser(u.userID))
189208
}
190209

191210
metricName, err := extract.MetricNameFromLabelPairs(metric)
192211
if err != nil {
193-
u.fpLocker.Unlock(fp)
194-
return fp, nil, err
212+
return nil, err
195213
}
196214

197215
if !u.canAddSeriesFor(string(metricName)) {
198-
u.fpLocker.Unlock(fp)
199-
return fp, nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-metric series limit (%d) exceeded for %s: %s", u.limits.MaxSeriesPerMetric(u.userID), metricName, metric)
216+
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-metric series limit (%d) exceeded for %s: %s", u.limits.MaxSeriesPerMetric(u.userID), metricName, metric)
200217
}
201218

202219
util.Event().Log("msg", "new series", "userID", u.userID, "fp", fp, "series", metric)
203220
u.memSeriesCreatedTotal.Inc()
204221
memSeries.Inc()
205222

206-
series = newMemorySeries(metric)
223+
record.Labels = append(record.Labels, Labels{
224+
Fingerprint: int64(fp),
225+
Labels: metric,
226+
})
227+
228+
series := newMemorySeries(metric)
207229
u.fpToSeries.put(fp, series)
208230
u.index.add(metric, fp)
209231

210-
return fp, series, nil
232+
return series, nil
211233
}
212234

213235
func (u *userState) canAddSeriesFor(metric string) bool {

0 commit comments

Comments
 (0)