Skip to content

Commit 21a0556

Browse files
authored
Merge pull request #1385 from gouthamve/dont-send-empty-series
Make sure only series with chunks are sent
2 parents 4cf7b9f + d7d4ddf commit 21a0556

File tree

4 files changed

+61
-2
lines changed

4 files changed

+61
-2
lines changed

pkg/ingester/ingester.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,14 +382,18 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
382382
// that would involve locking all the series & sorting, so until we have
383383
// a better solution in the ingesters I'd rather take the hit in the queriers.
384384
err = state.forSeriesMatching(stream.Context(), matchers, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error {
385-
numSeries++
386385
chunks := make([]*desc, 0, len(series.chunkDescs))
387386
for _, chunk := range series.chunkDescs {
388387
if !(chunk.FirstTime.After(through) || chunk.LastTime.Before(from)) {
389388
chunks = append(chunks, chunk.slice(from, through))
390389
}
391390
}
392391

392+
if len(chunks) == 0 {
393+
return nil
394+
}
395+
396+
numSeries++
393397
wireChunks, err := toWireChunks(chunks)
394398
if err != nil {
395399
return err

pkg/ingester/ingester_test.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,15 @@ func matrixToSamples(m model.Matrix) []model.Sample {
123123
}
124124

125125
func runTestQuery(ctx context.Context, t *testing.T, ing *Ingester, ty labels.MatchType, n, v string) (model.Matrix, *client.QueryRequest, error) {
126+
return runTestQueryTimes(ctx, t, ing, ty, n, v, model.Earliest, model.Latest)
127+
}
128+
129+
func runTestQueryTimes(ctx context.Context, t *testing.T, ing *Ingester, ty labels.MatchType, n, v string, start, end model.Time) (model.Matrix, *client.QueryRequest, error) {
126130
matcher, err := labels.NewMatcher(ty, n, v)
127131
if err != nil {
128132
return nil, nil, err
129133
}
130-
req, err := client.ToQueryRequest(model.Earliest, model.Latest, []*labels.Matcher{matcher})
134+
req, err := client.ToQueryRequest(start, end, []*labels.Matcher{matcher})
131135
if err != nil {
132136
return nil, nil, err
133137
}
@@ -187,6 +191,31 @@ func TestIngesterAppend(t *testing.T) {
187191
store.checkData(t, userIDs, testData)
188192
}
189193

194+
func TestIngesterSendsOnlySeriesWithData(t *testing.T) {
195+
_, ing := newDefaultTestStore(t)
196+
197+
userIDs, _ := pushTestSamples(t, ing, 10, 1000)
198+
199+
// Read samples back via ingester queries.
200+
for _, userID := range userIDs {
201+
ctx := user.InjectOrgID(context.Background(), userID)
202+
_, req, err := runTestQueryTimes(ctx, t, ing, labels.MatchRegexp, model.JobLabel, ".+", model.Latest.Add(-15*time.Second), model.Latest)
203+
require.NoError(t, err)
204+
205+
s := stream{
206+
ctx: ctx,
207+
}
208+
err = ing.QueryStream(req, &s)
209+
require.NoError(t, err)
210+
211+
// Nothing should be selected.
212+
require.Equal(t, 0, len(s.responses))
213+
}
214+
215+
// Read samples back via chunk store.
216+
ing.Shutdown()
217+
}
218+
190219
func TestIngesterIdleFlush(t *testing.T) {
191220
// Create test ingester with short flush cycle
192221
cfg := defaultIngesterTestConfig()

pkg/querier/ingester_streaming_queryable.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ func (i ingesterQueryable) Get(ctx context.Context, from, through model.Time, ma
5454

5555
chunks := make([]chunk.Chunk, 0, len(results))
5656
for _, result := range results {
57+
// Sometimes the ingester can send series that have no data.
58+
if len(result.Chunks) == 0 {
59+
continue
60+
}
61+
5762
metric := client.FromLabelAdaptersToMetric(result.Labels)
5863
cs, err := chunkcompat.FromChunks(userID, metric, result.Chunks)
5964
if err != nil {
@@ -89,6 +94,11 @@ func (q *ingesterStreamingQuerier) Select(sp *storage.SelectParams, matchers ...
8994

9095
serieses := make([]storage.Series, 0, len(results))
9196
for _, result := range results {
97+
// Sometimes the ingester can send series that have no data.
98+
if len(result.Chunks) == 0 {
99+
continue
100+
}
101+
92102
chunks, err := chunkcompat.FromChunks(userID, nil, result.Chunks)
93103
if err != nil {
94104
return nil, nil, promql.ErrStorage{Err: err}

pkg/querier/ingester_streaming_queryable_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,41 @@ import (
44
"context"
55
"testing"
66

7+
"github.com/prometheus/common/model"
78
"github.com/prometheus/prometheus/pkg/labels"
89
"github.com/stretchr/testify/require"
910

11+
"github.com/cortexproject/cortex/pkg/chunk"
12+
"github.com/cortexproject/cortex/pkg/chunk/encoding"
1013
"github.com/cortexproject/cortex/pkg/ingester/client"
14+
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
1115
"github.com/weaveworks/common/user"
1216
)
1317

1418
func TestIngesterStreaming(t *testing.T) {
19+
// We need to make sure that there is atleast one chunk present,
20+
// else no series will be selected.
21+
promChunk, err := encoding.NewForEncoding(encoding.Bigchunk)
22+
require.NoError(t, err)
23+
24+
clientChunks, err := chunkcompat.ToChunks([]chunk.Chunk{
25+
chunk.NewChunk("", 0, nil, promChunk, model.Earliest, model.Earliest),
26+
})
27+
require.NoError(t, err)
28+
1529
d := &mockDistributor{
1630
r: []client.TimeSeriesChunk{
1731
{
1832
Labels: []client.LabelAdapter{
1933
{Name: "bar", Value: "baz"},
2034
},
35+
Chunks: clientChunks,
2136
},
2237
{
2338
Labels: []client.LabelAdapter{
2439
{Name: "foo", Value: "bar"},
2540
},
41+
Chunks: clientChunks,
2642
},
2743
},
2844
}

0 commit comments

Comments
 (0)