Skip to content

Commit 4dd4ce6

Browse files
committed
Merge remote-tracking branch 'upstream/master' into wal
Signed-off-by: Ganesh Vernekar <[email protected]>
2 parents f2d0b29 + d57fe0f commit 4dd4ce6

File tree

12 files changed

+149
-61
lines changed

12 files changed

+149
-61
lines changed

pkg/distributor/distributor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -754,7 +754,7 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur
754754
cfg.DistributorRing.HeartbeatPeriod = 100 * time.Millisecond
755755
cfg.DistributorRing.InstanceID = strconv.Itoa(rand.Int())
756756
cfg.DistributorRing.KVStore.Mock = kvStore
757-
cfg.DistributorRing.InstanceInterfaceNames = []string{"eth0", "en0", "lo0"}
757+
cfg.DistributorRing.InstanceAddr = "127.0.0.1"
758758

759759
overrides, err := validation.NewOverrides(*limits)
760760
require.NoError(t, err)

pkg/ingester/errors.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package ingester
2+
3+
import (
4+
"fmt"
5+
"net/http"
6+
7+
"github.com/prometheus/prometheus/pkg/labels"
8+
"github.com/weaveworks/common/httpgrpc"
9+
)
10+
11+
type validationError struct {
12+
err error // underlying error
13+
errorType string
14+
code int
15+
noReport bool // if true, error will be counted but not reported to caller
16+
labels labels.Labels
17+
}
18+
19+
func makeLimitError(errorType string, err error) error {
20+
return &validationError{
21+
errorType: errorType,
22+
err: err,
23+
code: http.StatusTooManyRequests,
24+
}
25+
}
26+
27+
func makeNoReportError(errorType string) error {
28+
return &validationError{
29+
errorType: errorType,
30+
noReport: true,
31+
}
32+
}
33+
34+
func makeMetricValidationError(errorType string, labels labels.Labels, err error) error {
35+
return &validationError{
36+
errorType: errorType,
37+
err: err,
38+
code: http.StatusBadRequest,
39+
labels: labels,
40+
}
41+
}
42+
43+
func makeMetricLimitError(errorType string, labels labels.Labels, err error) error {
44+
return &validationError{
45+
errorType: errorType,
46+
err: err,
47+
code: http.StatusTooManyRequests,
48+
labels: labels,
49+
}
50+
}
51+
52+
func (e *validationError) Error() string {
53+
if e.err == nil {
54+
return e.errorType
55+
}
56+
if e.labels == nil {
57+
return e.err.Error()
58+
}
59+
return fmt.Sprintf("%s for series %s", e.err.Error(), e.labels.String())
60+
}
61+
62+
// WrappedError returns a HTTP gRPC error than is correctly forwarded over gRPC.
63+
func (e *validationError) WrappedError() error {
64+
return httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{
65+
Code: int32(e.code),
66+
Body: []byte(e.Error()),
67+
})
68+
}

pkg/ingester/ingester.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
343343
return nil, fmt.Errorf("no user id")
344344
}
345345

346+
var lastPartialErr *validationError
346347
var record *Record
347348
if i.cfg.WALConfig.walEnabled {
348349
record = recordPool.Get().(*Record)
@@ -357,7 +358,6 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
357358
}
358359
}
359360

360-
var lastPartialErr error
361361
for _, ts := range req.Timeseries {
362362
for _, s := range ts.Samples {
363363
err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source, record)
@@ -366,27 +366,29 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
366366
}
367367

368368
i.metrics.ingestedSamplesFail.Inc()
369-
if httpResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
370-
switch httpResp.Code {
371-
case http.StatusBadRequest, http.StatusTooManyRequests:
372-
lastPartialErr = err
373-
continue
374-
}
369+
if ve, ok := err.(*validationError); ok {
370+
lastPartialErr = ve
371+
continue
375372
}
376373

377374
return nil, err
378375
}
379376
}
380377
client.ReuseSlice(req.Timeseries)
381378

