Skip to content

Commit be63a81

Browse files
authored
Merge pull request #1345 from grafana/lazy-load-chunks
Add ability to lazy load chunks
2 parents 826f004 + 5553e5f commit be63a81

File tree

5 files changed

+94
-33
lines changed

5 files changed

+94
-33
lines changed

pkg/chunk/chunk_store.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package chunk
22

33
import (
44
"context"
5+
"errors"
56
"flag"
67
"fmt"
78
"net/http"
@@ -183,6 +184,10 @@ func (c *store) Get(ctx context.Context, from, through model.Time, allMatchers .
183184
return c.getMetricNameChunks(ctx, from, through, matchers, metricName)
184185
}
185186

187+
func (c *store) GetChunkRefs(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) {
188+
return nil, nil, errors.New("not implemented")
189+
}
190+
186191
// LabelValuesForMetricName retrieves all label values for a single label name and metric name.
187192
func (c *store) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName, labelName string) ([]string, error) {
188193
log, ctx := spanlogger.New(ctx, "ChunkStore.LabelValues")
@@ -304,7 +309,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Tim
304309
level.Debug(log).Log("Chunks in index", len(chunks))
305310

306311
// Filter out chunks that are not in the selected time range.
307-
filtered, keys := filterChunksByTime(from, through, chunks)
312+
filtered := filterChunksByTime(from, through, chunks)
308313
level.Debug(log).Log("Chunks post filtering", len(chunks))
309314

310315
maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID)
@@ -315,6 +320,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Tim
315320
}
316321

