diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BackoffPolicy.java b/core/src/main/java/org/elasticsearch/action/bulk/BackoffPolicy.java
new file mode 100644
index 00000000000..a0ccca0fb5c
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/bulk/BackoffPolicy.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.bulk;
+
+import org.elasticsearch.common.unit.TimeValue;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Provides a backoff policy for bulk requests. Whenever a bulk request is rejected due to resource constraints (i.e. the client's internal
+ * thread pool is full), the backoff policy decides how long the bulk processor will wait before the operation is retried internally.
+ *
+ * Notes for implementing custom subclasses:
+ *
+ * The underlying mathematical principle of <code>BackoffPolicy</code> is a progression, which can be either finite or infinite, although
+ * the latter should not be used for retrying. A progression can be mapped to a <code>java.util.Iterator</code> with the following
+ * semantics:
+ *
+ * <ul>
+ * <li><code>#hasNext()</code> determines whether the progression has more elements. Returns <code>true</code> for infinite progressions.</li>
+ * <li><code>#next()</code> determines the next element in the progression, i.e. the next wait time period.</li>
+ * </ul>
+ *
+ * Note that backoff policies are exposed as Iterables in order to be consumed multiple times.
+ */
+public abstract class BackoffPolicy implements Iterable<TimeValue> {
+ private static final BackoffPolicy NO_BACKOFF = new NoBackoff();
+
+ /**
+ * Creates a backoff policy that will not allow any backoff, i.e. an operation will fail after the first attempt.
+ *
+ * @return A backoff policy without any backoff period. The returned instance is thread safe.
+ */
+ public static BackoffPolicy noBackoff() {
+ return NO_BACKOFF;
+ }
+
+ /**
+ * Creates a new constant backoff policy with the provided configuration.
+ *
+ * @param delay The delay defines how long to wait between retry attempts. Must not be null.
+ * Must be <= Integer.MAX_VALUE ms.
+ * @param maxNumberOfRetries The maximum number of retries. Must be a non-negative number.
+ * @return A backoff policy with a constant wait time between retries. The returned instance is thread safe but each
+ * iterator created from it should only be used by a single thread.
+ */
+ public static BackoffPolicy constantBackoff(TimeValue delay, int maxNumberOfRetries) {
+ return new ConstantBackoff(checkDelay(delay), maxNumberOfRetries);
+ }
+
+ /**
+ * Creates a new exponential backoff policy with a default configuration of 50 ms initial wait period and 8 retries taking
+ * roughly 5.1 seconds in total.
+ *
+ * @return A backoff policy with an exponential increase in wait time for retries. The returned instance is thread safe but each
+ * iterator created from it should only be used by a single thread.
+ */
+ public static BackoffPolicy exponentialBackoff() {
+ return exponentialBackoff(TimeValue.timeValueMillis(50), 8);
+ }
+
+ /**
+ * Creates a new exponential backoff policy with the provided configuration.
+ *
+ * @param initialDelay The initial delay defines how long to wait for the first retry attempt. Must not be null.
+ * Must be <= Integer.MAX_VALUE ms.
+ * @param maxNumberOfRetries The maximum number of retries. Must be a non-negative number.
+ * @return A backoff policy with an exponential increase in wait time for retries. The returned instance is thread safe but each
+ * iterator created from it should only be used by a single thread.
+ */
+ public static BackoffPolicy exponentialBackoff(TimeValue initialDelay, int maxNumberOfRetries) {
+ return new ExponentialBackoff((int) checkDelay(initialDelay).millis(), maxNumberOfRetries);
+ }
+
+ private static TimeValue checkDelay(TimeValue delay) {
+ if (delay.millis() > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException("delay must be <= " + Integer.MAX_VALUE + " ms");
+ }
+ return delay;
+ }
+
+ private static class NoBackoff extends BackoffPolicy {
+ @Override
+ public Iterator<TimeValue> iterator() {
+ return new Iterator<TimeValue>() {
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public TimeValue next() {
+ throw new NoSuchElementException("No backoff");
+ }
+ };
+ }
+ }
+
+ private static class ExponentialBackoff extends BackoffPolicy {
+ private final int start;
+
+ private final int numberOfElements;
+
+ private ExponentialBackoff(int start, int numberOfElements) {
+ assert start >= 0;
+ assert numberOfElements >= 0;
+ this.start = start;
+ this.numberOfElements = numberOfElements;
+ }
+
+ @Override
+ public Iterator<TimeValue> iterator() {
+ return new ExponentialBackoffIterator(start, numberOfElements);
+ }
+ }
+
+ private static class ExponentialBackoffIterator implements Iterator<TimeValue> {
+ private final int numberOfElements;
+
+ private final int start;
+
+ private int currentlyConsumed;
+
+ private ExponentialBackoffIterator(int start, int numberOfElements) {
+ this.start = start;
+ this.numberOfElements = numberOfElements;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currentlyConsumed < numberOfElements;
+ }
+
+ @Override
+ public TimeValue next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("Only up to " + numberOfElements + " elements");
+ }
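+ // With the default settings (start = 50 ms, 8 retries) this yields 50, 60, 80, 150, 280, 580, 1250 and 2740 ms, i.e. roughly 5.1 seconds in total.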
+ int result = start + 10 * ((int) Math.exp(0.8d * (currentlyConsumed)) - 1);
+ currentlyConsumed++;
+ return TimeValue.timeValueMillis(result);
+ }
+ }
+
+ private static final class ConstantBackoff extends BackoffPolicy {
+ private final TimeValue delay;
+
+ private final int numberOfElements;
+
+ public ConstantBackoff(TimeValue delay, int numberOfElements) {
+ assert numberOfElements >= 0;
+ this.delay = delay;
+ this.numberOfElements = numberOfElements;
+ }
+
+ @Override
+ public Iterator<TimeValue> iterator() {
+ return new ConstantBackoffIterator(delay, numberOfElements);
+ }
+ }
+
+ private static final class ConstantBackoffIterator implements Iterator<TimeValue> {
+ private final TimeValue delay;
+ private final int numberOfElements;
+ private int curr;
+
+ public ConstantBackoffIterator(TimeValue delay, int numberOfElements) {
+ this.delay = delay;
+ this.numberOfElements = numberOfElements;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return curr < numberOfElements;
+ }
+
+ @Override
+ public TimeValue next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ curr++;
+ return delay;
+ }
+ }
+}
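A minimal usage sketch of the policy API above (the example class and variable names are illustrative, not part of this change): each call to iterator() starts a fresh progression, which is why policies are exposed as Iterable<TimeValue>.

    import org.elasticsearch.action.bulk.BackoffPolicy;
    import org.elasticsearch.common.unit.TimeValue;

    import java.util.Iterator;

    public class BackoffPolicyExample {
        public static void main(String[] args) {
            // constant policy: wait 100 ms between attempts, at most 3 retries
            BackoffPolicy policy = BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100), 3);

            Iterator<TimeValue> delays = policy.iterator();
            while (delays.hasNext()) {            // false once the three retries are used up
                TimeValue delay = delays.next();  // the next wait period
                System.out.println("would wait " + delay + " before the next retry");
            }
        }
    }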
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
index 2a7c185ad8a..316ec7a548e 100644
--- a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
@@ -19,7 +19,6 @@
package org.elasticsearch.action.bulk;
-import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
@@ -48,7 +47,7 @@ public class BulkProcessor implements Closeable {
/**
* A listener for the execution.
*/
- public static interface Listener {
+ public interface Listener {
/**
* Callback before the bulk is executed.
@@ -79,6 +78,7 @@ public class BulkProcessor implements Closeable {
private int bulkActions = 1000;
private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
private TimeValue flushInterval = null;
+ private BackoffPolicy backoffPolicy = BackoffPolicy.noBackoff();
/**
* Creates a builder of bulk processor with the client to use and the listener that will be used
@@ -136,11 +136,25 @@ public class BulkProcessor implements Closeable {
return this;
}
+ /**
+ * Sets a custom backoff policy. The backoff policy defines how the bulk processor should handle retries of bulk requests internally
+ * in case they have failed due to resource constraints (i.e. a thread pool was full).
+ *
+ * The default is not to back off, i.e. to fail immediately.
+ */
+ public Builder setBackoffPolicy(BackoffPolicy backoffPolicy) {
+ if (backoffPolicy == null) {
+ throw new NullPointerException("'backoffPolicy' must not be null. To disable backoff, pass BackoffPolicy.noBackoff()");
+ }
+ this.backoffPolicy = backoffPolicy;
+ return this;
+ }
+
/**
* Builds a new bulk processor.
*/
public BulkProcessor build() {
- return new BulkProcessor(client, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
+ return new BulkProcessor(client, backoffPolicy, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
}
}
@@ -152,38 +166,27 @@ public class BulkProcessor implements Closeable {
return new Builder(client, listener);
}
- private final Client client;
- private final Listener listener;
-
- private final String name;
-
- private final int concurrentRequests;
private final int bulkActions;
private final long bulkSize;
- private final TimeValue flushInterval;
- private final Semaphore semaphore;
+
private final ScheduledThreadPoolExecutor scheduler;
private final ScheduledFuture<?> scheduledFuture;
private final AtomicLong executionIdGen = new AtomicLong();
private BulkRequest bulkRequest;
+ private final BulkRequestHandler bulkRequestHandler;
private volatile boolean closed = false;
- BulkProcessor(Client client, Listener listener, @Nullable String name, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) {
- this.client = client;
- this.listener = listener;
- this.name = name;
- this.concurrentRequests = concurrentRequests;
+ BulkProcessor(Client client, BackoffPolicy backoffPolicy, Listener listener, @Nullable String name, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) {
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.bytes();
- this.semaphore = new Semaphore(concurrentRequests);
this.bulkRequest = new BulkRequest();
+ this.bulkRequestHandler = (concurrentRequests == 0) ? BulkRequestHandler.syncHandler(client, backoffPolicy, listener) : BulkRequestHandler.asyncHandler(client, backoffPolicy, listener, concurrentRequests);
- this.flushInterval = flushInterval;
if (flushInterval != null) {
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(client.settings(), (name != null ? "[" + name + "]" : "") + "bulk_processor"));
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
@@ -231,14 +234,7 @@ public class BulkProcessor implements Closeable {
if (bulkRequest.numberOfActions() > 0) {
execute();
}
- if (this.concurrentRequests < 1) {
- return true;
- }
- if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
- semaphore.release(this.concurrentRequests);
- return true;
- }
- return false;
+ return this.bulkRequestHandler.awaitClose(timeout, unit);
}
/**
@@ -308,58 +304,7 @@ public class BulkProcessor implements Closeable {
final long executionId = executionIdGen.incrementAndGet();
this.bulkRequest = new BulkRequest();
-
- if (concurrentRequests == 0) {
- // execute in a blocking fashion...
- boolean afterCalled = false;
- try {
- listener.beforeBulk(executionId, bulkRequest);
- BulkResponse bulkItemResponses = client.bulk(bulkRequest).actionGet();
- afterCalled = true;
- listener.afterBulk(executionId, bulkRequest, bulkItemResponses);
- } catch (Exception e) {
- if (!afterCalled) {
- listener.afterBulk(executionId, bulkRequest, e);
- }
- }
- } else {
- boolean success = false;
- boolean acquired = false;
- try {
- listener.beforeBulk(executionId, bulkRequest);
- semaphore.acquire();
- acquired = true;
- client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
- @Override
- public void onResponse(BulkResponse response) {
- try {
- listener.afterBulk(executionId, bulkRequest, response);
- } finally {
- semaphore.release();
- }
- }
-
- @Override
- public void onFailure(Throwable e) {
- try {
- listener.afterBulk(executionId, bulkRequest, e);
- } finally {
- semaphore.release();
- }
- }
- });
- success = true;
- } catch (InterruptedException e) {
- Thread.interrupted();
- listener.afterBulk(executionId, bulkRequest, e);
- } catch (Throwable t) {
- listener.afterBulk(executionId, bulkRequest, t);
- } finally {
- if (!success && acquired) { // if we fail on client.bulk() release the semaphore
- semaphore.release();
- }
- }
- }
+ this.bulkRequestHandler.execute(bulkRequest, executionId);
}
private boolean isOverTheLimit() {
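For context, a hedged sketch of how the new setBackoffPolicy builder option is meant to be used from client code (the helper method and the no-op listener are illustrative; obtaining the Client instance is out of scope here):

    import org.elasticsearch.action.bulk.BackoffPolicy;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.unit.TimeValue;

    public class BulkProcessorBackoffExample {
        static BulkProcessor buildProcessor(Client client) {
            return BulkProcessor.builder(client, new BulkProcessor.Listener() {
                        @Override
                        public void beforeBulk(long executionId, BulkRequest request) { }

                        @Override
                        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { }

                        @Override
                        public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
                    })
                    .setConcurrentRequests(1)
                    // retry rejected bulk requests up to 8 times, starting at 50 ms and growing exponentially
                    .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8))
                    .build();
        }
    }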
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java
new file mode 100644
index 00000000000..ffc985bd510
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.bulk;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Abstracts the low-level details of bulk request handling
+ */
+abstract class BulkRequestHandler {
+ protected final ESLogger logger;
+ protected final Client client;
+
+ protected BulkRequestHandler(Client client) {
+ this.client = client;
+ this.logger = Loggers.getLogger(getClass(), client.settings());
+ }
+
+
+ public abstract void execute(BulkRequest bulkRequest, long executionId);
+
+ public abstract boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
+
+
+ public static BulkRequestHandler syncHandler(Client client, BackoffPolicy backoffPolicy, BulkProcessor.Listener listener) {
+ return new SyncBulkRequestHandler(client, backoffPolicy, listener);
+ }
+
+ public static BulkRequestHandler asyncHandler(Client client, BackoffPolicy backoffPolicy, BulkProcessor.Listener listener, int concurrentRequests) {
+ return new AsyncBulkRequestHandler(client, backoffPolicy, listener, concurrentRequests);
+ }
+
+ private static class SyncBulkRequestHandler extends BulkRequestHandler {
+ private final BulkProcessor.Listener listener;
+ private final BackoffPolicy backoffPolicy;
+
+ public SyncBulkRequestHandler(Client client, BackoffPolicy backoffPolicy, BulkProcessor.Listener listener) {
+ super(client);
+ this.backoffPolicy = backoffPolicy;
+ this.listener = listener;
+ }
+
+ @Override
+ public void execute(BulkRequest bulkRequest, long executionId) {
+ boolean afterCalled = false;
+ try {
+ listener.beforeBulk(executionId, bulkRequest);
+ BulkResponse bulkResponse = Retry
+ .on(EsRejectedExecutionException.class)
+ .policy(backoffPolicy)
+ .withSyncBackoff(client, bulkRequest);
+ afterCalled = true;
+ listener.afterBulk(executionId, bulkRequest, bulkResponse);
+ } catch (Exception e) {
+ if (!afterCalled) {
+ logger.warn("Failed to executed bulk request {}.", e, executionId);
+ listener.afterBulk(executionId, bulkRequest, e);
+ }
+ }
+ }
+
+ @Override
+ public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
+ // we are "closed" immediately as there is no request in flight
+ return true;
+ }
+ }
+
+ private static class AsyncBulkRequestHandler extends BulkRequestHandler {
+ private final BackoffPolicy backoffPolicy;
+ private final BulkProcessor.Listener listener;
+ private final Semaphore semaphore;
+ private final int concurrentRequests;
+
+ private AsyncBulkRequestHandler(Client client, BackoffPolicy backoffPolicy, BulkProcessor.Listener listener, int concurrentRequests) {
+ super(client);
+ this.backoffPolicy = backoffPolicy;
+ assert concurrentRequests > 0;
+ this.listener = listener;
+ this.concurrentRequests = concurrentRequests;
+ this.semaphore = new Semaphore(concurrentRequests);
+ }
+
+ @Override
+ public void execute(BulkRequest bulkRequest, long executionId) {
+ boolean bulkRequestSetupSuccessful = false;
+ boolean acquired = false;
+ try {
+ listener.beforeBulk(executionId, bulkRequest);
+ semaphore.acquire();
+ acquired = true;
+ Retry.on(EsRejectedExecutionException.class)
+ .policy(backoffPolicy)
+ .withAsyncBackoff(client, bulkRequest, new ActionListener<BulkResponse>() {
+ @Override
+ public void onResponse(BulkResponse response) {
+ try {
+ listener.afterBulk(executionId, bulkRequest, response);
+ } finally {
+ semaphore.release();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ try {
+ listener.afterBulk(executionId, bulkRequest, e);
+ } finally {
+ semaphore.release();
+ }
+ }
+ });
+ bulkRequestSetupSuccessful = true;
+ } catch (InterruptedException e) {
+ // This is intentionally wrong to avoid changing the behaviour implicitly with this PR. It will be fixed in #14833
+ Thread.interrupted();
+ listener.afterBulk(executionId, bulkRequest, e);
+ } catch (Throwable t) {
+ logger.warn("Failed to executed bulk request {}.", t, executionId);
+ listener.afterBulk(executionId, bulkRequest, t);
+ } finally {
+ if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore
+ semaphore.release();
+ }
+ }
+ }
+
+ @Override
+ public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
+ if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
+ semaphore.release(this.concurrentRequests);
+ return true;
+ }
+ return false;
+ }
+ }
+}
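The flow control in AsyncBulkRequestHandler boils down to a counting semaphore sized to the number of concurrent requests: a permit is taken per in-flight bulk, released in the response or failure callback, and awaitClose() succeeds only once all permits can be re-acquired. A standalone sketch of that idiom (illustrative class, not code from this change):

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    class ConcurrencyLimiter {
        private final int concurrentRequests;
        private final Semaphore semaphore;

        ConcurrencyLimiter(int concurrentRequests) {
            this.concurrentRequests = concurrentRequests;
            this.semaphore = new Semaphore(concurrentRequests);
        }

        void beforeRequest() throws InterruptedException {
            semaphore.acquire();     // blocks once concurrentRequests bulks are in flight
        }

        void afterRequest() {
            semaphore.release();     // called from both the success and the failure callback
        }

        boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
            if (semaphore.tryAcquire(concurrentRequests, timeout, unit)) {
                semaphore.release(concurrentRequests);  // nothing is in flight any more
                return true;
            }
            return false;            // timed out while requests were still pending
        }
    }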
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/Retry.java b/core/src/main/java/org/elasticsearch/action/bulk/Retry.java
new file mode 100644
index 00000000000..477e61045ba
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/bulk/Retry.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.bulk;
+
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.FutureUtils;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.function.Predicate;
+
+/**
+ * Encapsulates synchronous and asynchronous retry logic.
+ */
+class Retry {
+ private final Class<? extends Throwable> retryOnThrowable;
+
+ private BackoffPolicy backoffPolicy;
+
+ public static Retry on(Class<? extends Throwable> retryOnThrowable) {
+ return new Retry(retryOnThrowable);
+ }
+
+ /**
+ * @param backoffPolicy The backoff policy that defines how long and how often to wait for retries.
+ */
+ public Retry policy(BackoffPolicy backoffPolicy) {
+ this.backoffPolicy = backoffPolicy;
+ return this;
+ }
+
+ Retry(Class<? extends Throwable> retryOnThrowable) {
+ this.retryOnThrowable = retryOnThrowable;
+ }
+
+ /**
+ * Invokes #bulk(BulkRequest, ActionListener) on the provided client. Backs off on the provided exception and delegates results to the
+ * provided listener.
+ *
+ * @param client Client invoking the bulk request.
+ * @param bulkRequest The bulk request that should be executed.
+ * @param listener A listener that is invoked when the bulk request finishes or completes with an exception. The listener is not
+ */
+ public void withAsyncBackoff(Client client, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
+ AsyncRetryHandler r = new AsyncRetryHandler(retryOnThrowable, backoffPolicy, client, listener);
+ r.execute(bulkRequest);
+
+ }
+
+ /**
+ * Invokes #bulk(BulkRequest) on the provided client. Backs off on the provided exception.
+ *
+ * @param client Client invoking the bulk request.
+ * @param bulkRequest The bulk request that should be executed.
+ * @return the bulk response as returned by the client.
+ * @throws Exception Any exception thrown by the callable.
+ */
+ public BulkResponse withSyncBackoff(Client client, BulkRequest bulkRequest) throws Exception {
+ return SyncRetryHandler
+ .create(retryOnThrowable, backoffPolicy, client)
+ .executeBlocking(bulkRequest)
+ .actionGet();
+ }
+
+ static class AbstractRetryHandler implements ActionListener<BulkResponse> {
+ private final ESLogger logger;
+ private final Client client;
+ private final ActionListener<BulkResponse> listener;
+ private final Iterator<TimeValue> backoff;
+ private final Class<? extends Throwable> retryOnThrowable;
+ // Access only when holding a client-side lock, see also #addResponses()
+ private final List<BulkItemResponse> responses = new ArrayList<>();
+ private final long startTimestampNanos;
+ // needed to construct the next bulk request based on the response to the previous one
+ // volatile as we're called from a scheduled thread
+ private volatile BulkRequest currentBulkRequest;
+ private volatile ScheduledFuture<?> scheduledRequestFuture;
+
+ public AbstractRetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Client client, ActionListener<BulkResponse> listener) {
+ this.retryOnThrowable = retryOnThrowable;
+ this.backoff = backoffPolicy.iterator();
+ this.client = client;
+ this.listener = listener;
+ this.logger = Loggers.getLogger(getClass(), client.settings());
+ // in contrast to System.currentTimeMillis(), nanoTime() uses a monotonic clock under the hood
+ this.startTimestampNanos = System.nanoTime();
+ }
+
+ @Override
+ public void onResponse(BulkResponse bulkItemResponses) {
+ if (!bulkItemResponses.hasFailures()) {
+ // we're done here, include all responses
+ addResponses(bulkItemResponses, (r -> true));
+ finishHim();
+ } else {
+ if (canRetry(bulkItemResponses)) {
+ addResponses(bulkItemResponses, (r -> !r.isFailed()));
+ retry(createBulkRequestForRetry(bulkItemResponses));
+ } else {
+ addResponses(bulkItemResponses, (r -> true));
+ finishHim();
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ try {
+ listener.onFailure(e);
+ } finally {
+ FutureUtils.cancel(scheduledRequestFuture);
+ }
+ }
+
+ private void retry(BulkRequest bulkRequestForRetry) {
+ assert backoff.hasNext();
+ TimeValue next = backoff.next();
+ logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
+ scheduledRequestFuture = client.threadPool().schedule(next, ThreadPool.Names.SAME, (() -> this.execute(bulkRequestForRetry)));
+ }
+
+ private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) {
+ BulkRequest requestToReissue = new BulkRequest();
+ int index = 0;
+ for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) {
+ if (bulkItemResponse.isFailed()) {
+ requestToReissue.add(currentBulkRequest.requests().get(index));
+ }
+ index++;
+ }
+ return requestToReissue;
+ }
+
+ private boolean canRetry(BulkResponse bulkItemResponses) {
+ if (!backoff.hasNext()) {
+ return false;
+ }
+ for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
+ if (bulkItemResponse.isFailed()) {
+ Throwable cause = bulkItemResponse.getFailure().getCause();
+ Throwable rootCause = ExceptionsHelper.unwrapCause(cause);
+ if (!rootCause.getClass().equals(retryOnThrowable)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private void finishHim() {
+ try {
+ listener.onResponse(getAccumulatedResponse());
+ } finally {
+ FutureUtils.cancel(scheduledRequestFuture);
+ }
+ }
+
+ private void addResponses(BulkResponse response, Predicate<BulkItemResponse> filter) {
+ for (BulkItemResponse bulkItemResponse : response) {
+ if (filter.test(bulkItemResponse)) {
+ // Use client-side lock here to avoid visibility issues. This method may be called multiple times
+ // (based on how many retries we have to issue) and relying that the response handling code will be
+ // scheduled on the same thread is fragile.
+ synchronized (responses) {
+ responses.add(bulkItemResponse);
+ }
+ }
+ }
+ }
+
+ private BulkResponse getAccumulatedResponse() {
+ BulkItemResponse[] itemResponses;
+ synchronized (responses) {
+ itemResponses = responses.toArray(new BulkItemResponse[responses.size()]);
+ }
+ long stopTimestamp = System.nanoTime();
+ long totalLatencyMs = TimeValue.timeValueNanos(stopTimestamp - startTimestampNanos).millis();
+ return new BulkResponse(itemResponses, totalLatencyMs);
+ }
+
+ public void execute(BulkRequest bulkRequest) {
+ this.currentBulkRequest = bulkRequest;
+ client.bulk(bulkRequest, this);
+ }
+ }
+
+ static class AsyncRetryHandler extends AbstractRetryHandler {
+ public AsyncRetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Client client, ActionListener<BulkResponse> listener) {
+ super(retryOnThrowable, backoffPolicy, client, listener);
+ }
+ }
+
+ static class SyncRetryHandler extends AbstractRetryHandler {
+ private final PlainActionFuture<BulkResponse> actionFuture;
+
+ public static SyncRetryHandler create(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Client client) {
+ PlainActionFuture<BulkResponse> actionFuture = PlainActionFuture.newFuture();
+ return new SyncRetryHandler(retryOnThrowable, backoffPolicy, client, actionFuture);
+ }
+
+ public SyncRetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Client client, PlainActionFuture<BulkResponse> actionFuture) {
+ super(retryOnThrowable, backoffPolicy, client, actionFuture);
+ this.actionFuture = actionFuture;
+ }
+
+ public ActionFuture<BulkResponse> executeBlocking(BulkRequest bulkRequest) {
+ super.execute(bulkRequest);
+ return actionFuture;
+ }
+ }
+}
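Retry is package-private, so only the bulk request handlers above call it; for reference, a sketch of its two entry points as they would be used from within org.elasticsearch.action.bulk (the wrapper class below is illustrative):

    package org.elasticsearch.action.bulk;

    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

    class RetryUsageSketch {

        // Blocking variant: retries rejected items and returns the merged response.
        static BulkResponse syncWithRetry(Client client, BulkRequest bulkRequest) throws Exception {
            return Retry.on(EsRejectedExecutionException.class)
                    .policy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8))
                    .withSyncBackoff(client, bulkRequest);
        }

        // Non-blocking variant: the listener sees a single merged BulkResponse once all retries are done.
        static void asyncWithRetry(Client client, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
            Retry.on(EsRejectedExecutionException.class)
                    .policy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100), 5))
                    .withAsyncBackoff(client, bulkRequest, listener);
        }
    }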
diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java
new file mode 100644
index 00000000000..3c38e2ef0fa
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.bulk;
+
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.hamcrest.Matcher;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2)
+public class BulkProcessorRetryIT extends ESIntegTestCase {
+ private static final String INDEX_NAME = "test";
+ private static final String TYPE_NAME = "type";
+
+ @Override
+ protected Settings nodeSettings(int nodeOrdinal) {
+ //Have very low pool and queue sizes to overwhelm internal pools easily
+ return Settings.builder()
+ .put(super.nodeSettings(nodeOrdinal))
+ .put("threadpool.generic.size", 1)
+ .put("threadpool.generic.queue_size", 1)
+ // don't mess with this one! It's quite sensitive to a low queue size
+ // (see also ThreadedActionListener which is happily spawning threads even when we already got rejected)
+ //.put("threadpool.listener.queue_size", 1)
+ .put("threadpool.get.queue_size", 1)
+ // default is 50
+ .put("threadpool.bulk.queue_size", 20)
+ .build();
+ }
+
+
+ public void testBulkRejectionLoadWithoutBackoff() throws Throwable {
+ boolean rejectedExecutionExpected = true;
+ executeBulkRejectionLoad(BackoffPolicy.noBackoff(), rejectedExecutionExpected);
+ }
+
+ public void testBulkRejectionLoadWithBackoff() throws Throwable {
+ boolean rejectedExecutionExpected = false;
+ executeBulkRejectionLoad(BackoffPolicy.exponentialBackoff(), rejectedExecutionExpected);
+ }
+
+ private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejectedExecutionExpected) throws Throwable {
+ int numberOfAsyncOps = randomIntBetween(600, 700);
+ final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps);
+ final Set