diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index 91d194844..68f4232da 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -24,6 +24,8 @@ import co.elastic.clients.elasticsearch.core.BulkRequest; import co.elastic.clients.elasticsearch.core.BulkResponse; import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; +import co.elastic.clients.transport.BackoffPolicy; import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.util.ApiTypeHelper; import co.elastic.clients.util.ObjectBuilder; @@ -34,7 +36,10 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.LongSummaryStatistics; +import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -43,6 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import java.util.stream.Collectors; public class BulkIngester implements AutoCloseable { @@ -62,11 +68,12 @@ public class BulkIngester implements AutoCloseable { private @Nullable ScheduledFuture flushTask; private @Nullable ScheduledExecutorService scheduler; + private @Nullable ScheduledExecutorService retryScheduler; private boolean isExternalScheduler = false; + private BackoffPolicy backoffPolicy; // Current state - private List operations = new ArrayList<>(); - private List contexts = null; // Created on demand + private List> operations = new ArrayList<>(); private long currentSize; private int requestsInFlightCount; private volatile boolean isClosed = false; @@ -76,7 +83,8 @@ public class BulkIngester implements AutoCloseable { private final FnCondition addCondition = new FnCondition(lock, this::canAddOperation); private final FnCondition sendRequestCondition = new FnCondition(lock, this::canSendRequest); private final FnCondition closeCondition = new FnCondition(lock, this::closedAndFlushed); - private AtomicInteger listenerInProgressCount = new AtomicInteger(); + private final AtomicInteger listenerInProgressCount = new AtomicInteger(); + private final AtomicInteger retriesInProgressCount = new AtomicInteger(); private static class RequestExecution { public final long id; @@ -101,6 +109,7 @@ private BulkIngester(Builder builder) { this.maxSize = builder.bulkSize < 0 ? Long.MAX_VALUE : builder.bulkSize; this.maxOperations = builder.bulkOperations < 0 ? 
Integer.MAX_VALUE : builder.bulkOperations; this.listener = builder.listener; + this.backoffPolicy = builder.backoffPolicy; this.flushIntervalMillis = builder.flushIntervalMillis; if (flushIntervalMillis != null || listener != null) { @@ -127,6 +136,19 @@ private BulkIngester(Builder builder) { TimeUnit.MILLISECONDS ); } + + if (backoffPolicy == null) { + backoffPolicy = BackoffPolicy.noBackoff(); + } + // preparing a scheduler that will trigger flushes to retry failed requests + else { + retryScheduler = Executors.newScheduledThreadPool(maxRequests + 1, (r) -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName("bulk-ingester-retry#" + ingesterId + "#" + t.getId()); + t.setDaemon(true); + return t; + }); + } } //----- Getters @@ -168,7 +190,7 @@ public Duration flushInterval() { * The number of operations that have been buffered, waiting to be sent. */ public int pendingOperations() { - List operations = this.operations; + List> operations = this.operations; return operations == null ? 0 : operations.size(); } @@ -236,7 +258,8 @@ private boolean canAddOperation() { } private boolean closedAndFlushed() { - return isClosed && operations.isEmpty() && requestsInFlightCount == 0 && listenerInProgressCount.get() == 0; + return isClosed && operations.isEmpty() && requestsInFlightCount == 0 + && listenerInProgressCount.get() == 0 && retriesInProgressCount.get() == 0; } //----- Ingester logic @@ -273,29 +296,42 @@ private void failsafeFlush() { } public void flush() { + List> sentRequests = new ArrayList<>(); RequestExecution exec = sendRequestCondition.whenReadyIf( () -> { // May happen on manual and periodic flushes - return !operations.isEmpty(); + return !operations.isEmpty() && operations.stream() + .anyMatch(RetryableBulkOperation::isSendable); }, () -> { + // Selecting operations that can be sent immediately, + // Dividing actual operations from contexts + List immediateOps = new ArrayList<>(); + List contexts = new ArrayList<>(); + + for(Iterator> it = operations.iterator(); it.hasNext();){ + RetryableBulkOperation op = it.next(); + if (op.isSendable()) { + immediateOps.add(op.operation()); + contexts.add(op.context()); + + sentRequests.add(op); + it.remove(); + } + } + // Build the request - BulkRequest request = newRequest().operations(operations).build(); - List requestContexts = contexts == null ? 
Collections.nCopies(operations.size(), - null) : contexts; + BulkRequest request = newRequest().operations(immediateOps).build(); // Prepare for next round - operations = new ArrayList<>(); - contexts = null; - currentSize = 0; + currentSize = operations.size(); addCondition.signalIfReady(); long id = sendRequestCondition.invocations(); if (listener != null) { - BulkRequest finalRequest = request; // synchronous execution to make sure it actually runs before - listener.beforeBulk(id, finalRequest, requestContexts); + listener.beforeBulk(id, request, contexts); } CompletionStage result = client.bulk(request); @@ -306,42 +342,80 @@ public void flush() { request = null; } - return new RequestExecution<>(id, request, requestContexts, result); + return new RequestExecution<>(id, request, contexts, result); }); if (exec != null) { // A request was actually sent exec.futureResponse.handle((resp, thr) -> { if (resp != null) { - // Success - if (listener != null) { - listenerInProgressCount.incrementAndGet(); - scheduler.submit(() -> { - try { - listener.afterBulk(exec.id, exec.request, exec.contexts, resp); - } - finally { - if(listenerInProgressCount.decrementAndGet() == 0){ - closeCondition.signalIfReady(); + + // Success? Checking if total or partial + List failedRequestsCanRetry = resp.items().stream() + .filter(i -> i.error() != null && i.status() == 429) + .collect(Collectors.toList()); + + if (failedRequestsCanRetry.isEmpty() || backoffPolicy.equals(BackoffPolicy.noBackoff())) { + // Total success! ...or there's no retry policy implemented. Either way, can call + listenerAfterBulkSuccess(resp, exec); + } else { + // Partial success, retrying failed requests if policy allows it + // Keeping list of retryable requests/responses, to exclude them for calling + // listener later + List> retryableReq = new ArrayList<>(); + List> refires = new ArrayList<>(); + List retryableResp = new ArrayList<>(); + + for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry) { + int index = resp.items().indexOf(bulkItemResponse); + selectingRetries(index, bulkItemResponse, sentRequests, retryableResp, retryableReq, refires); + } + // Scheduling flushes for just sent out retryable requests + if (!refires.isEmpty()) { + scheduleRetries(refires); + } + // Retrieving list of remaining successful or not retryable requests + retryableReq.forEach(sentRequests::remove); + if (!sentRequests.isEmpty()) { + if (listener != null) { + // Creating partial BulkRequest + List partialOps = new ArrayList<>(); + List partialCtx = new ArrayList<>(); + for (RetryableBulkOperation op : sentRequests) { + partialOps.add(op.operation()); + partialCtx.add(op.context()); } + BulkRequest partialRequest = newRequest().operations(partialOps).build(); + + // Filtering response + List partialItems = resp.items() + .stream() + .filter(i -> !retryableResp.contains(i)) + .collect(Collectors.toList()); + + BulkResponse partialResp = BulkResponse.of(br -> br + .items(partialItems) + .errors(resp.errors()) + .took(resp.took()) + .ingestTook(resp.ingestTook())); + + listenerInProgressCount.incrementAndGet(); + scheduler.submit(() -> { + try { + listener.afterBulk(exec.id, partialRequest, partialCtx, partialResp); + } finally { + if (listenerInProgressCount.decrementAndGet() == 0) { + closeCondition.signalIfReady(); + } + } + }); } - }); + } + } } else { // Failure - if (listener != null) { - listenerInProgressCount.incrementAndGet(); - scheduler.submit(() -> { - try { - listener.afterBulk(exec.id, exec.request, exec.contexts, thr); - } - 
finally { - if(listenerInProgressCount.decrementAndGet() == 0){ - closeCondition.signalIfReady(); - } - } - }); - } + listenerAfterBulkException(thr, exec); } sendRequestCondition.signalIfReadyAfter(() -> { @@ -353,35 +427,105 @@ public void flush() { } } + private void selectingRetries(int index, BulkResponseItem bulkItemResponse, + List<RetryableBulkOperation<Context>> sentRequests, + List<BulkResponseItem> retryableResp, + List<RetryableBulkOperation<Context>> retryableReq, + List<RetryableBulkOperation<Context>> refires) { + + // Getting the original failed requests, and keeping successful ones to send to the listener + RetryableBulkOperation<Context> original = sentRequests.get(index); + if (original.canRetry()) { + retryableResp.add(bulkItemResponse); + Iterator<Long> retryTimes = Optional.ofNullable(original.retries()).orElse(backoffPolicy.iterator()); + RetryableBulkOperation<Context> refire = new RetryableBulkOperation<>(original.operation(), original.context(), retryTimes); + retryableReq.add(original); + refires.add(refire); + addRetry(refire); + logger.warn("Added failed request back in the queue, retrying in " + refire.currentRetryTimeDelay() + " ms"); + } else { + logger.warn("Retries exhausted for request: " + original.operation()._kind().toString()); + } + } + + private void listenerAfterBulkException(Throwable thr, RequestExecution<Context> exec) { + if (listener != null) { + listenerInProgressCount.incrementAndGet(); + scheduler.submit(() -> { + try { + listener.afterBulk(exec.id, exec.request, exec.contexts, thr); + } finally { + if (listenerInProgressCount.decrementAndGet() == 0) { + closeCondition.signalIfReady(); + } + } + }); + } + } + + private void listenerAfterBulkSuccess(BulkResponse resp, RequestExecution<Context> exec) { + if (listener != null) { + listenerInProgressCount.incrementAndGet(); + scheduler.submit(() -> { + try { + listener.afterBulk(exec.id, exec.request, exec.contexts, resp); + } finally { + if (listenerInProgressCount.decrementAndGet() == 0) { + closeCondition.signalIfReady(); + } + } + }); + } + } + + private void scheduleRetries(List<RetryableBulkOperation<Context>> retryableReq) { + LongSummaryStatistics statsDelays = retryableReq.stream() + .map(RetryableBulkOperation::currentRetryTimeDelay) + .mapToLong(Long::longValue) + .summaryStatistics(); + + // scheduling flushes at both the earliest and the latest retry delay + retryScheduler.schedule(this::flush, statsDelays.getMin(), TimeUnit.MILLISECONDS); + retryScheduler.schedule(this::flush, statsDelays.getMax(), TimeUnit.MILLISECONDS); + + } + public void add(BulkOperation operation, Context context) { if (isClosed) { throw new IllegalStateException("Ingester has been closed"); } - IngesterOperation ingestOp = IngesterOperation.of(operation, client._jsonpMapper()); + RetryableBulkOperation<Context> repeatableOp = new RetryableBulkOperation<>(operation, context, + null); - addCondition.whenReady(() -> { + innerAdd(repeatableOp); + } - if (context != null) { - // Lazily build the context list - if (contexts == null) { - int size = operations.size(); - if (size == 0) { - contexts = new ArrayList<>(); - } else { - contexts = new ArrayList<>(Collections.nCopies(size, null)); - } + // Same as "add", but skips the closed check to allow retries to be added even after ingester closure + private void addRetry(RetryableBulkOperation<Context> repeatableOp) { + // Sending the operation back in the queue using the retry scheduler + retriesInProgressCount.incrementAndGet(); + retryScheduler.submit(() -> { + try { + innerAdd(repeatableOp); + } finally { + if (retriesInProgressCount.decrementAndGet() == 0) { + closeCondition.signalIfReady(); } - contexts.add(context); } + }); + } + + private void innerAdd(RetryableBulkOperation<Context> repeatableOp) { +
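// Serialize the document once to compute the operation's size; already-binary documents (e.g. operations being retried) pass through IngesterOperation.of() unchanged. +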
IngesterOperation ingestOp = IngesterOperation.of(repeatableOp, client._jsonpMapper()); - operations.add(ingestOp.operation()); + addCondition.whenReady(() -> { + operations.add(ingestOp.repeatableOperation()); currentSize += ingestOp.size(); if (!canAddOperation()) { flush(); - } - else { + } else { addCondition.signalIfReady(); } }); @@ -423,6 +567,10 @@ public void close() { if (scheduler != null && !isExternalScheduler) { scheduler.shutdownNow(); } + + if (retryScheduler != null) { + retryScheduler.shutdownNow(); + } } //---------------------------------------------------------------------------------------------------- @@ -440,6 +588,7 @@ public static class Builder implements ObjectBuilder listener; private ScheduledExecutorService scheduler; + private BackoffPolicy backoffPolicy; public Builder client(ElasticsearchAsyncClient client) { this.client = client; @@ -455,7 +604,8 @@ public Builder client(ElasticsearchClient client) { } /** - * Sets when to flush a new bulk request based on the number of operations currently added. Defaults to + * Sets when to flush a new bulk request based on the number of operations currently added. + * Defaults to * {@code 1000}. Can be set to {@code -1} to disable it. * * @throws IllegalArgumentException if less than -1. @@ -519,6 +669,7 @@ public Builder flushInterval(long value, TimeUnit unit) { *

* Flushing is still subject to the maximum number of requests set with * {@link #maxConcurrentRequests}. + * * @deprecated use {@link #scheduler(ScheduledExecutorService)} */ @Deprecated @@ -541,6 +692,15 @@ public Builder listener(BulkListener listener) { return this; } + /** + * Sets the backoff policy that will handle retries for error 429: too many requests. + * All the times are defined in milliseconds. + */ + public Builder backoffPolicy(BackoffPolicy backoffPolicy) { + this.backoffPolicy = backoffPolicy; + return this; + } + /** * Sets global bulk request settings that will be applied to all requests sent by the ingester. */ diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java index f2a2ac28c..b8cd7ac2c 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java @@ -34,112 +34,112 @@ /** * A bulk operation whose size has been calculated and content turned to a binary blob (to compute its size). */ -public class IngesterOperation { - private final BulkOperation operation; +class IngesterOperation { + private final RetryableBulkOperation repeatableOp; private final long size; - public IngesterOperation(BulkOperation operation, long size) { - this.operation = operation; + IngesterOperation(RetryableBulkOperation repeatableOp, long size) { + this.repeatableOp = repeatableOp; this.size = size; } - public static IngesterOperation of(BulkOperation operation, JsonpMapper mapper) { - switch (operation._kind()) { + public static IngesterOperation of(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { + switch (repeatableOp.operation()._kind()) { case Create: - return createOperation(operation, mapper); + return createOperation(repeatableOp, mapper); case Index: - return indexOperation(operation, mapper); + return indexOperation(repeatableOp, mapper); case Update: - return updateOperation(operation, mapper); + return updateOperation(repeatableOp, mapper); case Delete: - return deleteOperation(operation); + return deleteOperation(repeatableOp); default: - throw new IllegalStateException("Unknown bulk operation type " + operation._kind()); + throw new IllegalStateException("Unknown bulk operation type " + repeatableOp.operation()._kind()); } } - public BulkOperation operation() { - return this.operation; + public RetryableBulkOperation repeatableOperation() { + return this.repeatableOp; } public long size() { return this.size; } - private static IngesterOperation createOperation(BulkOperation operation, JsonpMapper mapper) { - CreateOperation create = operation.create(); - BulkOperation newOperation; + private static IngesterOperation createOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { + CreateOperation create = repeatableOp.operation().create(); + RetryableBulkOperation newOperation; long size = basePropertiesSize(create); if (create.document() instanceof BinaryData) { - newOperation = operation; + newOperation = repeatableOp; size += ((BinaryData) create.document()).size(); } else { BinaryData binaryDoc = BinaryData.of(create.document(), mapper); size += binaryDoc.size(); - newOperation = BulkOperation.of(bo -> bo.create(idx -> { + newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.create(idx -> { copyCreateProperties(create, idx); return 
idx.document(binaryDoc); - })); + })),repeatableOp.context(),repeatableOp.retries()); } return new IngesterOperation(newOperation, size); } - private static IngesterOperation indexOperation(BulkOperation operation, JsonpMapper mapper) { - IndexOperation index = operation.index(); - BulkOperation newOperation; + private static IngesterOperation indexOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { + IndexOperation index = repeatableOp.operation().index(); + RetryableBulkOperation newOperation; long size = basePropertiesSize(index); if (index.document() instanceof BinaryData) { - newOperation = operation; + newOperation = repeatableOp; size += ((BinaryData) index.document()).size(); } else { BinaryData binaryDoc = BinaryData.of(index.document(), mapper); size += binaryDoc.size(); - newOperation = BulkOperation.of(bo -> bo.index(idx -> { + newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.index(idx -> { copyIndexProperties(index, idx); return idx.document(binaryDoc); - })); + })),repeatableOp.context(),repeatableOp.retries()); } return new IngesterOperation(newOperation, size); } - private static IngesterOperation updateOperation(BulkOperation operation, JsonpMapper mapper) { - UpdateOperation update = operation.update(); - BulkOperation newOperation; + private static IngesterOperation updateOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { + UpdateOperation update = repeatableOp.operation().update(); + RetryableBulkOperation newOperation; long size = basePropertiesSize(update) + size("retry_on_conflict", update.retryOnConflict()) + size("require_alias", update.requireAlias()); if (update.binaryAction() != null) { - newOperation = operation; + newOperation = repeatableOp; size += update.binaryAction().size(); } else { BinaryData action = BinaryData.of(update.action(), mapper); size += action.size(); - newOperation = BulkOperation.of(bo -> bo.update(u -> { + newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.update(u -> { copyBaseProperties(update, u); return u .binaryAction(action) .requireAlias(update.requireAlias()) .retryOnConflict(update.retryOnConflict()); - })); + })),repeatableOp.context(),repeatableOp.retries()); } return new IngesterOperation(newOperation, size); } - private static IngesterOperation deleteOperation(BulkOperation operation) { - DeleteOperation delete = operation.delete(); - return new IngesterOperation(operation, basePropertiesSize(delete)); + private static IngesterOperation deleteOperation(RetryableBulkOperation repeatableOp) { + DeleteOperation delete = repeatableOp.operation().delete(); + return new IngesterOperation(repeatableOp, basePropertiesSize(delete)); } diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/RetryableBulkOperation.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/RetryableBulkOperation.java new file mode 100644 index 000000000..19f203fad --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/RetryableBulkOperation.java @@ -0,0 +1,69 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.elasticsearch._helpers.bulk; + +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; + +import java.util.Iterator; +import java.util.Optional; + +class RetryableBulkOperation { + private final BulkOperation operation; + private final Context context; + private final Iterator retries; + private final Long retryTime; + + RetryableBulkOperation(BulkOperation request, Context context, Iterator retries) { + this.operation = request; + this.context = context; + this.retries = retries; + // if the retries iterator is null it means that it's not a retry, otherwise calculating retry time + long currentMillis = currentMillis(); + this.retryTime = Optional.ofNullable(retries).map(r -> currentMillis + r.next()).orElse(currentMillis); + } + + public BulkOperation operation() { + return operation; + } + + public Context context() { + return context; + } + + public Iterator retries() { + return retries; + } + + public long currentRetryTimeDelay() { + return this.retryTime - currentMillis(); + } + + public boolean canRetry() { + return Optional.ofNullable(retries).map(Iterator::hasNext).orElse(true); + } + + public boolean isSendable() { + return (this.retryTime - currentMillis()) <= 0; + } + + private Long currentMillis(){ + return System.nanoTime()/1_000_000L; + } +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/BackoffPolicy.java b/java-client/src/main/java/co/elastic/clients/transport/BackoffPolicy.java new file mode 100644 index 000000000..798af4c06 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/BackoffPolicy.java @@ -0,0 +1,329 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Provides a set of generic backoff policies. Backoff policies are used to calculate the number of times an action will be retried + * and the intervals between those retries. + * + * Notes for implementing custom subclasses: + * + * The underlying mathematical principle of BackoffPolicy are progressions which can be either finite or infinite although + * the latter should not be used for retrying. 
A progression can be mapped to a java.util.Iterator with the following + * semantics: + * <ul> + * <li>#hasNext() determines whether the progression has more elements. Returns true for infinite progressions</li> + * <li>#next() determines the next element in the progression, i.e. the next wait time period</li> + * </ul>
+ * + * Note that backoff policies are exposed as Iterables in order to be consumed multiple times. + */ +public abstract class BackoffPolicy implements Iterable<Long> { + private static final BackoffPolicy NO_BACKOFF = new NoBackoff(); + + /** + * Creates a backoff policy that will not allow any backoff, i.e. an operation will fail after the first attempt. + * + * @return A backoff policy without any backoff period. The returned instance is thread safe. + */ + public static BackoffPolicy noBackoff() { + return NO_BACKOFF; + } + + /** + * Creates a new constant backoff policy with the provided configuration. + * + * @param delay The delay defines how long to wait between retry attempts. Must not be null. + * Must be <= Integer.MAX_VALUE ms. + * @param maxNumberOfRetries The maximum number of retries. Must be a non-negative number. + * @return A backoff policy with a constant wait time between retries. The returned instance is thread safe but each + * iterator created from it should only be used by a single thread. + */ + public static BackoffPolicy constantBackoff(Long delay, int maxNumberOfRetries) { + return new ConstantBackoff(checkDelay(delay), maxNumberOfRetries); + } + + /** + * Creates a new exponential backoff policy with a default configuration of 50 ms initial wait period and 8 retries taking + * roughly 5.1 seconds in total. + * + * @return A backoff policy with an exponential increase in wait time for retries. The returned instance is thread safe but each + * iterator created from it should only be used by a single thread. + */ + public static BackoffPolicy exponentialBackoff() { + return exponentialBackoff(50L, 8); + } + + /** + * Creates a new exponential backoff policy with the provided configuration. + * + * @param initialDelay The initial delay defines how long to wait for the first retry attempt. Must not be null. + * Must be <= Integer.MAX_VALUE ms. + * @param maxNumberOfRetries The maximum number of retries. Must be a non-negative number. + * @return A backoff policy with an exponential increase in wait time for retries. The returned instance is thread safe but each + * iterator created from it should only be used by a single thread. + */ + public static BackoffPolicy exponentialBackoff(Long initialDelay, int maxNumberOfRetries) { + return new ExponentialBackoff(checkDelay(initialDelay), maxNumberOfRetries); + } + + /** + * Creates a new linear backoff policy with the provided configuration. + * + * @param delayIncrement The amount by which to increment the delay on each retry. + * @param maxNumberOfRetries The maximum number of retries. + * @param maximumDelay The maximum delay, or null for no upper bound. + * @return A backoff policy with linear increase in wait time for retries. + */ + public static BackoffPolicy linearBackoff(Long delayIncrement, int maxNumberOfRetries, Long maximumDelay) { + return new LinearBackoff(delayIncrement, maxNumberOfRetries, maximumDelay); + } + + /** + * Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy.
+ */ + public static BackoffPolicy wrap(BackoffPolicy delegate, Runnable onBackoff) { + return new WrappedBackoffPolicy(delegate, onBackoff); + } + + private static Long checkDelay(Long delay) { + if (delay > Integer.MAX_VALUE) { + throw new IllegalArgumentException("delay must be <= " + Integer.MAX_VALUE + " ms"); + } + return delay; + } + + private static class NoBackoff extends BackoffPolicy { + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } + + @Override + public String toString() { + return "NoBackoff"; + } + } + + private static class ExponentialBackoff extends BackoffPolicy { + private final Long start; + + private final int numberOfElements; + + private ExponentialBackoff(Long start, int numberOfElements) { + assert start >= 0; + assert numberOfElements >= 0; + this.start = start; + this.numberOfElements = numberOfElements; + } + + @Override + public Iterator iterator() { + return new ExponentialBackoffIterator(start, numberOfElements); + } + + @Override + public String toString() { + return "ExponentialBackoff{start=" + start + ", numberOfElements=" + numberOfElements + '}'; + } + } + + private static class ExponentialBackoffIterator implements Iterator { + private final int numberOfElements; + + private final Long start; + + private int currentlyConsumed; + + private ExponentialBackoffIterator(Long start, int numberOfElements) { + this.start = start; + this.numberOfElements = numberOfElements; + } + + @Override + public boolean hasNext() { + return currentlyConsumed < numberOfElements; + } + + @Override + public Long next() { + if (!hasNext()) { + throw new NoSuchElementException("Only up to " + numberOfElements + " elements"); + } + Long result = start + 10L * ((int) Math.exp(0.8d * (currentlyConsumed)) - 1); + currentlyConsumed++; + return result; + } + } + + private static final class ConstantBackoff extends BackoffPolicy { + private final Long delay; + + private final int numberOfElements; + + ConstantBackoff(Long delay, int numberOfElements) { + assert numberOfElements >= 0; + this.delay = delay; + this.numberOfElements = numberOfElements; + } + + @Override + public Iterator iterator() { + return new ConstantBackoffIterator(delay, numberOfElements); + } + + @Override + public String toString() { + return "ConstantBackoff{delay=" + delay + ", numberOfElements=" + numberOfElements + '}'; + } + } + + private static final class ConstantBackoffIterator implements Iterator { + private final Long delay; + private final int numberOfElements; + private int curr; + + ConstantBackoffIterator(Long delay, int numberOfElements) { + this.delay = delay; + this.numberOfElements = numberOfElements; + } + + @Override + public boolean hasNext() { + return curr < numberOfElements; + } + + @Override + public Long next() { + if (hasNext() == false) { + throw new NoSuchElementException(); + } + curr++; + return delay; + } + } + + private static final class WrappedBackoffPolicy extends BackoffPolicy { + private final BackoffPolicy delegate; + private final Runnable onBackoff; + + WrappedBackoffPolicy(BackoffPolicy delegate, Runnable onBackoff) { + this.delegate = delegate; + this.onBackoff = onBackoff; + } + + @Override + public Iterator iterator() { + return new WrappedBackoffIterator(delegate.iterator(), onBackoff); + } + + @Override + public String toString() { + return "WrappedBackoffPolicy{delegate=" + delegate + ", onBackoff=" + onBackoff + '}'; + } + } + + private static final class WrappedBackoffIterator implements Iterator { + private final Iterator 
delegate; + private final Runnable onBackoff; + + WrappedBackoffIterator(Iterator<Long> delegate, Runnable onBackoff) { + this.delegate = delegate; + this.onBackoff = onBackoff; + } + + @Override + public boolean hasNext() { + return delegate.hasNext(); + } + + @Override + public Long next() { + if (false == delegate.hasNext()) { + throw new NoSuchElementException(); + } + onBackoff.run(); + return delegate.next(); + } + } + + private static final class LinearBackoff extends BackoffPolicy { + + private final Long delayIncrement; + private final int maxNumberOfRetries; + private final Long maximumDelay; + + private LinearBackoff(Long delayIncrement, int maxNumberOfRetries, @Nullable Long maximumDelay) { + this.delayIncrement = delayIncrement; + this.maxNumberOfRetries = maxNumberOfRetries; + this.maximumDelay = maximumDelay; + } + + @Override + public Iterator<Long> iterator() { + return new LinearBackoffIterator(delayIncrement, maxNumberOfRetries, maximumDelay); + } + + @Override + public String toString() { + return "LinearBackoff{" + + "delayIncrement=" + + delayIncrement + + ", maxNumberOfRetries=" + + maxNumberOfRetries + + ", maximumDelay=" + + maximumDelay + + '}'; + } + } + + private static final class LinearBackoffIterator implements Iterator<Long> { + + private final Long delayIncrement; + private final int maxNumberOfRetries; + private final Long maximumDelay; + private int curr; + + private LinearBackoffIterator(Long delayIncrement, int maxNumberOfRetries, @Nullable Long maximumDelay) { + this.delayIncrement = delayIncrement; + this.maxNumberOfRetries = maxNumberOfRetries; + this.maximumDelay = maximumDelay; + } + + @Override + public boolean hasNext() { + return curr < maxNumberOfRetries; + } + + @Override + public Long next() { + curr++; + Long delay = curr * delayIncrement; + return maximumDelay == null ? delay : delay.compareTo(maximumDelay) < 0 ? delay : maximumDelay; + } + } +} diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java new file mode 100644 index 000000000..dede24b9c --- /dev/null +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterRetryPolicyTest.java @@ -0,0 +1,544 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package co.elastic.clients.elasticsearch._helpers.bulk; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.ErrorCause; +import co.elastic.clients.elasticsearch.core.BulkRequest; +import co.elastic.clients.elasticsearch.core.BulkResponse; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; +import co.elastic.clients.elasticsearch.core.bulk.CreateOperation; +import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation; +import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; +import co.elastic.clients.elasticsearch.core.bulk.OperationType; +import co.elastic.clients.json.JsonpMapper; +import co.elastic.clients.json.SimpleJsonpMapper; +import co.elastic.clients.transport.BackoffPolicy; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.Endpoint; +import co.elastic.clients.transport.TransportOptions; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BulkIngesterRetryPolicyTest { + + protected static ElasticsearchClient client; + protected static TestTransport transport; + protected static CountingListener listener; + + private BulkOperation create = BulkOperation.of(b -> b.create(c -> c.index("foo").id("1").document("1"))); + private BulkOperation index = BulkOperation.of(b -> b.index(c -> c.index("fooo").id("2").document("2"))); + private BulkOperation indexFail = BulkOperation.of(b -> b.index(c -> c.index("fail").id("2").document( + "2"))); + private BulkOperation delete = BulkOperation.of(b -> b.delete(c -> c.index("foooo").id("3"))); + private BulkOperation deleteFail = BulkOperation.of(b -> b.delete(c -> c.index("fail").id("3"))); + + @BeforeEach + public void setup() { + transport = new TestTransport(); + client = new ElasticsearchClient(transport); + listener = new CountingListener(); + } + + @Test + public void retryTestNoScheduledFlushNoContext() throws Exception { + + // First test, partial success, other will succeed after retrying + { + BulkIngester ingester = newBasicBulkIngester(listener); + + ingester.add(create); + ingester.add(index); + ingester.add(index); + + ingester.close(); + + // 1 instant success, 2 retried, but succeeded. 
can be either 2 or 3 depending on the retries, + // if they get scheduled at the same exact time + assertTrue(listener.requests.get() == 2 || listener.requests.get() == 3); + // eventually all 3 have to succeed + assertTrue(listener.successOperations.get() == 3); + + // 1 for the create and first try for the indexes, 2 + 2 for both index retries, + // which could be scheduled at the same time, so from 3 to 5 + assertTrue(listener.sentRequestsTotal.get() >= 3 && listener.sentRequestsTotal.get() <= 5); + } + + // Second test, all requests will succeed after retrying + { + transport.reset(); + listener.reset(); + + BulkIngester ingester = newBasicBulkIngester(listener); + + ingester.add(index); + ingester.add(index); + ingester.add(index); + ingester.add(index); + + ingester.close(); + + // between 1 and 4, depending on scheduler + assertTrue(listener.requests.get() >= 1 && listener.requests.get() <= 4); + // eventually all 4 have to succeed + assertTrue(listener.successOperations.get() == 4); + // between 3 and 9, depending on scheduler + assertTrue(listener.sentRequestsTotal.get() >= 3 && listener.sentRequestsTotal.get() <= 9); + } + + // Third test, only one retryable (will succeed), other permanent failures + { + transport.reset(); + listener.reset(); + + BulkIngester ingester = newBasicBulkIngester(listener); + + ingester.add(index); + ingester.add(delete); + ingester.add(delete); + + ingester.close(); + + // 2 failed will be handled together, then 1 retry + assertTrue(listener.requests.get() == 2); + + assertTrue(listener.successOperations.get() == 1); + assertTrue(listener.errorOperations.get() == 2); + // 1 initial + 2 retries + assertTrue(listener.sentRequestsTotal.get() == 3); + } + + // Fourth test, all requests will be retried until policy allows, then fail + { + transport.reset(); + listener.reset(); + + BulkIngester ingester = newBasicBulkIngester(listener); + + ingester.add(indexFail); + ingester.add(indexFail); + ingester.add(indexFail); + + ingester.close(); + + // from 1 to 3 depending on scheduling + assertTrue(listener.requests.get() >= 1 && listener.requests.get() <= 3); + + assertTrue(listener.successOperations.get() == 0); + assertTrue(listener.errorOperations.get() == 3); + // between 8 and 24, depending on scheduler + assertTrue(listener.sentRequestsTotal.get() >= 8 && listener.sentRequestsTotal.get() <= 24); + } + + // Fifth test, one exception that will make everything else fail, no retries triggered + { + transport.reset(); + listener.reset(); + + BulkIngester ingester = newBasicBulkIngester(listener); + + ingester.add(index); + ingester.add(create); + ingester.add(deleteFail); + + ingester.close(); + + // just the one + assertTrue(listener.requests.get() == 1); + + assertTrue(listener.successOperations.get() == 0); + assertTrue(listener.errorOperations.get() == 3); + + // just the one + assertTrue(listener.sentRequestsTotal.get() == 1); + } + + // Sixth test, a mix of everything + { + transport.reset(); + listener.reset(); + + BulkIngester ingester = newBasicBulkIngester(listener); + + ingester.add(create); + ingester.add(index); + ingester.add(indexFail); + ingester.add(delete); + ingester.add(create); + ingester.add(index); + ingester.add(indexFail); + ingester.add(delete); + + ingester.close(); + + // from 2 to 4 depending on scheduling + assertTrue(listener.requests.get() >= 1 && listener.successOperations.get() <= 4); + + assertTrue(listener.successOperations.get() == 4); + assertTrue(listener.errorOperations.get() == 4); + + // between 8 and 18, depending on 
scheduler + assertTrue(listener.sentRequestsTotal.get() >= 8 && listener.sentRequestsTotal.get() <= 18); + } + + transport.close(); + } + + @Test + public void retryTestFlushAndContextExponentialBackoff() throws Exception { + + // One success, other will succeed after retrying, other will fail eventually + { + BulkIngester ingester = newBulkIngesterWithFlushAndContextAndLongExponential(listener); + + ingester.add(create, 1); + ingester.add(indexFail, 2); + ingester.add(index, 3); + + ingester.close(); + + // should be 3 separate requests sent, one instant, one after a few retries, the last one error. + assertTrue(listener.requests.get() == 3); + // 2 will succeed, one will fail + assertTrue(listener.successOperations.get() == 2); + assertTrue(listener.errorOperations.get() == 1); + // between 8 and 10, depending on scheduler (first one + 2 retries for index + 8 retries for + // indexfail) + assertTrue(listener.sentRequestsTotal.get() >= 8 && listener.sentRequestsTotal.get() <= 11); + // checking order of contexts after send confirmed + Iterator iter = listener.sentContexts.iterator(); + // first one being completed is create + assertTrue(iter.next().equals(1)); + // second one is index, which will take only 2 retries + assertTrue(iter.next().equals(3)); + // last one is indexFail, taking 8 retries to fail + assertTrue(iter.next().equals(2)); + } + + transport.close(); + } + + @Test + public void retryTestNoFlushAndContextExponentialBackoff() throws Exception { + + // One success, other will succeed after retrying, other will fail eventually + { + BulkIngester ingester = newBulkIngesterNoFlushAndContextAndLongExponential(listener); + + ingester.add(create, 1); + ingester.add(indexFail, 2); + ingester.add(index, 3); + + ingester.close(); + + // should be 3 separate requests sent, one instant, one after a few retries, the last one error. + assertTrue(listener.requests.get() == 3); + // 2 will succeed, one will fail + assertTrue(listener.successOperations.get() == 2); + assertTrue(listener.errorOperations.get() == 1); + // between 8 and 10, depending on scheduler (first one + 2 retries for index + 8 retries for + // indexfail) + assertTrue(listener.sentRequestsTotal.get() >= 8 && listener.sentRequestsTotal.get() <= 11); + // checking order of contexts after send confirmed + Iterator iter = listener.sentContexts.iterator(); + // first one being completed is create + assertTrue(iter.next().equals(1)); + // second one is index, which will take only 2 retries + assertTrue(iter.next().equals(3)); + // last one is indexFail, taking 8 retries to fail + assertTrue(iter.next().equals(2)); + } + + transport.close(); + } + + @Test + public void retryMultiThreadStressTest() throws InterruptedException, IOException { + + // DISCLAIMER: this configuration is highly inefficient and only used here to showcase an extreme + // situation where the number of adding threads greatly exceeds the number of concurrent requests + // handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests + // accordingly. 
+ BulkIngester ingester = BulkIngester.of(b -> b + .client(client) + .listener(listener) + .flushInterval(5, TimeUnit.SECONDS) + .backoffPolicy(BackoffPolicy.constantBackoff(50L, 8))); + + ExecutorService executor = Executors.newFixedThreadPool(50); + + // sends create operations, but once every 1000, one index operation will be sent, + // and once every 5000 an indexFail + for (int i = 0; i < 100000; i++) { + int ii = i; + Runnable thread = () -> { + int finalI = ii; + if (ii % 1000 == 0) { + ingester.add(index, ii); + } else if (ii % 5000 == 0) { + ingester.add(indexFail, ii); + } else { + ingester.add(create, ii); + } + }; + executor.submit(thread); + } + + executor.awaitTermination(10, TimeUnit.SECONDS); + ingester.close(); + + // all operations will succeed eventually, so the total has to be 100000 + assertTrue(listener.successOperations.get() == 100000); + assertTrue(listener.sentContexts.size() == 100000); + assertTrue(listener.errorOperations.get() == 0); + // it's difficult to predict how many requests will be sent, but considering they will be sent + // in batches of 1000, without retries it should be exactly 100, considering that 100 out of + // 100000 will be retried 3 times and 20 will be retried 8 times, if they don't get batched together + // with the others it could result in a total of 560, which is highly unlikely. + // more reasonably there will be between 100 and 300 requests sent. + assertTrue(listener.sentRequestsTotal.get() >= 100 && listener.sentRequestsTotal.get() <= 300); + // same reasoning + assertTrue(listener.requests.get() >= 100 && listener.requests.get() <= 300); + + transport.close(); + } + + + private static class TestTransport implements ElasticsearchTransport { + public final AtomicInteger requestsStarted = new AtomicInteger(); + public final AtomicInteger requestsCompleted = new AtomicInteger(); + public final AtomicInteger operations = new AtomicInteger(); + + public ConcurrentHashMap retryFailures = new ConcurrentHashMap<>(); + + + private final ExecutorService executor = Executors.newCachedThreadPool(); + + @Override + public ResponseT performRequest( + RequestT request, + Endpoint endpoint, + @Nullable TransportOptions options + ) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture performRequestAsync(RequestT request, Endpoint endpoint, @Nullable TransportOptions options) { + + BulkRequest bulk = (BulkRequest) request; + requestsStarted.incrementAndGet(); + operations.addAndGet(bulk.operations().size()); + + if (bulk.operations().isEmpty()) { + System.out.println("No operations!"); + } + + // For testing purposes, different result depending on the operation type. + // Create will always succeed + // Index will always return 429 for 2 times, then 200. Index with index name "fail" will only + // return 429. + // Delete will always return 404. Delete with index name "fail" will throw transport exception. + + try { + + List items = new ArrayList<>(); + for (BulkOperation op : bulk.operations()) { + OperationType operationType = OperationType.Create; + ErrorCause error = null; + int status = 200; + String index = null; + switch (op._kind()) { + case Index: + index = ((IndexOperation) op._get()).index(); + operationType = OperationType.Index; + retryFailures.putIfAbsent(op, 0); + boolean isStillRetrying = retryFailures.computeIfPresent(op, (k, v) -> v + 1) > 2; + error = isStillRetrying && !index.equals("fail") ? 
null : + ErrorCause.of(e -> e.reason("some error")); + status = isStillRetrying && !index.equals("fail") ? 200 : 429; + break; + case Delete: + index = ((DeleteOperation) op._get()).index(); + if (index.equals("fail")) { + throw new RuntimeException("error"); + } + operationType = OperationType.Delete; + error = ErrorCause.of(e -> e.reason("some error")); + status = 404; + break; + default: + index = ((CreateOperation) op._get()).index(); + break; + } + ErrorCause finalError = error; + int finalStatus = status; + OperationType finalOperationType = operationType; + String finalIndex = index; + items.add(BulkResponseItem.of(b -> b + .index(finalIndex) + .operationType(finalOperationType) + .status(finalStatus) + .error(finalError))); + } + + CompletableFuture response = new CompletableFuture<>(); + executor.submit(() -> { + requestsCompleted.incrementAndGet(); + response.complete(BulkResponse.of(r -> r.errors(false).items(items).took(3))); + }); + + @SuppressWarnings("unchecked") + CompletableFuture result = (CompletableFuture) response; + return result; + } catch (RuntimeException e) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(e); + executor.submit(() -> { + future.completeExceptionally(e); + }); + return future; + } + } + + @Override + public JsonpMapper jsonpMapper() { + return SimpleJsonpMapper.INSTANCE; + } + + @Override + public TransportOptions options() { + return null; + } + + @Override + public void close() throws IOException { + executor.shutdown(); + try { + executor.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public void reset() { + requestsStarted.set(0); + requestsCompleted.set(0); + operations.set(0); + retryFailures = new ConcurrentHashMap<>(); + } + } + + private static class CountingListener implements BulkListener { + public final AtomicInteger sentRequestsTotal = new AtomicInteger(); + public final AtomicInteger successOperations = new AtomicInteger(); + public final AtomicInteger errorOperations = new AtomicInteger(); + public final AtomicInteger requests = new AtomicInteger(); + public final ConcurrentLinkedQueue sentContexts = new ConcurrentLinkedQueue<>(); + + @Override + public void beforeBulk(long executionId, BulkRequest request, List contexts) { + sentRequestsTotal.incrementAndGet(); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, List contexts, + BulkResponse response) { + for (BulkResponseItem item : response.items()) { + if (item.error() != null) { + errorOperations.incrementAndGet(); + } else { + successOperations.incrementAndGet(); + } + } + if (contexts.stream().anyMatch(Objects::nonNull)) { + sentContexts.addAll(contexts); + } + requests.incrementAndGet(); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, List contexts, + Throwable failure) { + failure.printStackTrace(); + errorOperations.addAndGet(request.operations().size()); + if (contexts.stream().anyMatch(Objects::nonNull)) { + sentContexts.addAll(contexts); + } + requests.incrementAndGet(); + } + + public void reset() { + successOperations.set(0); + errorOperations.set(0); + requests.set(0); + sentRequestsTotal.set(0); + } + } + + private BulkIngester newBasicBulkIngester(BulkListener listener) { + return BulkIngester.of(b -> b + .client(client) + .maxOperations(10) + .maxConcurrentRequests(10) + .listener(listener) + .backoffPolicy(BackoffPolicy.constantBackoff(50L, 8)) + ); + } + + private BulkIngester 
newBulkIngesterWithFlushAndContextAndLongExponential(BulkListener listener) { + return BulkIngester.of(b -> b + .client(client) + .maxOperations(10) + .maxConcurrentRequests(10) + .listener(listener) + .flushInterval(1000, TimeUnit.MILLISECONDS) + .backoffPolicy(BackoffPolicy.exponentialBackoff(100L, 8)) + ); + } + + private BulkIngester newBulkIngesterNoFlushAndContextAndLongExponential(BulkListener listener) { + return BulkIngester.of(b -> b + .client(client) + .maxOperations(10) + .maxConcurrentRequests(10) + .listener(listener) + .backoffPolicy(BackoffPolicy.exponentialBackoff(100L, 8)) + ); + } +} diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index a76f3f75f..76a48b9fa 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -185,9 +185,9 @@ public void multiThreadStressTest() throws InterruptedException, IOException { // situation where the number of adding threads greatly exceeds the number of concurrent requests // handled by the ingester. It's strongly recommended to always tweak maxConcurrentRequests accordingly. BulkIngester ingester = BulkIngester.of(b -> b - .client(client) - .globalSettings(s -> s.index(index)) - .flushInterval(5, TimeUnit.SECONDS) + .client(client) + .globalSettings(s -> s.index(index)) + .flushInterval(5, TimeUnit.SECONDS) ); RequestTest.AppData appData = new RequestTest.AppData(); @@ -223,7 +223,7 @@ public void multiThreadStressTest() throws InterruptedException, IOException { public void sizeLimitTest() throws Exception { TestTransport transport = new TestTransport(); - long operationSize = IngesterOperation.of(operation, transport.jsonpMapper()).size(); + long operationSize = IngesterOperation.of(new RetryableBulkOperation<>(operation, null, null), transport.jsonpMapper()).size(); BulkIngester ingester = BulkIngester.of(b -> b .client(new ElasticsearchAsyncClient(transport)) @@ -437,17 +437,18 @@ public void pipelineTest() { JsonpMapper mapper = new SimpleJsonpMapper(); BulkOperation create = BulkOperation.of(o -> o.create(c -> c - .pipeline("pipe") - .requireAlias(true) - .index("some_idx") - .id("some_id") - .document("Some doc") + .pipeline("pipe") + .requireAlias(true) + .index("some_idx") + .id("some_id") + .document("Some doc") )); String createStr = JsonpUtils.toJsonString(create, mapper); assertEquals(json, createStr); - BulkOperation create1 = IngesterOperation.of(create, mapper).operation(); + BulkOperation create1 = IngesterOperation.of(new RetryableBulkOperation<>(create, null, null), mapper) + .repeatableOperation().operation(); String create1Str = JsonpUtils.toJsonString(create1, mapper); assertEquals(json, create1Str); @@ -494,8 +495,8 @@ public void endToEndTest() throws Exception { assertEquals( 42, client.get(b -> b - .index(index) - .id(id), + .index(index) + .id(id), RequestTest.AppData.class ).source().getIntValue() );
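
A minimal usage sketch of the new retry support, assuming an already-configured ElasticsearchClient. The builder methods and BackoffPolicy factories are the ones added in this diff; the index name and document payloads are hypothetical:

```java
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
import co.elastic.clients.transport.BackoffPolicy;

import java.util.concurrent.TimeUnit;

public class RetryingIngestSketch {

    // "client" is assumed to be an already-configured ElasticsearchClient.
    static void ingest(ElasticsearchClient client, Iterable<String> jsonDocs) {
        BulkIngester<Void> ingester = BulkIngester.of(b -> b
            .client(client)
            .maxOperations(1000)                 // flush every 1000 operations...
            .flushInterval(5, TimeUnit.SECONDS)  // ...or at least every 5 seconds
            // Retry operations rejected with HTTP 429, waiting 50 ms between
            // attempts, at most 8 times (all delays are in milliseconds).
            .backoffPolicy(BackoffPolicy.constantBackoff(50L, 8)));

        for (String doc : jsonDocs) {
            ingester.add(op -> op.index(idx -> idx
                .index("my-index") // hypothetical index name
                .document(doc)));
        }

        // Blocks until buffered operations, in-flight requests, listener
        // callbacks and pending retries have all completed.
        ingester.close();
    }
}
```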
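The "roughly 5.1 seconds" figure in the exponentialBackoff() javadoc follows from the iterator formula above, delay(n) = start + 10 * ((int) e^(0.8 n) - 1); since every policy is an Iterable<Long>, the progression can be inspected directly:

```java
import co.elastic.clients.transport.BackoffPolicy;

public class BackoffProgressions {
    public static void main(String[] args) {
        // exponentialBackoff() (50 ms start, 8 retries) yields:
        // 50, 60, 80, 150, 280, 580, 1250, 2740  ->  5190 ms in total, ~5.1 s.
        for (Long delayMs : BackoffPolicy.exponentialBackoff()) {
            System.out.println(delayMs);
        }
        // For comparison: linearBackoff(100L, 5, 300L) yields 100, 200, 300, 300, 300
        // (capped by maximumDelay), and constantBackoff(50L, 8) yields 50 eight times.
    }
}
```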
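The "custom subclasses" notes in the BackoffPolicy javadoc come down to returning a fresh Iterator<Long> on every iterator() call: selectingRetries() pulls a new iterator from the policy for the first retry of each failed operation. A sketch under that assumption, with made-up class names; the wrap() factory is the one added in this diff:

```java
import co.elastic.clients.transport.BackoffPolicy;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical custom policy: wait 100 ms, then 500 ms, then stop retrying.
class TwoStepBackoff extends BackoffPolicy {
    @Override
    public Iterator<Long> iterator() {
        // A fresh, single-use iterator per call, as the javadoc requires.
        return Arrays.asList(100L, 500L).iterator();
    }
}

class BackoffMetrics {
    static BackoffPolicy counting(AtomicInteger backoffCount) {
        // wrap() runs the callback each time a delay is taken from the policy,
        // e.g. to count how often the cluster pushes back with 429s.
        return BackoffPolicy.wrap(new TwoStepBackoff(), backoffCount::incrementAndGet);
    }
}
```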