Skip to content

Commit e14e0fd

Browse files
committed
Read ingesters and chunk store in parallel
1 parent 3f72a81 commit e14e0fd

File tree

1 file changed

+33
-15
lines changed

1 file changed

+33
-15
lines changed

querier/querier.go

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -76,28 +76,46 @@ type MergeQuerier struct {
7676
// QueryRange fetches series for a given time range and label matchers from multiple
7777
// promql.Queriers and returns the merged results as a map of series iterators.
7878
func (qm MergeQuerier) QueryRange(ctx context.Context, from, to model.Time, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
79-
fpToIt := map[model.Fingerprint]local.SeriesIterator{}
8079

81-
// Fetch samples from all queriers and group them by fingerprint (unsorted
82-
// and with overlap).
80+
// Fetch samples from all queriers in parallel
81+
matrices := make(chan model.Matrix)
82+
errors := make(chan error)
8383
for _, q := range qm.Queriers {
84-
matrix, err := q.Query(ctx, from, to, matchers...)
85-
if err != nil {
86-
return nil, err
87-
}
84+
go func(q Querier) {
85+
matrix, err := q.Query(ctx, from, to, matchers...)
86+
if err != nil {
87+
errors <- err
88+
} else {
89+
matrices <- matrix
90+
}
91+
}(q)
92+
}
8893

89-
for _, ss := range matrix {
90-
fp := ss.Metric.Fingerprint()
91-
if it, ok := fpToIt[fp]; !ok {
92-
fpToIt[fp] = sampleStreamIterator{
93-
ss: ss,
94+
// Group them by fingerprint (unsorted and with overlap).
95+
fpToIt := map[model.Fingerprint]local.SeriesIterator{}
96+
var lastErr error
97+
for i := 0; i < len(qm.Queriers); i++ {
98+
select {
99+
case err := <-errors:
100+
lastErr = err
101+
102+
case matrix := <-matrices:
103+
for _, ss := range matrix {
104+
fp := ss.Metric.Fingerprint()
105+
if it, ok := fpToIt[fp]; !ok {
106+
fpToIt[fp] = sampleStreamIterator{
107+
ss: ss,
108+
}
109+
} else {
110+
ssIt := it.(sampleStreamIterator)
111+
ssIt.ss.Values = util.MergeSamples(ssIt.ss.Values, ss.Values)
94112
}
95-
} else {
96-
ssIt := it.(sampleStreamIterator)
97-
ssIt.ss.Values = util.MergeSamples(ssIt.ss.Values, ss.Values)
98113
}
99114
}
100115
}
116+
if lastErr != nil {
117+
return nil, lastErr
118+
}
101119

102120
iterators := make([]local.SeriesIterator, 0, len(fpToIt))
103121
for _, it := range fpToIt {

0 commit comments

Comments
 (0)