Skip to content

Comment unsafe memory usage in ingester push path #2004

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions pkg/ingester/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"net/http"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/httpgrpc"
)
Expand Down Expand Up @@ -50,11 +49,6 @@ func makeMetricLimitError(errorType string, labels labels.Labels, err error) err
}
}

func (e *validationError) WrapWithUser(userID string) *validationError {
e.err = wrapWithUser(e.err, userID)
return e
}

func (e *validationError) Error() string {
if e.err == nil {
return e.errorType
Expand All @@ -65,14 +59,15 @@ func (e *validationError) Error() string {
return fmt.Sprintf("%s for series %s", e.err.Error(), e.labels.String())
}

// WrappedError returns a HTTP gRPC error than is correctly forwarded over gRPC.
func (e *validationError) WrappedError() error {
// returns a HTTP gRPC error than is correctly forwarded over gRPC, with no reference to `e` retained.
func grpcForwardableError(userID string, code int, e error) error {
return httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{
Code: int32(e.code),
Body: []byte(e.Error()),
Code: int32(code),
Body: []byte(wrapWithUser(e, userID).Error()),
})
}

// Note: does not retain a reference to `err`
func wrapWithUser(err error, userID string) error {
return errors.Wrapf(err, "user=%s", userID)
return fmt.Errorf("user=%s: %s", userID, err)
}
8 changes: 6 additions & 2 deletions pkg/ingester/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ func New() *InvertedIndex {
}

// Add a fingerprint under the specified labels.
// NOTE: memory for `labels` is unsafe; anything retained beyond the
// life of this function must be copied
func (ii *InvertedIndex) Add(labels []client.LabelAdapter, fp model.Fingerprint) labels.Labels {
shard := &ii.shards[util.HashFP(fp)%indexShards]
return shard.add(labels, fp)
return shard.add(labels, fp) // add() returns 'interned' values so the original labels are not retained
}

// Lookup all fingerprints for the provided matchers.
Expand Down Expand Up @@ -108,7 +110,9 @@ func copyString(s string) string {
return string([]byte(s))
}

// add metric to the index; return all the name/value pairs as strings from the index, sorted
// add metric to the index; return all the name/value pairs as a fresh
// sorted slice, referencing 'interned' strings from the index so that
// no references are retained to the memory of `metric`.
func (shard *indexShard) add(metric []client.LabelAdapter, fp model.Fingerprint) labels.Labels {
shard.mtx.Lock()
defer shard.mtx.Unlock()
Expand Down
12 changes: 10 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
return i.v2Push(ctx, req)
}

// NOTE: because we use `unsafe` in deserialisation, we must not
// retain anything from `req` past the call to ReuseSlice
defer client.ReuseSlice(req.Timeseries)

userID, err := user.ExtractOrgID(ctx)
Expand All @@ -301,6 +303,7 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.

for _, ts := range req.Timeseries {
for _, s := range ts.Samples {
// append() copies the memory in `ts.Labels` except on the error path
err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source, record)
if err == nil {
continue
Expand All @@ -312,12 +315,14 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
continue
}

return nil, wrapWithUser(err, userID)
// non-validation error: abandon this request
return nil, grpcForwardableError(userID, http.StatusInternalServerError, err)
}
}

if lastPartialErr != nil {
return &client.WriteResponse{}, lastPartialErr.WrapWithUser(userID).WrappedError()
// grpcForwardableError turns the error into a string so it no longer references `req`
return &client.WriteResponse{}, grpcForwardableError(userID, lastPartialErr.code, lastPartialErr)
}

if record != nil {
Expand All @@ -331,6 +336,8 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
return &client.WriteResponse{}, nil
}

// NOTE: memory for `labels` is unsafe; anything retained beyond the
// life of this function must be copied
func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum, record *Record) error {
labels.removeBlanks()

Expand All @@ -349,6 +356,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
return fmt.Errorf("ingester stopping")
}

// getOrCreateSeries copies the memory for `labels`, except on the error path.
state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels, record)
if err != nil {
if ve, ok := err.(*validationError); ok {
Expand Down
2 changes: 2 additions & 0 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
var firstPartialErr error

// NOTE: because we use `unsafe` in deserialisation, we must not
// retain anything from `req` past the call to ReuseSlice
defer client.ReuseSlice(req.Timeseries)

userID, err := user.ExtractOrgID(ctx)
Expand Down
9 changes: 8 additions & 1 deletion pkg/ingester/user_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,17 @@ func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, erro
return state, ok, nil
}

// NOTE: memory for `labels` is unsafe; anything retained beyond the
// life of this function must be copied
func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter, record *Record) (*userState, model.Fingerprint, *memorySeries, error) {
state := us.getOrCreate(userID)
// WARNING: `err` may have a reference to unsafe memory in `labels`
fp, series, err := state.getSeries(labels, record)
return state, fp, series, err
}

// NOTE: memory for `metric` is unsafe; anything retained beyond the
// life of this function must be copied
func (u *userState) getSeries(metric labelPairs, record *Record) (model.Fingerprint, *memorySeries, error) {
rawFP := client.FastFingerprint(metric)
u.fpLocker.Lock(rawFP)
Expand Down Expand Up @@ -197,6 +202,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab
}
}

// MetricNameFromLabelAdapters returns a copy of the string in `metric`
metricName, err := extract.MetricNameFromLabelAdapters(metric)
if err != nil {
return nil, err
Expand All @@ -205,6 +211,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab
if !recovery {
// Check if the per-metric limit has been exceeded
if err = u.canAddSeriesFor(string(metricName)); err != nil {
// WARNING: returns a reference to `metric`
return nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabels(metric), err)
}
}
Expand All @@ -219,7 +226,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab
})
}

labels := u.index.Add(metric, fp)
labels := u.index.Add(metric, fp) // Add() returns 'interned' values so the original labels are not retained
series := newMemorySeries(labels)
u.fpToSeries.put(fp, series)

Expand Down