317322
// Now fetch the actual chunk data from Memcache / S3
323+
keys := keysFromChunks(filtered)
318324
allChunks, err := c.FetchChunks(ctx, filtered, keys)
319325
if err != nil {
320326
return nil, promql.ErrStorage{Err: err}

pkg/chunk/chunk_store_utils.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,24 @@ import (
1616

1717
const chunkDecodeParallelism = 16
1818

19-
func filterChunksByTime(from, through model.Time, chunks []Chunk) ([]Chunk, []string) {
19+
func filterChunksByTime(from, through model.Time, chunks []Chunk) []Chunk {
2020
filtered := make([]Chunk, 0, len(chunks))
21-
keys := make([]string, 0, len(chunks))
2221
for _, chunk := range chunks {
2322
if chunk.Through < from || through < chunk.From {
2423
continue
2524
}
2625
filtered = append(filtered, chunk)
27-
keys = append(keys, chunk.ExternalKey())
2826
}
29-
return filtered, keys
27+
return filtered
28+
}
29+
30+
func keysFromChunks(chunks []Chunk) []string {
31+
keys := make([]string, 0, len(chunks))
32+
for _, chk := range chunks {
33+
keys = append(keys, chk.ExternalKey())
34+
}
35+
36+
return keys
3037
}
3138

3239
func filterChunksByMatchers(chunks []Chunk, filters []*labels.Matcher) []Chunk {

pkg/chunk/composite_store.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ import (
1414
type Store interface {
1515
Put(ctx context.Context, chunks []Chunk) error
1616
PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error
17-
Get(tx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error)
17+
Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error)
18+
// GetChunkRefs returns the un-loaded chunks and the fetchers to be used to load them. You can load each slice of chunks ([]Chunk),
19+
// using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...)).
20+
GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)
1821
LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error)
1922
Stop()
2023
}
@@ -103,6 +106,22 @@ func (c compositeStore) LabelValuesForMetricName(ctx context.Context, from, thro
103106
return result, err
104107
}
105108

109+
func (c compositeStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) {
110+
chunkIDs := [][]Chunk{}
111+
fetchers := []*Fetcher{}
112+
err := c.forStores(from, through, func(from, through model.Time, store Store) error {
113+
ids, fetcher, err := store.GetChunkRefs(ctx, from, through, matchers...)
114+
if err != nil {
115+
return err
116+
}
117+
118+
chunkIDs = append(chunkIDs, ids...)
119+
fetchers = append(fetchers, fetcher...)
120+
return nil
121+
})
122+
return chunkIDs, fetchers, err
123+
}
124+
106125
func (c compositeStore) Stop() {
107126
for _, store := range c.stores {
108127
store.Stop()

pkg/chunk/composite_store_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ func (m mockStore) LabelValuesForMetricName(ctx context.Context, from, through m
2828
return nil, nil
2929
}
3030

31+
func (m mockStore) GetChunkRefs(tx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) {
32+
return nil, nil, nil
33+
}
34+
3135
func (m mockStore) Stop() {}
3236

3337
func TestCompositeStore(t *testing.T) {

pkg/chunk/series_store.go

Lines changed: 52 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,54 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc
111111
return nil, err
112112
}
113113

114+
chks, fetchers, err := c.GetChunkRefs(ctx, from, through, allMatchers...)
115+
if err != nil {
116+
return nil, err
117+
}
118+
119+
if len(chks) == 0 {
120+
// Shortcut
121+
return nil, nil
122+
}
123+
124+
chunks := chks[0]
125+
fetcher := fetchers[0]
126+
// Protect ourselves against OOMing.
127+
maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID)
128+
if maxChunksPerQuery > 0 && len(chunks) > maxChunksPerQuery {
129+
err := httpgrpc.Errorf(http.StatusBadRequest, "Query %v fetched too many chunks (%d > %d)", allMatchers, len(chunks), maxChunksPerQuery)
130+
level.Error(log).Log("err", err)
131+
return nil, err
132+
}
133+
134+
// Now fetch the actual chunk data from Memcache / S3
135+
keys := keysFromChunks(chunks)
136+
allChunks, err := fetcher.FetchChunks(ctx, chunks, keys)
137+
if err != nil {
138+
level.Error(log).Log("msg", "FetchChunks", "err", err)
139+
return nil, err
140+
}
141+
142+
// Filter out chunks based on the empty matchers in the query.
143+
filteredChunks := filterChunksByMatchers(allChunks, allMatchers)
144+
return filteredChunks, nil
145+
}
146+
147+
func (c *seriesStore) GetChunkRefs(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) {
148+
log, ctx := spanlogger.New(ctx, "SeriesStore.GetChunkRefs")
149+
defer log.Span.Finish()
150+
151+
userID, err := user.ExtractOrgID(ctx)
152+
if err != nil {
153+
return nil, nil, err
154+
}
155+
114156
// Validate the query is within reasonable bounds.
115157
metricName, matchers, shortcut, err := c.validateQuery(ctx, from, &through, allMatchers)
116158
if err != nil {
117-
return nil, err
159+
return nil, nil, err
118160
} else if shortcut {
119-
return nil, nil
161+
return nil, nil, nil
120162
}
121163

122164
level.Debug(log).Log("metric", metricName)
@@ -126,46 +168,29 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc
126168
_, matchers = util.SplitFiltersAndMatchers(matchers)
127169
seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, metricName, matchers)
128170
if err != nil {
129-
return nil, err
171+
return nil, nil, err
130172
}
131173
level.Debug(log).Log("series-ids", len(seriesIDs))
132174

133175
// Lookup the series in the index to get the chunks.
134176
chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, seriesIDs)
135177
if err != nil {
136178
level.Error(log).Log("msg", "lookupChunksBySeries", "err", err)
137-
return nil, err
179+
return nil, nil, err
138180
}
139181
level.Debug(log).Log("chunk-ids", len(chunkIDs))
140182

141183
chunks, err := c.convertChunkIDsToChunks(ctx, userID, chunkIDs)
142184
if err != nil {
143-
level.Error(log).Log("err", "convertChunkIDsToChunks", "err", err)
144-
return nil, err
145-
}
146-
// Filter out chunks that are not in the selected time range.
147-
filtered, keys := filterChunksByTime(from, through, chunks)
148-
level.Debug(log).Log("chunks-post-filtering", len(chunks))
149-
chunksPerQuery.Observe(float64(len(filtered)))
150-
151-
// Protect ourselves against OOMing.
152-
maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID)
153-
if maxChunksPerQuery > 0 && len(chunkIDs) > maxChunksPerQuery {
154-
err := httpgrpc.Errorf(http.StatusBadRequest, "Query %v fetched too many chunks (%d > %d)", allMatchers, len(chunkIDs), maxChunksPerQuery)
155-
level.Error(log).Log("err", err)
156-
return nil, err
185+
level.Error(log).Log("op", "convertChunkIDsToChunks", "err", err)
186+
return nil, nil, err
157187
}
158188

159-
// Now fetch the actual chunk data from Memcache / S3
160-
allChunks, err := c.FetchChunks(ctx, filtered, keys)
161-
if err != nil {
162-
level.Error(log).Log("msg", "FetchChunks", "err", err)
163-
return nil, err
164-
}
189+
chunks = filterChunksByTime(from, through, chunks)
190+
level.Debug(log).Log("chunks-post-filtering", len(chunks))
191+
chunksPerQuery.Observe(float64(len(chunks)))
165192

166-
// Filter out chunks based on the empty matchers in the query.
167-
filteredChunks := filterChunksByMatchers(allChunks, allMatchers)
168-
return filteredChunks, nil
193+
return [][]Chunk{chunks}, []*Fetcher{c.store.Fetcher}, nil
169194
}
170195

171196
func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, metricName string, matchers []*labels.Matcher) ([]string, error) {

0 commit comments

Comments
 (0)