379+
if lastPartialErr != nil {
380+
return &client.WriteResponse{}, lastPartialErr.WrappedError()
381+
}
382+
382383
if record != nil {
384+
// Log the record only if there was no error in ingestion.
383385
if err := i.wal.Log(record); err != nil {
384386
return nil, err
385387
}
386388
recordPool.Put(record)
387389
}
388390

389-
return &client.WriteResponse{}, lastPartialErr
391+
return &client.WriteResponse{}, nil
390392
}
391393

392394
func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum, record *Record) error {
@@ -409,6 +411,9 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
409411

410412
state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels, record)
411413
if err != nil {
414+
if ve, ok := err.(*validationError); ok {
415+
state.discardedSamples.WithLabelValues(ve.errorType).Inc()
416+
}
412417
state = nil // don't want to unlock the fp if there is an error
413418
return err
414419
}
@@ -428,13 +433,11 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
428433
Value: value,
429434
Timestamp: timestamp,
430435
}); err != nil {
431-
if mse, ok := err.(*memorySeriesError); ok {
432-
state.discardedSamples.WithLabelValues(mse.errorType).Inc()
433-
if mse.noReport {
436+
if ve, ok := err.(*validationError); ok {
437+
state.discardedSamples.WithLabelValues(ve.errorType).Inc()
438+
if ve.noReport {
434439
return nil
435440
}
436-
// Use a dumb string template to avoid the message being parsed as a template
437-
err = httpgrpc.Errorf(http.StatusBadRequest, "%s", mse.message)
438441
}
439442
return err
440443
}

pkg/ingester/ingester_test.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -320,16 +320,16 @@ func TestIngesterAppendOutOfOrderAndDuplicate(t *testing.T) {
320320
// Earlier sample than previous one.
321321
err = ing.append(ctx, userID, m, 0, 0, client.API, nil)
322322
require.Contains(t, err.Error(), "sample timestamp out of order")
323-
errResp, ok := httpgrpc.HTTPResponseFromError(err)
323+
errResp, ok := err.(*validationError)
324324
require.True(t, ok)
325-
require.Equal(t, errResp.Code, int32(400))
325+
require.Equal(t, errResp.code, 400)
326326

327327
// Same timestamp as previous sample, but different value.
328328
err = ing.append(ctx, userID, m, 1, 1, client.API, nil)
329329
require.Contains(t, err.Error(), "sample with repeated timestamp but different value")
330-
errResp, ok = httpgrpc.HTTPResponseFromError(err)
330+
errResp, ok = err.(*validationError)
331331
require.True(t, ok)
332-
require.Equal(t, errResp.Code, int32(400))
332+
require.Equal(t, errResp.code, 400)
333333
}
334334

335335
// Test that blank labels are removed by the ingester
@@ -526,9 +526,19 @@ func benchmarkIngesterSeriesCreationLocking(b *testing.B, parallelism int) {
526526
}
527527

528528
func BenchmarkIngesterPush(b *testing.B) {
529+
limits := defaultLimitsTestConfig()
530+
benchmarkIngesterPush(b, limits, false)
531+
}
532+
533+
func BenchmarkIngesterPushErrors(b *testing.B) {
534+
limits := defaultLimitsTestConfig()
535+
limits.MaxLocalSeriesPerMetric = 1
536+
benchmarkIngesterPush(b, limits, true)
537+
}
538+
539+
func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpected bool) {
529540
cfg := defaultIngesterTestConfig()
530541
clientCfg := defaultClientTestConfig()
531-
limits := defaultLimitsTestConfig()
532542

533543
const (
534544
series = 100
@@ -570,7 +580,9 @@ func BenchmarkIngesterPush(b *testing.B) {
570580
allSamples[i].TimestampMs = int64(j + 1)
571581
}
572582
_, err := ing.Push(ctx, client.ToWriteRequest(allLabels, allSamples, client.API))
573-
require.NoError(b, err)
583+
if !errorsExpected {
584+
require.NoError(b, err)
585+
}
574586
}
575587
ing.Shutdown()
576588
}

pkg/ingester/ingester_v2.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
135135
// 400 error to the client. The client (Prometheus) will not retry on 400, and
136136
// we actually ingested all samples which haven't failed.
137137
if err == tsdb.ErrOutOfBounds || err == tsdb.ErrOutOfOrderSample || err == tsdb.ErrAmendSample {
138-
lastPartialErr = httpgrpc.Errorf(http.StatusBadRequest, err.Error())
138+
lastPartialErr = err
139139
continue
140140
}
141141

@@ -159,7 +159,10 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
159159

160160
client.ReuseSlice(req.Timeseries)
161161

162-
return &client.WriteResponse{}, lastPartialErr
162+
if lastPartialErr != nil {
163+
return &client.WriteResponse{}, httpgrpc.Errorf(http.StatusBadRequest, lastPartialErr.Error())
164+
}
165+
return &client.WriteResponse{}, nil
163166
}
164167

