Commit a01083d

Add seq numbering in chunks and reordering on the coordinating node per shard
1 parent 1c21e27 commit a01083d

7 files changed: +229 −88 lines changed

server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

Lines changed: 7 additions & 0 deletions
@@ -299,6 +299,7 @@ protected SearchHit nextDoc(int doc) throws IOException {
                 leafIdLoader,
                 rankDocs == null ? null : rankDocs.get(doc)
             );
+
             boolean success = false;
             try {
                 sourceProvider.source = hit.source();
@@ -362,6 +363,12 @@ protected SearchHit nextDoc(int doc) throws IOException {
                 hit.decRef();
             }
         }
+
+        // Store sequence info in the context result for coordinator
+        if (result.lastChunk != null && result.lastChunkSequenceStart >= 0) {
+            context.fetchResult().setLastChunkSequenceStart(result.lastChunkSequenceStart);
+        }
+
         // Return last chunk or empty
         if (result.lastChunk != null) {
             return result.lastChunk;
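
The block added above is the data node's half of the ordering contract: every chunk, including the last one embedded in the fetch result itself, records the sequence number of its first hit. A minimal sketch of the invariant this gives the coordinator (the placeChunk helper is illustrative, not part of this commit; it assumes per-shard sequence numbers start at 0):

// Sketch: a chunk whose first hit carries sequence number `sequenceStart` covers
// positions [sequenceStart, sequenceStart + chunkHits.length) in the shard's hit
// order, so hits can be slotted into place regardless of chunk arrival order.
static void placeChunk(SearchHit[] shardHits, SearchHit[] chunkHits, long sequenceStart) {
    for (int i = 0; i < chunkHits.length; i++) {
        int pos = Math.toIntExact(sequenceStart + i);
        assert shardHits[pos] == null : "chunks must not overlap";
        shardHits[pos] = chunkHits[i];
    }
}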

server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java

Lines changed: 90 additions & 51 deletions
@@ -29,17 +29,21 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Given a set of doc ids and an index reader, sorts the docs by id, splits the sorted
- * docs by leaf reader, and iterates through them calling abstract methods
+ * Given a set of doc ids and an index reader, sorts the docs by id (when not streaming),
+ * splits the sorted docs by leaf reader, and iterates through them calling abstract methods
  * {@link #setNextReader(LeafReaderContext, int[])} for each new leaf reader and
  * {@link #nextDoc(int)} for each document; then collects the resulting {@link SearchHit}s
  * into an array and returns them in the order of the original doc ids.
  * <p>
  * Optionally supports streaming hits in chunks if a {@link FetchPhaseResponseChunk.Writer}
  * is provided, reducing memory footprint for large result sets.
+ * <p>
+ * ORDERING: When streaming is disabled, docs are sorted by doc ID for efficient index access,
+ * but the original score-based order is restored via index mapping. When streaming is enabled,
+ * docs are NOT sorted to preserve score order, and sequence numbers track ordering across chunks.
  */
 abstract class FetchPhaseDocsIterator {
 
@@ -49,6 +53,13 @@ abstract class FetchPhaseDocsIterator {
      */
     private long requestBreakerBytes;
 
+    /**
+     * Sequence counter for tracking hit order in streaming mode.
+     * Each hit gets a unique sequence number allowing the coordinator to restore correct order
+     * even if chunks arrive out of order.
+     */
+    private final AtomicLong hitSequenceCounter = new AtomicLong(0);
+
     public void addRequestBreakerBytes(long delta) {
         requestBreakerBytes += delta;
     }
@@ -57,8 +68,6 @@ public long getRequestBreakerBytes() {
         return requestBreakerBytes;
     }
 
-    private static AtomicInteger counter = new AtomicInteger(0);
-
     /**
      * Called when a new leaf reader is reached
      * @param ctx the leaf reader for this set of doc ids
@@ -75,26 +84,13 @@ public long getRequestBreakerBytes() {
 
     /**
      * Iterate over a set of docsIds within a particular shard and index reader.
-     */
-    /* public final SearchHit[] iterate(
-        SearchShardTarget shardTarget,
-        IndexReader indexReader,
-        int[] docIds,
-        boolean allowPartialResults,
-        QuerySearchResult querySearchResult
-    ) {
-        // Delegate to new method with null writer to maintain backward compatibility
-        // When writer is null, no streaming chunks are sent (original behavior)
-        return iterate(shardTarget, indexReader, docIds, allowPartialResults, querySearchResult, null, 0, null);
-    }*/
-
-    /**
-     * Iterate over a set of docsIds within a particular shard and index reader.
-     * If a writer is provided, hits are sent in chunks as they are produced (streaming mode).
+     *
      * Streaming mode: When {@code chunkWriter} is non-null, hits are buffered and sent
-     * in chunks of size {@code chunkSize}. This reduces memory footprint for large result sets
-     * by streaming results to the coordinator as they are produced.
-     * Legacy mode: When {@code chunkWriter} is null, behaves exactly like the original
+     * in chunks. Docs are kept in original order (score-based) and sequence numbers track
+     * position to handle out-of-order chunk arrival.
+     *
+     * Non-streaming mode: Docs are sorted by doc ID for efficiency, and original order
+     * is restored via index mapping.
      *
      * @param shardTarget the shard being fetched from
      * @param indexReader the index reader
@@ -103,7 +99,8 @@ public long getRequestBreakerBytes() {
      * @param querySearchResult the query result
      * @param chunkWriter if non-null, enables streaming mode and sends hits in chunks
      * @param chunkSize number of hits per chunk (only used if chunkWriter is non-null)
-     * @return array of SearchHits in the order of the original docIds
+     * @param pendingChunks list to track pending chunk acknowledgments
+     * @return IterateResult containing hits array and optional last chunk with sequence info
      */
     public final IterateResult iterate(
         SearchShardTarget shardTarget,
@@ -123,24 +120,37 @@ public final IterateResult iterate(
         ShardId shardId = streamingEnabled ? shardTarget.getShardId() : null;
         SearchHits lastChunk = null;
 
+        // Track sequence numbers for ordering
+        long currentChunkSequenceStart = -1;
+        long lastChunkSequenceStart = -1;
+
         for (int index = 0; index < docIds.length; index++) {
             docs[index] = new DocIdToIndex(docIds[index], index);
         }
-        // make sure that we iterate in doc id order
-        Arrays.sort(docs);
+
+        // Only sort by doc ID if NOT streaming
+        // Sorting by doc ID is an optimization for sequential index access,
+        // but streaming mode needs to preserve score order from query phase
+        if (streamingEnabled == false) {
+            Arrays.sort(docs);
+        }
+
         int currentDoc = docs[0].docId;
+
         try {
             int leafOrd = ReaderUtil.subIndex(docs[0].docId, indexReader.leaves());
             LeafReaderContext ctx = indexReader.leaves().get(leafOrd);
             int endReaderIdx = endReaderIdx(ctx, 0, docs);
             int[] docsInLeaf = docIdsInLeaf(0, endReaderIdx, docs, ctx.docBase);
+
             try {
                 setNextReader(ctx, docsInLeaf);
             } catch (ContextIndexSearcher.TimeExceededException e) {
                 SearchTimeoutException.handleTimeout(allowPartialResults, shardTarget, querySearchResult);
                 assert allowPartialResults;
-                return new IterateResult(SearchHits.EMPTY, lastChunk);
+                return new IterateResult(SearchHits.EMPTY, lastChunk, lastChunkSequenceStart);
             }
+
             for (int i = 0; i < docs.length; i++) {
                 try {
                     if (i >= endReaderIdx) {
@@ -150,25 +160,36 @@ public final IterateResult iterate(
                         docsInLeaf = docIdsInLeaf(i, endReaderIdx, docs, ctx.docBase);
                         setNextReader(ctx, docsInLeaf);
                     }
+
                     currentDoc = docs[i].docId;
                     assert searchHits[docs[i].index] == null;
                     SearchHit hit = nextDoc(docs[i].docId);
 
                     if (streamingEnabled) {
                         hit.incRef();
+
+                        // Mark sequence start when starting new chunk
+                        if (chunkBuffer.isEmpty()) {
+                            currentChunkSequenceStart = hitSequenceCounter.get();
+                        }
+
+                        // Assign sequence to this hit and increment counter
+                        hitSequenceCounter.getAndIncrement();
+
                         chunkBuffer.add(hit);
 
-                        // Send intermediate chunks -not when it's the last iteration
+                        // Send intermediate chunks - not when it's the last iteration
                         if (chunkBuffer.size() >= chunkSize && i < docs.length - 1) {
-                            // Send HIT chunk
+                            // Send chunk with sequence information
                             pendingChunks.add(
                                 sendChunk(
                                     chunkWriter,
                                     chunkBuffer,
                                     shardId,
+                                    currentChunkSequenceStart, // Pass sequence start for ordering
                                     i - chunkBuffer.size() + 1,
                                     docIds.length,
-                                    Float.NaN // maxScore not meaningful for individual chunks
+                                    Float.NaN
                                 )
                             );
                             chunkBuffer.clear();
@@ -187,20 +208,27 @@ public final IterateResult iterate(
                     assert allowPartialResults;
                     SearchHit[] partialSearchHits = new SearchHit[i];
                     System.arraycopy(searchHits, 0, partialSearchHits, 0, i);
-                    return new IterateResult(partialSearchHits, lastChunk);
+                    return new IterateResult(partialSearchHits, lastChunk, lastChunkSequenceStart);
                 }
             }
 
             // Return the final partial chunk if streaming is enabled and buffer has remaining hits
             if (streamingEnabled && chunkBuffer.isEmpty() == false) {
+                // Remember the sequence start for the last chunk
+                lastChunkSequenceStart = currentChunkSequenceStart;
+
                 SearchHit[] lastHitsArray = chunkBuffer.toArray(new SearchHit[0]);
 
                 // DecRef for SearchHits constructor (will increment)
                 for (SearchHit hit : lastHitsArray) {
                     hit.decRef();
                 }
 
-                lastChunk = new SearchHits(lastHitsArray, new TotalHits(lastHitsArray.length, TotalHits.Relation.EQUAL_TO), Float.NaN);
+                lastChunk = new SearchHits(
+                    lastHitsArray,
+                    new TotalHits(lastHitsArray.length, TotalHits.Relation.EQUAL_TO),
+                    Float.NaN
+                );
                 chunkBuffer.clear();
             }
         } catch (SearchTimeoutException e) {
@@ -218,16 +246,18 @@ public final IterateResult iterate(
             }
             throw new FetchPhaseExecutionException(shardTarget, "Error running fetch phase for doc [" + currentDoc + "]", e);
         }
-        return new IterateResult(searchHits, lastChunk);
+
+        return new IterateResult(searchHits, lastChunk, lastChunkSequenceStart);
     }
 
     /**
-     * Sends a chunk of hits to the coordinator.
+     * Sends a chunk of hits to the coordinator with sequence information for ordering.
      */
     private static CompletableFuture<Void> sendChunk(
         FetchPhaseResponseChunk.Writer writer,
         List<SearchHit> buffer,
         ShardId shardId,
+        long sequenceStart,
         int fromIndex,
         int totalDocs,
         float maxScore
@@ -249,30 +279,37 @@ private static CompletableFuture<Void> sendChunk(
 
         SearchHits chunkHits = null;
         try {
-            chunkHits = new SearchHits(hitsArray, new TotalHits(hitsArray.length, TotalHits.Relation.EQUAL_TO), maxScore);
+            chunkHits = new SearchHits(
+                hitsArray,
+                new TotalHits(hitsArray.length, TotalHits.Relation.EQUAL_TO),
+                maxScore
+            );
             final SearchHits finalChunkHits = chunkHits;
 
             FetchPhaseResponseChunk chunk = new FetchPhaseResponseChunk(
-                counter.get(),
+                System.currentTimeMillis(),
                 FetchPhaseResponseChunk.Type.HITS,
                 shardId,
                 chunkHits,
                 fromIndex,
                 hitsArray.length,
-                totalDocs
+                totalDocs,
+                sequenceStart // Include sequence start in chunk metadata
            );
-            counter.incrementAndGet();
 
             // Send the chunk - coordinator will take ownership of the hits
-            writer.writeResponseChunk(chunk, ActionListener.wrap(ack -> {
-                // Coordinator now owns the hits, decRef to release local reference
-                finalChunkHits.decRef();
-                future.complete(null);
-            }, ex -> {
-                // Failed to send - we still own the hits, must clean up
-                finalChunkHits.decRef();
-                future.completeExceptionally(ex);
-            }));
+            writer.writeResponseChunk(chunk, ActionListener.wrap(
+                ack -> {
+                    // Coordinator now owns the hits, decRef to release local reference
+                    finalChunkHits.decRef();
+                    future.complete(null);
+                },
+                ex -> {
+                    // Failed to send - we still own the hits, must clean up
+                    finalChunkHits.decRef();
+                    future.completeExceptionally(ex);
+                }
+            ));
         } catch (Exception e) {
             future.completeExceptionally(e);
             // If chunk creation failed after SearchHits was created, clean up
@@ -342,16 +379,18 @@ public int compareTo(DocIdToIndex o) {
     }
 
     /**
-     * Add result class to carry both hits array and last chunk for streaming version
+     * Result class that carries hits array, last chunk, and sequence information.
+     * The lastChunkSequenceStart is used by the coordinator to properly order the last chunk's hits.
     */
     static class IterateResult {
-
         final SearchHit[] hits;
         final SearchHits lastChunk; // null for non-streaming mode
+        final long lastChunkSequenceStart; // -1 if no last chunk
 
-        IterateResult(SearchHit[] hits, SearchHits lastChunk) {
+        IterateResult(SearchHit[] hits, SearchHits lastChunk, long lastChunkSequenceStart) {
            this.hits = hits;
            this.lastChunk = lastChunk;
+            this.lastChunkSequenceStart = lastChunkSequenceStart;
        }
    }
 }
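
On the coordinating node, these per-shard sequence starts make chunk arrival order irrelevant: buffering chunks keyed by sequence start and draining the buffer in ascending key order restores the score-based order produced by the query phase. A minimal sketch, assuming one buffer per shard on the coordinator; ShardChunkBuffer and its methods are illustrative names, not types from this commit:

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

import org.elasticsearch.search.SearchHit;

// Sketch: chunks are keyed by their sequence start, so iterating the TreeMap in
// ascending key order restores the original score-based hit order no matter when
// each chunk arrived.
final class ShardChunkBuffer {
    private final TreeMap<Long, SearchHit[]> chunks = new TreeMap<>();

    void onChunk(long sequenceStart, SearchHit[] hits) {
        chunks.put(sequenceStart, hits); // arrival order does not matter
    }

    List<SearchHit> inOrder() {
        List<SearchHit> ordered = new ArrayList<>();
        for (SearchHit[] hits : chunks.values()) {
            for (SearchHit hit : hits) {
                ordered.add(hit);
            }
        }
        return ordered;
    }
}

The embedded last chunk would enter through the same path, keyed by the sequence start carried in FetchSearchResult (see the next file).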

server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java

Lines changed: 29 additions & 0 deletions
@@ -31,6 +31,13 @@ public final class FetchSearchResult extends SearchPhaseResult {
 
     private ProfileResult profileResult;
 
+    /**
+     * Sequence number of the first hit in the last chunk (embedded in this result).
+     * Used by the coordinator to maintain correct ordering when processing the last chunk.
+     * Value of -1 indicates no last chunk or sequence tracking not applicable.
+     */
+    private long lastChunkSequenceStart = -1;
+
     private final RefCounted refCounted = LeakTracker.wrap(new SimpleRefCounted());
 
     public FetchSearchResult() {}
@@ -44,6 +51,7 @@ public FetchSearchResult(StreamInput in) throws IOException {
         contextId = new ShardSearchContextId(in);
         hits = SearchHits.readFrom(in, true);
         profileResult = in.readOptionalWriteable(ProfileResult::new);
+        lastChunkSequenceStart = in.readLong();
     }
 
     @Override
@@ -52,6 +60,7 @@ public void writeTo(StreamOutput out) throws IOException {
         contextId.writeTo(out);
         hits.writeTo(out);
         out.writeOptionalWriteable(profileResult);
+        out.writeLong(lastChunkSequenceStart);
     }
 
     @Override
@@ -126,4 +135,24 @@ private void deallocate() {
     public boolean hasReferences() {
         return refCounted.hasReferences();
     }
+
+    /**
+     * Sets the sequence start for the last chunk embedded in this result.
+     * Called on the data node after iterating fetch phase results.
+     *
+     * @param sequenceStart the sequence number of the first hit in the last chunk
+     */
+    public void setLastChunkSequenceStart(long sequenceStart) {
+        this.lastChunkSequenceStart = sequenceStart;
+    }
+
+    /**
+     * Gets the sequence start for the last chunk embedded in this result.
+     * Used by the coordinator to properly order last chunk hits with other chunks.
+     *
+     * @return the sequence number of the first hit in the last chunk, or -1 if not set
+     */
+    public long getLastChunkSequenceStart() {
+        return lastChunkSequenceStart;
+    }
 }
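
A hedged sketch of how the coordinator side might consume this field, reusing the illustrative ShardChunkBuffer above; hits() and getHits() are existing accessors, while onFetchResult is an assumed name, not part of this commit:

// Sketch: the last chunk travels inside FetchSearchResult rather than as a streamed
// FetchPhaseResponseChunk, so it is slotted in by the sequence start carried here.
void onFetchResult(FetchSearchResult result, ShardChunkBuffer buffer) {
    long seqStart = result.getLastChunkSequenceStart();
    if (seqStart >= 0 && result.hits() != null) {
        buffer.onChunk(seqStart, result.hits().getHits());
    }
}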
