Skip to content

Commit 61a4b4b

Browse files
committed
add failure handling and maxInFlightChunks mechanism
1 parent dd2758d commit 61a4b4b

File tree

3 files changed

+63
-12
lines changed

3 files changed

+63
-12
lines changed

server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,13 @@
4545

4646
import java.io.IOException;
4747
import java.io.UncheckedIOException;
48+
import java.util.ArrayDeque;
4849
import java.util.ArrayList;
4950
import java.util.Collections;
5051
import java.util.List;
5152
import java.util.Map;
5253
import java.util.concurrent.CompletableFuture;
54+
import java.util.concurrent.atomic.AtomicReference;
5355
import java.util.function.IntConsumer;
5456
import java.util.function.Supplier;
5557

@@ -97,8 +99,18 @@ public void execute(
9799
SearchHits hits = null;
98100
try {
99101
// Collect all pending chunk futures
100-
final List<CompletableFuture<Void>> pendingChunks = new ArrayList<>();
101-
hits = buildSearchHits(context, docIdsToLoad, profiler, rankDocs, memoryChecker, writer, pendingChunks);
102+
final int maxInFlightChunks = 1; // TODO make configurable
103+
final ArrayDeque<CompletableFuture<Void>> pendingChunks = new ArrayDeque<>();
104+
final AtomicReference<Throwable> sendFailure = new AtomicReference<>();
105+
hits = buildSearchHits(context,
106+
docIdsToLoad,
107+
profiler,
108+
rankDocs,
109+
memoryChecker,
110+
writer,
111+
pendingChunks,
112+
maxInFlightChunks,
113+
sendFailure);
102114

103115
// Wait for all chunks to be ACKed before setting final result
104116
if (writer != null && pendingChunks.isEmpty() == false) {
@@ -152,7 +164,7 @@ public void execute(SearchContext context, int[] docIdsToLoad, RankDocShardInfo
152164
: Profilers.startProfilingFetchPhase();
153165
SearchHits hits = null;
154166
try {
155-
hits = buildSearchHits(context, docIdsToLoad, profiler, rankDocs, memoryChecker, null, null);
167+
hits = buildSearchHits(context, docIdsToLoad, profiler, rankDocs, memoryChecker, null, null, 0, null);
156168
} finally {
157169
try {
158170
// Always finish profiling
@@ -187,7 +199,10 @@ private SearchHits buildSearchHits(
187199
RankDocShardInfo rankDocs,
188200
IntConsumer memoryChecker,
189201
FetchPhaseResponseChunk.Writer writer,
190-
List<CompletableFuture<Void>> pendingChunks
202+
ArrayDeque<CompletableFuture<Void>> pendingChunks,
203+
int maxInFlightChunks,
204+
AtomicReference<Throwable> sendFailure
205+
191206
) {
192207
var lookup = context.getSearchExecutionContext().getMappingLookup();
193208

@@ -333,7 +348,9 @@ protected SearchHit nextDoc(int doc) throws IOException {
333348
context.queryResult(),
334349
writer,
335350
5, // TODO set a proper number
336-
pendingChunks
351+
pendingChunks,
352+
maxInFlightChunks,
353+
sendFailure
337354
);
338355

339356
if (context.isCancelled()) {

server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
import org.elasticsearch.search.query.SearchTimeoutException;
2626

2727
import java.io.IOException;
28+
import java.util.ArrayDeque;
2829
import java.util.ArrayList;
2930
import java.util.Arrays;
3031
import java.util.List;
3132
import java.util.concurrent.CompletableFuture;
3233
import java.util.concurrent.atomic.AtomicLong;
34+
import java.util.concurrent.atomic.AtomicReference;
3335

3436
/**
3537
* Given a set of doc ids and an index reader, sorts the docs by id (when not streaming),
@@ -110,7 +112,9 @@ public final IterateResult iterate(
110112
QuerySearchResult querySearchResult,
111113
FetchPhaseResponseChunk.Writer chunkWriter,
112114
int chunkSize,
113-
List<CompletableFuture<Void>> pendingChunks
115+
ArrayDeque<CompletableFuture<Void>> pendingChunks,
116+
int maxInFlightChunks,
117+
AtomicReference<Throwable> sendFailure
114118
) {
115119
SearchHit[] searchHits = new SearchHit[docIds.length];
116120
DocIdToIndex[] docs = new DocIdToIndex[docIds.length];
@@ -172,26 +176,44 @@ public final IterateResult iterate(
172176
if (chunkBuffer.isEmpty()) {
173177
currentChunkSequenceStart = hitSequenceCounter.get();
174178
}
175-
176179
// Assign sequence to this hit and increment counter
177180
hitSequenceCounter.getAndIncrement();
178181

179182
chunkBuffer.add(hit);
180183

181184
// Send intermediate chunks - not when it's the last iteration
182185
if (chunkBuffer.size() >= chunkSize && i < docs.length - 1) {
186+
// fail fast if any earlier send already failed
187+
Throwable knownFailure = sendFailure.get();
188+
if (knownFailure != null) {
189+
throw new RuntimeException("Fetch chunk failed", knownFailure);
190+
}
191+
183192
// Send chunk with sequence information
184-
pendingChunks.add(
185-
sendChunk(
193+
CompletableFuture<Void> chunkFuture = sendChunk(
186194
chunkWriter,
187195
chunkBuffer,
188196
shardId,
189197
currentChunkSequenceStart,
190198
i - chunkBuffer.size() + 1,
191199
docIds.length,
192200
Float.NaN
193-
)
194-
);
201+
);
202+
203+
// record failures as soon as they happen
204+
chunkFuture.whenComplete((ok, ex) -> {
205+
if (ex != null) {
206+
sendFailure.compareAndSet(null, ex);
207+
}
208+
});
209+
210+
pendingChunks.addLast(chunkFuture);
211+
212+
// Backpressure: bound in-flight
213+
if (pendingChunks.size() >= maxInFlightChunks) {
214+
awaitOldestOrFail(pendingChunks, sendFailure);
215+
}
216+
195217
chunkBuffer.clear();
196218
}
197219
} else {
@@ -246,6 +268,16 @@ public final IterateResult iterate(
246268
return new IterateResult(searchHits, lastChunk, lastChunkSequenceStart);
247269
}
248270

271+
private static void awaitOldestOrFail(ArrayDeque<CompletableFuture<Void>> inFlight, AtomicReference<Throwable> sendFailure) {
272+
final CompletableFuture<Void> oldest = inFlight.removeFirst();
273+
try {
274+
oldest.get();
275+
} catch (Exception e) {
276+
sendFailure.compareAndSet(null, e);
277+
throw new RuntimeException("Failed to send fetch chunk", e);
278+
}
279+
}
280+
249281
/**
250282
* Sends a chunk of hits to the coordinator with sequence information for ordering.
251283
*/

server/src/test/java/org/elasticsearch/search/fetch/FetchPhaseDocsIteratorTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ protected SearchHit nextDoc(int doc) {
8686
new QuerySearchResult(),
8787
null,
8888
0,
89+
null,
90+
0,
8991
null
9092
);
9193

@@ -137,7 +139,7 @@ protected SearchHit nextDoc(int doc) {
137139

138140
Exception e = expectThrows(
139141
FetchPhaseExecutionException.class,
140-
() -> it.iterate(null, reader, docs, randomBoolean(), new QuerySearchResult(), null, 0, null)
142+
() -> it.iterate(null, reader, docs, randomBoolean(), new QuerySearchResult(), null, 0, null, 0, null)
141143
);
142144
assertThat(e.getMessage(), containsString("Error running fetch phase for doc [" + badDoc + "]"));
143145
assertThat(e.getCause(), instanceOf(IllegalArgumentException.class));

0 commit comments

Comments
 (0)