165168
func (i *Ingester) v2Query(ctx old_ctx.Context, req *client.QueryRequest) (*client.QueryResponse, error) {

pkg/ingester/series.go

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,6 @@ type memorySeries struct {
4242
lastSampleValue model.SampleValue
4343
}
4444

45-
type memorySeriesError struct {
46-
message string
47-
errorType string
48-
noReport bool // if true, error will be counted but not reported to caller
49-
}
50-
51-
func (error *memorySeriesError) Error() string {
52-
if error.message == "" {
53-
return error.errorType
54-
}
55-
return error.message
56-
}
57-
5845
// newMemorySeries returns a pointer to a newly allocated memorySeries for the
5946
// given metric.
6047
func newMemorySeries(m labels.Labels) *memorySeries {
@@ -77,24 +64,20 @@ func (s *memorySeries) add(v model.SamplePair) error {
7764
// If we don't know what the last sample value is, silently discard.
7865
// This will mask some errors but better than complaining when we don't really know.
7966
if !s.lastSampleValueSet {
80-
return &memorySeriesError{errorType: "duplicate-timestamp", noReport: true}
67+
return makeNoReportError("duplicate-timestamp")
8168
}
8269
// If both timestamp and sample value are the same as for the last append,
8370
// ignore as they are a common occurrence when using client-side timestamps
8471
// (e.g. Pushgateway or federation).
8572
if v.Value.Equal(s.lastSampleValue) {
86-
return &memorySeriesError{errorType: "duplicate-sample", noReport: true}
87-
}
88-
return &memorySeriesError{
89-
message: fmt.Sprintf("sample with repeated timestamp but different value for series %v; last value: %v, incoming value: %v", s.metric, s.lastSampleValue, v.Value),
90-
errorType: "new-value-for-timestamp",
73+
return makeNoReportError("duplicate-sample")
9174
}
75+
return makeMetricValidationError("new-value-for-timestamp", s.metric,
76+
fmt.Errorf("sample with repeated timestamp but different value; last value: %v, incoming value: %v", s.lastSampleValue, v.Value))
9277
}
9378
if v.Timestamp < s.lastTime {
94-
return &memorySeriesError{
95-
message: fmt.Sprintf("sample timestamp out of order for series %v; last timestamp: %v, incoming timestamp: %v", s.metric, s.lastTime, v.Timestamp),
96-
errorType: "sample-out-of-order",
97-
}
79+
return makeMetricValidationError("sample-out-of-order", s.metric,
80+
fmt.Errorf("sample timestamp out of order; last timestamp: %v, incoming timestamp: %v", s.lastTime, v.Timestamp))
9881
}
9982

10083
if len(s.chunkDescs) == 0 || s.headChunkClosed {

pkg/ingester/transfer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ func fromWireChunks(wireChunks []client.Chunk) ([]*desc, error) {
401401
// Called as part of the ingester shutdown process.
402402
func (i *Ingester) TransferOut(ctx context.Context) error {
403403
if i.cfg.MaxTransferRetries <= 0 {
404-
return fmt.Errorf("transfers disabled")
404+
return ring.ErrTransferDisabled
405405
}
406406
backoff := util.NewBackoff(ctx, util.BackoffConfig{
407407
MinBackoff: 100 * time.Millisecond,

pkg/ingester/user_state.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab
209209

210210
if !recovery {
211211
if err := u.limiter.AssertMaxSeriesPerUser(u.userID, u.fpToSeries.length()); err != nil {
212-
u.discardedSamples.WithLabelValues(perUserSeriesLimit).Inc()
213-
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
212+
return nil, makeLimitError(perUserSeriesLimit, err)
214213
}
215214
}
216215

@@ -222,8 +221,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab
222221
if !recovery {
223222
// Check if the per-metric limit has been exceeded
224223
if err = u.canAddSeriesFor(string(metricName)); err != nil {
225-
u.discardedSamples.WithLabelValues(perMetricSeriesLimit).Inc()
226-
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s for: %s", err.Error(), metric)
224+
return nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabels(metric), err)
227225
}
228226
}
229227

pkg/querier/queryrange/roundtrip.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,13 @@ func NewTripperware(cfg Config, log log.Logger, limits Limits, codec Codec, cach
119119
return frontend.Tripperware(func(next http.RoundTripper) http.RoundTripper {
120120
// Finally, if the user selected any query range middleware, stitch it in.
121121
if len(queryRangeMiddleware) > 0 {
122-
return NewRoundTripper(next, codec, queryRangeMiddleware...)
122+
queryrange := NewRoundTripper(next, codec, queryRangeMiddleware...)
123+
return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
124+
if !strings.HasSuffix(r.URL.Path, "/query_range") {
125+
return next.RoundTrip(r)
126+
}
127+
return queryrange.RoundTrip(r)
128+
})
123129
}
124130
return next
125131
}), c, nil
@@ -143,9 +149,6 @@ func NewRoundTripper(next http.RoundTripper, codec Codec, middlewares ...Middlew
143149
}
144150

145151
func (q roundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
146-
if !strings.HasSuffix(r.URL.Path, "/query_range") {
147-
return q.next.RoundTrip(r)
148-
}
149152

150153
request, err := q.codec.DecodeRequest(r.Context(), r)
151154
if err != nil {

pkg/querier/queryrange/roundtrip_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"strconv"
1010
"testing"
1111

12+
"github.com/cortexproject/cortex/pkg/util"
1213
"github.com/stretchr/testify/require"
1314
"github.com/weaveworks/common/middleware"
1415
"github.com/weaveworks/common/user"
@@ -18,10 +19,14 @@ func TestRoundTrip(t *testing.T) {
1819
s := httptest.NewServer(
1920
middleware.AuthenticateUser.Wrap(
2021
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
22+
var err error
2123
if r.RequestURI == query {
22-
w.Write([]byte(responseBody))
24+
_, err = w.Write([]byte(responseBody))
2325
} else {
24-
w.Write([]byte("bar"))
26+
_, err = w.Write([]byte("bar"))
27+
}
28+
if err != nil {
29+
t.Fatal(err)
2530
}
2631
}),
2732
),
@@ -36,7 +41,10 @@ func TestRoundTrip(t *testing.T) {
3641
next: http.DefaultTransport,
3742
}
3843

39-
roundtripper := NewRoundTripper(downstream, PrometheusCodec, LimitsMiddleware(fakeLimits{}))
44+
tw, _, err := NewTripperware(Config{}, util.Logger, fakeLimits{}, PrometheusCodec, nil)
45+
if err != nil {
46+
t.Fatal(err)
47+
}
4048

4149
for i, tc := range []struct {
4250
path, expectedBody string
@@ -56,7 +64,7 @@ func TestRoundTrip(t *testing.T) {
5664
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
5765
require.NoError(t, err)
5866

59-
resp, err := roundtripper.RoundTrip(req)
67+
resp, err := tw(downstream).RoundTrip(req)
6068
require.NoError(t, err)
6169
require.Equal(t, 200, resp.StatusCode)
6270

pkg/ring/flush.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
package ring
22

3-
import "context"
3+
import (
4+
"context"
5+
"errors"
6+
)
7+
8+
// ErrTransferDisabled is the error returned by TransferOut when the transfers are disabled.
9+
var ErrTransferDisabled = errors.New("transfers disabled")
410

511
// FlushTransferer controls the shutdown of an instance in the ring.
612
type FlushTransferer interface {

0 commit comments

Comments
 (0)