Skip to content

Commit 596ddda

Browse files
committed
Tidy up merge iterator
1 parent 346b09f commit 596ddda

File tree

3 files changed

+16
-20
lines changed

3 files changed

+16
-20
lines changed

pkg/querier/querier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ func createMergeIterators(incomingIterators chan []local.SeriesIterator, incomin
269269

270270
var mergeIterators []local.SeriesIterator
271271
for _, its := range fpToIts {
272-
mergeIterators = append(mergeIterators, util.NewMergeSeriesIterator(its[0].Metric().Metric, its))
272+
mergeIterators = append(mergeIterators, util.NewMergeSeriesIterator(its))
273273
}
274274
return mergeIterators, nil
275275
}

pkg/util/iterator.go

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package util
22

33
import (
44
"sort"
5-
"time"
65

76
"github.com/prometheus/common/model"
87
"github.com/prometheus/prometheus/storage/local"
@@ -11,38 +10,31 @@ import (
1110

1211
// MergeSeriesIterator combines SampleStreamIterator
1312
type MergeSeriesIterator struct {
14-
metric model.Metric
1513
iterators []local.SeriesIterator
1614
}
1715

1816
// NewMergeSeriesIterator creates a mergeSeriesIterator
19-
func NewMergeSeriesIterator(metric model.Metric, iterators []local.SeriesIterator) MergeSeriesIterator {
17+
func NewMergeSeriesIterator(iterators []local.SeriesIterator) MergeSeriesIterator {
2018
return MergeSeriesIterator{
21-
metric: metric,
2219
iterators: iterators,
2320
}
2421
}
2522

2623
// Metric implements the SeriesIterator interface.
2724
func (msit MergeSeriesIterator) Metric() metric.Metric {
28-
return metric.Metric{Metric: msit.metric}
25+
return metric.Metric{Metric: msit.iterators[0].Metric().Metric}
2926
}
3027

3128
// ValueAtOrBeforeTime implements the SeriesIterator interface.
3229
func (msit MergeSeriesIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair {
33-
var closestSamplePair *model.SamplePair
34-
var closestTimeDifference time.Duration
35-
30+
latest := model.ZeroSamplePair
3631
for _, it := range msit.iterators {
37-
samplePair := it.ValueAtOrBeforeTime(ts)
38-
timeDifference := ts.Sub(samplePair.Timestamp)
39-
if closestSamplePair == nil || timeDifference.Nanoseconds() < closestTimeDifference.Nanoseconds() {
40-
closestSamplePair = &samplePair
41-
closestTimeDifference = timeDifference
32+
v := it.ValueAtOrBeforeTime(ts)
33+
if v.Timestamp.After(latest.Timestamp) {
34+
latest = v
4235
}
4336
}
44-
45-
return *closestSamplePair
37+
return latest
4638
}
4739

4840
// RangeValues implements the SeriesIterator interface.
@@ -60,7 +52,11 @@ func (msit MergeSeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
6052
}
6153

6254
// Close implements the SeriesIterator interface.
63-
func (msit MergeSeriesIterator) Close() {}
55+
func (msit MergeSeriesIterator) Close() {
56+
for _, it := range msit.iterators {
57+
it.Close()
58+
}
59+
}
6460

6561
// SampleStreamIterator is a struct and not just a renamed type because otherwise the Metric
6662
// field and Metric() methods would clash.

pkg/util/iterator_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func TestMergeSeriesIterator_Metric(t *testing.T) {
2121
Values: []model.SamplePair{},
2222
})
2323

24-
mergeIterator := NewMergeSeriesIterator(sampleMetric, []local.SeriesIterator{iterator1, iterator2})
24+
mergeIterator := NewMergeSeriesIterator([]local.SeriesIterator{iterator1, iterator2})
2525

2626
for _, c := range []struct {
2727
mergeIterator MergeSeriesIterator
@@ -54,7 +54,7 @@ func TestMergeSeriesIterator_ValueAtOrBeforeTime(t *testing.T) {
5454
Values: []model.SamplePair{sample2, sample4},
5555
})
5656

57-
mergeIterator := NewMergeSeriesIterator(sampleMetric, []local.SeriesIterator{iterator1, iterator2})
57+
mergeIterator := NewMergeSeriesIterator([]local.SeriesIterator{iterator1, iterator2})
5858

5959
for _, c := range []struct {
6060
mergeIterator MergeSeriesIterator
@@ -104,7 +104,7 @@ func TestMergeSeriesIterator_RangeValues(t *testing.T) {
104104
Values: []model.SamplePair{sample2, sample4},
105105
})
106106

107-
mergeIterator := NewMergeSeriesIterator(sampleMetric, []local.SeriesIterator{iterator1, iterator2})
107+
mergeIterator := NewMergeSeriesIterator([]local.SeriesIterator{iterator1, iterator2})
108108

109109
for _, c := range []struct {
110110
mergeIterator MergeSeriesIterator

0 commit comments

Comments
 (0)