Skip to content

Commit cfc25fd

Browse files
committed
Comment unsafe memory usage in ingester push path
Signed-off-by: Bryan Boreham <[email protected]>
1 parent ab3e836 commit cfc25fd

File tree

3 files changed

+22
-3
lines changed

3 files changed

+22
-3
lines changed

pkg/ingester/index/index.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ func New() *InvertedIndex {
3232
}
3333

3434
// Add a fingerprint under the specified labels.
35+
// NOTE: memory for `labels` is unsafe; anything retained beyond the
36+
// life of this function must be copied
3537
func (ii *InvertedIndex) Add(labels []client.LabelAdapter, fp model.Fingerprint) labels.Labels {
3638
shard := &ii.shards[util.HashFP(fp)%indexShards]
37-
return shard.add(labels, fp)
39+
return shard.add(labels, fp) // add() returns 'interned' values so the original labels are not retained
3840
}
3941

4042
// Lookup all fingerprints for the provided matchers.
@@ -108,7 +110,9 @@ func copyString(s string) string {
108110
return string([]byte(s))
109111
}
110112

111-
// add metric to the index; return all the name/value pairs as strings from the index, sorted
113+
// add metric to the index; return all the name/value pairs as a fresh
114+
// sorted slice, referencing 'interned' strings from the index so that
115+
// no references are retained to the memory of `metric`.
112116
func (shard *indexShard) add(metric []client.LabelAdapter, fp model.Fingerprint) labels.Labels {
113117
shard.mtx.Lock()
114118
defer shard.mtx.Unlock()

pkg/ingester/ingester.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,8 +299,11 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
299299
}
300300
}
301301

302+
// NOTE: because we use `unsafe` in deserialisation, we must not
303+
// retain anything from `req` past the call to ReuseSlice
302304
for _, ts := range req.Timeseries {
303305
for _, s := range ts.Samples {
306+
// append() copies the memory in `ts.Labels` except on the error path
304307
err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source, record)
305308
if err == nil {
306309
continue
@@ -312,11 +315,13 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
312315
continue
313316
}
314317

318+
// non-validation error: abandon this request
315319
return nil, wrapWithUser(err, userID)
316320
}
317321
}
318322

319323
if lastPartialErr != nil {
324+
// WrappedError turns the error into a string so it no longer references `req`
320325
return &client.WriteResponse{}, lastPartialErr.WrapWithUser(userID).WrappedError()
321326
}
322327

@@ -331,6 +336,8 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
331336
return &client.WriteResponse{}, nil
332337
}
333338

339+
// NOTE: memory for `labels` is unsafe; anything retained beyond the
340+
// life of this function must be copied
334341
func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum, record *Record) error {
335342
labels.removeBlanks()
336343

@@ -349,6 +356,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
349356
return fmt.Errorf("ingester stopping")
350357
}
351358

359+
// getOrCreateSeries copies the memory for `labels`, except on the error path.
352360
state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels, record)
353361
if err != nil {
354362
if ve, ok := err.(*validationError); ok {

pkg/ingester/user_state.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,17 @@ func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, erro
155155
return state, ok, nil
156156
}
157157

158+
// NOTE: memory for `labels` is unsafe; anything retained beyond the
159+
// life of this function must be copied
158160
func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter, record *Record) (*userState, model.Fingerprint, *memorySeries, error) {
159161
state := us.getOrCreate(userID)
162+
// WARNING: `err` may have a reference to unsafe memory in `labels`
160163
fp, series, err := state.getSeries(labels, record)
161164
return state, fp, series, err
162165
}
163166

167+
// NOTE: memory for `metric` is unsafe; anything retained beyond the
168+
// life of this function must be copied
164169
func (u *userState) getSeries(metric labelPairs, record *Record) (model.Fingerprint, *memorySeries, error) {
165170
rawFP := client.FastFingerprint(metric)
166171
u.fpLocker.Lock(rawFP)
@@ -197,6 +202,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab
197202
}
198203
}
199204

205+
// MetricNameFromLabelAdapters returns a copy of the string in `metric`
200206
metricName, err := extract.MetricNameFromLabelAdapters(metric)
201207
if err != nil {
202208
return nil, err
@@ -205,6 +211,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab
205211
if !recovery {
206212
// Check if the per-metric limit has been exceeded
207213
if err = u.canAddSeriesFor(string(metricName)); err != nil {
214+
// WARNING: returns a reference to `metric`
208215
return nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabels(metric), err)
209216
}
210217
}
@@ -219,7 +226,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab
219226
})
220227
}
221228

222-
labels := u.index.Add(metric, fp)
229+
labels := u.index.Add(metric, fp) // Add() returns 'interned' values so the original labels are not retained
223230
series := newMemorySeries(labels)
224231
u.fpToSeries.put(fp, series)
225232

0 commit comments

Comments
 (0)