Skip to content

Commit dd2758d

Browse files
committed
Merge branch 'chunked_fetch_phase' of github.com:drempapis/elasticsearch into chunked_fetch_phase
2 parents 222e42d + 03407fe commit dd2758d

File tree

3 files changed

+29
-41
lines changed

3 files changed

+29
-41
lines changed

server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -224,11 +224,7 @@ public final IterateResult iterate(
224224
hit.decRef();
225225
}
226226

227-
lastChunk = new SearchHits(
228-
lastHitsArray,
229-
new TotalHits(lastHitsArray.length, TotalHits.Relation.EQUAL_TO),
230-
Float.NaN
231-
);
227+
lastChunk = new SearchHits(lastHitsArray, new TotalHits(lastHitsArray.length, TotalHits.Relation.EQUAL_TO), Float.NaN);
232228
chunkBuffer.clear();
233229
}
234230
} catch (SearchTimeoutException e) {
@@ -279,11 +275,7 @@ private static CompletableFuture<Void> sendChunk(
279275

280276
SearchHits chunkHits = null;
281277
try {
282-
chunkHits = new SearchHits(
283-
hitsArray,
284-
new TotalHits(hitsArray.length, TotalHits.Relation.EQUAL_TO),
285-
maxScore
286-
);
278+
chunkHits = new SearchHits(hitsArray, new TotalHits(hitsArray.length, TotalHits.Relation.EQUAL_TO), maxScore);
287279
final SearchHits finalChunkHits = chunkHits;
288280

289281
FetchPhaseResponseChunk chunk = new FetchPhaseResponseChunk(
@@ -298,18 +290,15 @@ private static CompletableFuture<Void> sendChunk(
298290
);
299291

300292
// Send the chunk - coordinator will take ownership of the hits
301-
writer.writeResponseChunk(chunk, ActionListener.wrap(
302-
ack -> {
303-
// Coordinator now owns the hits, decRef to release local reference
304-
finalChunkHits.decRef();
305-
future.complete(null);
306-
},
307-
ex -> {
308-
// Failed to send - we still own the hits, must clean up
309-
finalChunkHits.decRef();
310-
future.completeExceptionally(ex);
311-
}
312-
));
293+
writer.writeResponseChunk(chunk, ActionListener.wrap(ack -> {
294+
// Coordinator now owns the hits, decRef to release local reference
295+
finalChunkHits.decRef();
296+
future.complete(null);
297+
}, ex -> {
298+
// Failed to send - we still own the hits, must clean up
299+
finalChunkHits.decRef();
300+
future.completeExceptionally(ex);
301+
}));
313302
} catch (Exception e) {
314303
future.completeExceptionally(e);
315304
// If chunk creation failed after SearchHits was created, clean up

server/src/main/java/org/elasticsearch/search/fetch/chunk/FetchPhaseResponseStream.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,13 @@ void writeChunk(FetchPhaseResponseChunk chunk, Releasable releasable) {
103103

104104
if (logger.isTraceEnabled()) {
105105
logger.info(
106-
"Received chunk [{}] docs for shard [{}]: [{}/{}] hits accumulated, [{}] breaker bytes, used breaker bytes [{}]",
107-
chunk.hits() == null ? 0 : chunk.hits().getHits().length,
108-
shardIndex,
109-
queue.size(),
110-
expectedDocs,
111-
totalBreakerBytes.get(),
112-
circuitBreaker.getUsed()
106+
"Received chunk [{}] docs for shard [{}]: [{}/{}] hits accumulated, [{}] breaker bytes, used breaker bytes [{}]",
107+
chunk.hits() == null ? 0 : chunk.hits().getHits().length,
108+
shardIndex,
109+
queue.size(),
110+
expectedDocs,
111+
totalBreakerBytes.get(),
112+
circuitBreaker.getUsed()
113113
);
114114
}
115115
success = true;
@@ -159,9 +159,9 @@ FetchSearchResult buildFinalResult(ShardSearchContextId ctxId, SearchShardTarget
159159
ownershipTransferred = true;
160160

161161
SearchHits searchHits = new SearchHits(
162-
orderedHits.toArray(SearchHit[]::new),
163-
new TotalHits(orderedHits.size(), TotalHits.Relation.EQUAL_TO),
164-
maxScore
162+
orderedHits.toArray(SearchHit[]::new),
163+
new TotalHits(orderedHits.size(), TotalHits.Relation.EQUAL_TO),
164+
maxScore
165165
);
166166

167167
FetchSearchResult result = new FetchSearchResult(ctxId, shardTarget);
@@ -201,10 +201,10 @@ int getCurrentQueueSize() {
201201
protected void closeInternal() {
202202
if (logger.isTraceEnabled()) {
203203
logger.info(
204-
"Closing response stream for shard [{}], releasing [{}] hits, [{}] breaker bytes",
205-
shardIndex,
206-
queue.size(),
207-
totalBreakerBytes.get()
204+
"Closing response stream for shard [{}], releasing [{}] hits, [{}] breaker bytes",
205+
shardIndex,
206+
queue.size(),
207+
totalBreakerBytes.get()
208208
);
209209
}
210210

@@ -220,10 +220,10 @@ protected void closeInternal() {
220220
circuitBreaker.addWithoutBreaking(-totalBreakerBytes.get());
221221
if (logger.isTraceEnabled()) {
222222
logger.info(
223-
"Released [{}] breaker bytes for shard [{}], used breaker bytes [{}]",
224-
totalBreakerBytes.get(),
225-
shardIndex,
226-
circuitBreaker.getUsed()
223+
"Released [{}] breaker bytes for shard [{}], used breaker bytes [{}]",
224+
totalBreakerBytes.get(),
225+
shardIndex,
226+
circuitBreaker.getUsed()
227227
);
228228
}
229229
totalBreakerBytes.set(0);

server/src/test/java/org/elasticsearch/action/search/KnnSearchSingleNodeTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.action.search;
1111

12-
1312
import org.elasticsearch.action.support.WriteRequest;
1413
import org.elasticsearch.cluster.metadata.IndexMetadata;
1514
import org.elasticsearch.common.settings.Settings;

0 commit comments

Comments
 (0)