Skip to content

Commit 33336a1

Browse files
rishabhmaurya (Rishabh Kumar Maurya)
authored and committed
optimize num agg using quick select for topN when applicable
Signed-off-by: Rishabh Maurya <[email protected]> (cherry picked from commit 130d890)
1 parent 1c015d3 commit 33336a1

File tree

1 file changed

+56
-24
lines changed

1 file changed

+56
-24
lines changed

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java

Lines changed: 56 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@
3535
import org.apache.lucene.index.LeafReaderContext;
3636
import org.apache.lucene.index.SortedNumericDocValues;
3737
import org.apache.lucene.search.ScoreMode;
38+
import org.apache.lucene.util.ArrayUtil;
3839
import org.apache.lucene.util.NumericUtils;
3940
import org.apache.lucene.util.PriorityQueue;
4041
import org.opensearch.common.Numbers;
@@ -183,37 +184,68 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
183184
long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
184185

185186
int size = (int) Math.min(bucketsInOrd, localBucketCountThresholds.getRequiredSize());
186-
PriorityQueue<B> ordered = buildPriorityQueue(size);
187187
B spare = null;
188188
BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
189189
Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
190-
while (ordsEnum.next()) {
191-
long docCount = bucketDocCount(ordsEnum.ord());
192-
otherDocCounts[ordIdx] += docCount;
193-
if (docCount < localBucketCountThresholds.getMinDocCount()) {
194-
continue;
190+
191+
if ((bucketsInOrd > (size * 2L)) || isKeyOrder(order)) {
192+
// use heap sort
193+
PriorityQueue<B> ordered = buildPriorityQueue(size);
194+
while (ordsEnum.next()) {
195+
long docCount = bucketDocCount(ordsEnum.ord());
196+
otherDocCounts[ordIdx] += docCount;
197+
if (docCount < localBucketCountThresholds.getMinDocCount()) {
198+
continue;
199+
}
200+
if (spare == null) {
201+
spare = emptyBucketBuilder.get();
202+
}
203+
updateBucket(spare, ordsEnum, docCount);
204+
spare = ordered.insertWithOverflow(spare);
205+
}
206+
// Get the top buckets
207+
B[] bucketsForOrd = buildBuckets(ordered.size());
208+
topBucketsPerOrd[ordIdx] = bucketsForOrd;
209+
if (isKeyOrder(order)) {
210+
for (int b = ordered.size() - 1; b >= 0; --b) {
211+
topBucketsPerOrd[ordIdx][b] = ordered.pop();
212+
otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
213+
}
214+
} else {
215+
// sorted buckets not needed as they will be sorted by key in buildResult() which is different from
216+
// order in priority queue ordered
217+
Iterator<B> itr = ordered.iterator();
218+
for (int b = ordered.size() - 1; b >= 0; --b) {
219+
topBucketsPerOrd[ordIdx][b] = itr.next();
220+
otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
221+
}
195222
}
196-
if (spare == null) {
223+
} else {
224+
B[] bucketsForOrd = buildBuckets((int) bucketOrds.size());
225+
int tot = 0;
226+
while (ordsEnum.next()) {
227+
long docCount = bucketDocCount(ordsEnum.ord());
228+
otherDocCounts[ordIdx] += docCount;
229+
if (docCount < localBucketCountThresholds.getMinDocCount()) {
230+
continue;
231+
}
197232
spare = emptyBucketBuilder.get();
233+
updateBucket(spare, ordsEnum, docCount);
234+
bucketsForOrd[tot++] = spare;
198235
}
199-
updateBucket(spare, ordsEnum, docCount);
200-
spare = ordered.insertWithOverflow(spare);
201-
}
202-
203-
// Get the top buckets
204-
B[] bucketsForOrd = buildBuckets(ordered.size());
205-
topBucketsPerOrd[ordIdx] = bucketsForOrd;
206-
if (isKeyOrder(order)) {
207-
for (int b = ordered.size() - 1; b >= 0; --b) {
208-
topBucketsPerOrd[ordIdx][b] = ordered.pop();
209-
otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
236+
if (tot > size & partiallyBuiltBucketComparator != null) {
237+
// quick select topN
238+
// TODO do we need to handle case for SignificantTerm Agg separately
239+
ArrayUtil.select(
240+
bucketsForOrd,
241+
0,
242+
tot,
243+
size,
244+
((b1, b2) -> partiallyBuiltBucketComparator.compare((InternalTerms.Bucket<?>) b1, (InternalTerms.Bucket<?>) b2))
245+
);
210246
}
211-
} else {
212-
// sorted buckets not needed as they will be sorted by key in buildResult() which is different from
213-
// order in priority queue ordered
214-
Iterator<B> itr = ordered.iterator();
215-
for (int b = ordered.size() - 1; b >= 0; --b) {
216-
topBucketsPerOrd[ordIdx][b] = itr.next();
247+
topBucketsPerOrd[ordIdx] = Arrays.copyOf(bucketsForOrd, size);
248+
for (int b = 0; b < size; b++) {
217249
otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
218250
}
219251
}

0 commit comments

Comments
 (0)