mirror of
synced 2025-03-24 17:09:48 +00:00
BulkProcessor retries after request handling has been rejected due to a full thread pool
With this commit we introduce limited retries with a backoff logic to BulkProcessor when a bulk request has been rejeced with an EsRejectedExecutionException. Fixes #14620.
This commit is contained in:
@ -0,0 +1,203 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.action.bulk;
import org.elasticsearch.common.unit.TimeValue;
import java.util.Iterator;
import java.util.NoSuchElementException;
* Provides a backoff policy for bulk requests. Whenever a bulk request is rejected due to resource constraints (i.e. the client's internal
* thread pool is full), the backoff policy decides how long the bulk processor will wait before the operation is retried internally.
* Notes for implementing custom subclasses:
* The underlying mathematical principle of <code>BackoffPolicy</code> are progressions which can be either finite or infinite although
* the latter should not be used for retrying. A progression can be mapped to a <code>java.util.Iterator</code> with the following
* semantics:
* <ul>
* <li><code>#hasNext()</code> determines whether the progression has more elements. Return <code>true</code> for infinite progressions</li>
* <li><code>#next()</code> determines the next element in the progression, i.e. the next wait time period</li>
* </ul>
* Note that backoff policies are exposed as <code>Iterables</code> in order to be consumed multiple times.
public abstract class BackoffPolicy implements Iterable<TimeValue> {
private static final BackoffPolicy NO_BACKOFF = new NoBackoff();
* Creates a backoff policy that will not allow any backoff, i.e. an operation will fail after the first attempt.
* @return A backoff policy without any backoff period. The returned instance is thread safe.
public static BackoffPolicy noBackoff() {
return NO_BACKOFF;
* Creates an new constant backoff policy with the provided configuration.
* @param delay The delay defines how long to wait between retry attempts. Must not be null.
* Must be <= <code>Integer.MAX_VALUE</code> ms.
* @param maxNumberOfRetries The maximum number of retries. Must be a non-negative number.
* @return A backoff policy with a constant wait time between retries. The returned instance is thread safe but each
* iterator created from it should only be used by a single thread.
public static BackoffPolicy constantBackoff(TimeValue delay, int maxNumberOfRetries) {
return new ConstantBackoff(checkDelay(delay), maxNumberOfRetries);
* Creates an new exponential backoff policy with a default configuration of 50 ms initial wait period and 8 retries taking
* roughly 5.1 seconds in total.
* @return A backoff policy with an exponential increase in wait time for retries. The returned instance is thread safe but each
* iterator created from it should only be used by a single thread.
public static BackoffPolicy exponentialBackoff() {
return exponentialBackoff(TimeValue.timeValueMillis(50), 8);
* Creates an new exponential backoff policy with the provided configuration.
* @param initialDelay The initial delay defines how long to wait for the first retry attempt. Must not be null.
* Must be <= <code>Integer.MAX_VALUE</code> ms.
* @param maxNumberOfRetries The maximum number of retries. Must be a non-negative number.
* @return A backoff policy with an exponential increase in wait time for retries. The returned instance is thread safe but each
* iterator created from it should only be used by a single thread.
public static BackoffPolicy exponentialBackoff(TimeValue initialDelay, int maxNumberOfRetries) {
return new ExponentialBackoff((int) checkDelay(initialDelay).millis(), maxNumberOfRetries);
private static TimeValue checkDelay(TimeValue delay) {
if (delay.millis() > Integer.MAX_VALUE) {
throw new IllegalArgumentException("delay must be <= " + Integer.MAX_VALUE + " ms");
return delay;
private static class NoBackoff extends BackoffPolicy {
public Iterator<TimeValue> iterator() {
return new Iterator<TimeValue>() {
public boolean hasNext() {
return false;
public TimeValue next() {
throw new NoSuchElementException("No backoff");
private static class ExponentialBackoff extends BackoffPolicy {
private final int start;
private final int numberOfElements;
private ExponentialBackoff(int start, int numberOfElements) {
assert start >= 0;
assert numberOfElements >= 0;
this.start = start;
this.numberOfElements = numberOfElements;
public Iterator<TimeValue> iterator() {
return new ExponentialBackoffIterator(start, numberOfElements);
private static class ExponentialBackoffIterator implements Iterator<TimeValue> {
private final int numberOfElements;
private final int start;
private int currentlyConsumed;
private ExponentialBackoffIterator(int start, int numberOfElements) {
this.start = start;
this.numberOfElements = numberOfElements;
public boolean hasNext() {
return currentlyConsumed < numberOfElements;
public TimeValue next() {
if (!hasNext()) {
throw new NoSuchElementException("Only up to " + numberOfElements + " elements");
int result = start + 10 * ((int) Math.exp(0.8d * (currentlyConsumed)) - 1);
return TimeValue.timeValueMillis(result);
private static final class ConstantBackoff extends BackoffPolicy {
private final TimeValue delay;
private final int numberOfElements;
public ConstantBackoff(TimeValue delay, int numberOfElements) {
assert numberOfElements >= 0;
this.delay = delay;
this.numberOfElements = numberOfElements;
public Iterator<TimeValue> iterator() {
return new ConstantBackoffIterator(delay, numberOfElements);
private static final class ConstantBackoffIterator implements Iterator<TimeValue> {
private final TimeValue delay;
private final int numberOfElements;
private int curr;
public ConstantBackoffIterator(TimeValue delay, int numberOfElements) {
this.delay = delay;
this.numberOfElements = numberOfElements;
public boolean hasNext() {
return curr < numberOfElements;
public TimeValue next() {
if (!hasNext()) {
throw new NoSuchElementException();
return delay;
@ -19,7 +19,6 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
@ -48,7 +47,7 @@ public class BulkProcessor implements Closeable {
* A listener for the execution.
public static interface Listener {
public interface Listener {
* Callback before the bulk is executed.
@ -79,6 +78,7 @@ public class BulkProcessor implements Closeable {
private int bulkActions = 1000;
private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
private TimeValue flushInterval = null;
private BackoffPolicy backoffPolicy = BackoffPolicy.noBackoff();
* Creates a builder of bulk processor with the client to use and the listener that will be used
@ -136,11 +136,25 @@ public class BulkProcessor implements Closeable {
return this;
* Sets a custom backoff policy. The backoff policy defines how the bulk processor should handle retries of bulk requests internally
* in case they have failed due to resource constraints (i.e. a thread pool was full).
* The default is to not back off, i.e. failing immediately.
public Builder setBackoffPolicy(BackoffPolicy backoffPolicy) {
if (backoffPolicy == null) {
throw new NullPointerException("'backoffPolicy' must not be null. To disable backoff, pass BackoffPolicy.noBackoff()");
this.backoffPolicy = backoffPolicy;
return this;
* Builds a new bulk processor.
public BulkProcessor build() {
return new BulkProcessor(client, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
return new BulkProcessor(client, backoffPolicy, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
@ -152,38 +166,27 @@ public class BulkProcessor implements Closeable {
return new Builder(client, listener);
private final Client client;
private final Listener listener;
private final String name;
private final int concurrentRequests;
private final int bulkActions;
private final long bulkSize;
private final TimeValue flushInterval;
private final Semaphore semaphore;
private final ScheduledThreadPoolExecutor scheduler;
private final ScheduledFuture scheduledFuture;
private final AtomicLong executionIdGen = new AtomicLong();
private BulkRequest bulkRequest;
private final BulkRequestHandler bulkRequestHandler;
private volatile boolean closed = false;
BulkProcessor(Client client, Listener listener, @Nullable String name, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) {
this.client = client;
this.listener = listener;
this.name = name;
this.concurrentRequests = concurrentRequests;
BulkProcessor(Client client, BackoffPolicy backoffPolicy, Listener listener, @Nullable String name, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) {
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.bytes();
this.semaphore = new Semaphore(concurrentRequests);
this.bulkRequest = new BulkRequest();
this.bulkRequestHandler = (concurrentRequests == 0) ? BulkRequestHandler.syncHandler(client, backoffPolicy, listener) : BulkRequestHandler.asyncHandler(client, backoffPolicy, listener, concurrentRequests);
this.flushInterval = flushInterval;
if (flushInterval != null) {
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(client.settings(), (name != null ? "[" + name + "]" : "") + "bulk_processor"));
@ -231,14 +234,7 @@ public class BulkProcessor implements Closeable {
if (bulkRequest.numberOfActions() > 0) {
if (this.concurrentRequests < 1) {
return true;
if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
return true;
return false;
return this.bulkRequestHandler.awaitClose(timeout, unit);
@ -308,58 +304,7 @@ public class BulkProcessor implements Closeable {
final long executionId = executionIdGen.incrementAndGet();
this.bulkRequest = new BulkRequest();
if (concurrentRequests == 0) {
// execute in a blocking fashion...
boolean afterCalled = false;
try {
listener.beforeBulk(executionId, bulkRequest);
BulkResponse bulkItemResponses = client.bulk(bulkRequest).actionGet();
afterCalled = true;
listener.afterBulk(executionId, bulkRequest, bulkItemResponses);
} catch (Exception e) {
if (!afterCalled) {
listener.afterBulk(executionId, bulkRequest, e);
} else {
boolean success = false;
boolean acquired = false;
try {
listener.beforeBulk(executionId, bulkRequest);
acquired = true;
client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
public void onFailure(Throwable e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
success = true;
} catch (InterruptedException e) {
listener.afterBulk(executionId, bulkRequest, e);
} catch (Throwable t) {
listener.afterBulk(executionId, bulkRequest, t);
} finally {
if (!success && acquired) { // if we fail on client.bulk() release the semaphore
this.bulkRequestHandler.execute(bulkRequest, executionId);
private boolean isOverTheLimit() {
@ -0,0 +1,160 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
* Abstracts the low-level details of bulk request handling
abstract class BulkRequestHandler {
protected final ESLogger logger;
protected final Client client;
protected BulkRequestHandler(Client client) {
this.client = client;
this.logger = Loggers.getLogger(getClass(), client.settings());
public abstract void execute(BulkRequest bulkRequest, long executionId);
public abstract boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
public static BulkRequestHandler syncHandler(Client client, BackoffPolicy backoffPolicy, BulkProcessor.Listener listener) {
return new SyncBulkRequestHandler(client, backoffPolicy, listener);
public static BulkRequestHandler asyncHandler(Client client, BackoffPolicy backoffPolicy, BulkProcessor.Listener listener, int concurrentRequests) {
return new AsyncBulkRequestHandler(client, backoffPolicy, listener, concurrentRequests);
private static class SyncBulkRequestHandler extends BulkRequestHandler {
private final BulkProcessor.Listener listener;
private final BackoffPolicy backoffPolicy;
public SyncBulkRequestHandler(Client client, BackoffPolicy backoffPolicy, BulkProcessor.Listener listener) {
this.backoffPolicy = backoffPolicy;
this.listener = listener;
public void execute(BulkRequest bulkRequest, long executionId) {
boolean afterCalled = false;
try {
listener.beforeBulk(executionId, bulkRequest);
BulkResponse bulkResponse = Retry
.withSyncBackoff(client, bulkRequest);
afterCalled = true;
listener.afterBulk(executionId, bulkRequest, bulkResponse);
} catch (Exception e) {
if (!afterCalled) {
logger.warn("Failed to executed bulk request {}.", e, executionId);
listener.afterBulk(executionId, bulkRequest, e);
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
// we are "closed" immediately as there is no request in flight
return true;
private static class AsyncBulkRequestHandler extends BulkRequestHandler {
private final BackoffPolicy backoffPolicy;
private final BulkProcessor.Listener listener;
private final Semaphore semaphore;
private final int concurrentRequests;
private AsyncBulkRequestHandler(Client client, BackoffPolicy backoffPolicy, BulkProcessor.Listener listener, int concurrentRequests) {
this.backoffPolicy = backoffPolicy;
assert concurrentRequests > 0;
this.listener = listener;
this.concurrentRequests = concurrentRequests;
this.semaphore = new Semaphore(concurrentRequests);
public void execute(BulkRequest bulkRequest, long executionId) {
boolean bulkRequestSetupSuccessful = false;
boolean acquired = false;
try {
listener.beforeBulk(executionId, bulkRequest);
acquired = true;
.withAsyncBackoff(client, bulkRequest, new ActionListener<BulkResponse>() {
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
public void onFailure(Throwable e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
bulkRequestSetupSuccessful = true;
} catch (InterruptedException e) {
// This is intentionally wrong to avoid changing the behaviour implicitly with this PR. It will be fixed in #14833
listener.afterBulk(executionId, bulkRequest, e);
} catch (Throwable t) {
logger.warn("Failed to executed bulk request {}.", t, executionId);
listener.afterBulk(executionId, bulkRequest, t);
} finally {
if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
return true;
return false;
Normal file
Normal file
@ -0,0 +1,237 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.action.bulk;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Predicate;
* Encapsulates synchronous and asynchronous retry logic.
class Retry {
private final Class<? extends Throwable> retryOnThrowable;
private BackoffPolicy backoffPolicy;
public static Retry on(Class<? extends Throwable> retryOnThrowable) {
return new Retry(retryOnThrowable);
* @param backoffPolicy The backoff policy that defines how long and how often to wait for retries.
public Retry policy(BackoffPolicy backoffPolicy) {
this.backoffPolicy = backoffPolicy;
return this;
Retry(Class<? extends Throwable> retryOnThrowable) {
this.retryOnThrowable = retryOnThrowable;
* Invokes #bulk(BulkRequest, ActionListener) on the provided client. Backs off on the provided exception and delegates results to the
* provided listener.
* @param client Client invoking the bulk request.
* @param bulkRequest The bulk request that should be executed.
* @param listener A listener that is invoked when the bulk request finishes or completes with an exception. The listener is not
public void withAsyncBackoff(Client client, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
AsyncRetryHandler r = new AsyncRetryHandler(retryOnThrowable, backoffPolicy, client, listener);
* Invokes #bulk(BulkRequest) on the provided client. Backs off on the provided exception.
* @param client Client invoking the bulk request.
* @param bulkRequest The bulk request that should be executed.
* @return the bulk response as returned by the client.
* @throws Exception Any exception thrown by the callable.
public BulkResponse withSyncBackoff(Client client, BulkRequest bulkRequest) throws Exception {
return SyncRetryHandler
.create(retryOnThrowable, backoffPolicy, client)
static class AbstractRetryHandler implements ActionListener<BulkResponse> {
private final ESLogger logger;
private final Client client;
private final ActionListener<BulkResponse> listener;
private final Iterator<TimeValue> backoff;
private final Class<? extends Throwable> retryOnThrowable;
// Access only when holding a client-side lock, see also #addResponses()
private final List<BulkItemResponse> responses = new ArrayList<>();
private final long startTimestampNanos;
// needed to construct the next bulk request based on the response to the previous one
// volatile as we're called from a scheduled thread
private volatile BulkRequest currentBulkRequest;
private volatile ScheduledFuture<?> scheduledRequestFuture;
public AbstractRetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Client client, ActionListener<BulkResponse> listener) {
this.retryOnThrowable = retryOnThrowable;
this.backoff = backoffPolicy.iterator();
this.client = client;
this.listener = listener;
this.logger = Loggers.getLogger(getClass(), client.settings());
// in contrast to System.currentTimeMillis(), nanoTime() uses a monotonic clock under the hood
this.startTimestampNanos = System.nanoTime();
public void onResponse(BulkResponse bulkItemResponses) {
if (!bulkItemResponses.hasFailures()) {
// we're done here, include all responses
addResponses(bulkItemResponses, (r -> true));
} else {
if (canRetry(bulkItemResponses)) {
addResponses(bulkItemResponses, (r -> !r.isFailed()));
} else {
addResponses(bulkItemResponses, (r -> true));
public void onFailure(Throwable e) {
try {
} finally {
private void retry(BulkRequest bulkRequestForRetry) {
assert backoff.hasNext();
TimeValue next = backoff.next();
logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
scheduledRequestFuture = client.threadPool().schedule(next, ThreadPool.Names.SAME, (() -> this.execute(bulkRequestForRetry)));
private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) {
BulkRequest requestToReissue = new BulkRequest();
int index = 0;
for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) {
if (bulkItemResponse.isFailed()) {
return requestToReissue;
private boolean canRetry(BulkResponse bulkItemResponses) {
if (!backoff.hasNext()) {
return false;
for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
if (bulkItemResponse.isFailed()) {
Throwable cause = bulkItemResponse.getFailure().getCause();
Throwable rootCause = ExceptionsHelper.unwrapCause(cause);
if (!rootCause.getClass().equals(retryOnThrowable)) {
return false;
return true;
private void finishHim() {
try {
} finally {
private void addResponses(BulkResponse response, Predicate<BulkItemResponse> filter) {
for (BulkItemResponse bulkItemResponse : response) {
if (filter.test(bulkItemResponse)) {
// Use client-side lock here to avoid visibility issues. This method may be called multiple times
// (based on how many retries we have to issue) and relying that the response handling code will be
// scheduled on the same thread is fragile.
synchronized (responses) {
private BulkResponse getAccumulatedResponse() {
BulkItemResponse[] itemResponses;
synchronized (responses) {
itemResponses = responses.toArray(new BulkItemResponse[1]);
long stopTimestamp = System.nanoTime();
long totalLatencyMs = TimeValue.timeValueNanos(stopTimestamp - startTimestampNanos).millis();
return new BulkResponse(itemResponses, totalLatencyMs);
public void execute(BulkRequest bulkRequest) {
this.currentBulkRequest = bulkRequest;
client.bulk(bulkRequest, this);
static class AsyncRetryHandler extends AbstractRetryHandler {
public AsyncRetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Client client, ActionListener<BulkResponse> listener) {
super(retryOnThrowable, backoffPolicy, client, listener);
static class SyncRetryHandler extends AbstractRetryHandler {
private final PlainActionFuture<BulkResponse> actionFuture;
public static SyncRetryHandler create(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Client client) {
PlainActionFuture<BulkResponse> actionFuture = PlainActionFuture.newFuture();
return new SyncRetryHandler(retryOnThrowable, backoffPolicy, client, actionFuture);
public SyncRetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Client client, PlainActionFuture<BulkResponse> actionFuture) {
super(retryOnThrowable, backoffPolicy, client, actionFuture);
this.actionFuture = actionFuture;
public ActionFuture<BulkResponse> executeBlocking(BulkRequest bulkRequest) {
return actionFuture;
@ -0,0 +1,164 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.action.bulk;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ESIntegTestCase;
import org.hamcrest.Matcher;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2)
public class BulkProcessorRetryIT extends ESIntegTestCase {
private static final String INDEX_NAME = "test";
private static final String TYPE_NAME = "type";
protected Settings nodeSettings(int nodeOrdinal) {
//Have very low pool and queue sizes to overwhelm internal pools easily
return Settings.builder()
.put("threadpool.generic.size", 1)
.put("threadpool.generic.queue_size", 1)
// don't mess with this one! It's quite sensitive to a low queue size
// (see also ThreadedActionListener which is happily spawning threads even when we already got rejected)
//.put("threadpool.listener.queue_size", 1)
.put("threadpool.get.queue_size", 1)
// default is 50
.put("threadpool.bulk.queue_size", 20)
public void testBulkRejectionLoadWithoutBackoff() throws Throwable {
boolean rejectedExecutionExpected = true;
executeBulkRejectionLoad(BackoffPolicy.noBackoff(), rejectedExecutionExpected);
public void testBulkRejectionLoadWithBackoff() throws Throwable {
boolean rejectedExecutionExpected = false;
executeBulkRejectionLoad(BackoffPolicy.exponentialBackoff(), rejectedExecutionExpected);
private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejectedExecutionExpected) throws Throwable {
int numberOfAsyncOps = randomIntBetween(600, 700);
final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps);
final Set<Object> responses = Collections.newSetFromMap(new ConcurrentHashMap<>());
BulkProcessor bulkProcessor = BulkProcessor.builder(client(), new BulkProcessor.Listener() {
public void beforeBulk(long executionId, BulkRequest request) {
// no op
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// zero means that we're in the sync case, more means that we're in the async case
.setConcurrentRequests(randomIntBetween(0, 100))
indexDocs(bulkProcessor, numberOfAsyncOps);
latch.await(10, TimeUnit.SECONDS);
assertThat(responses.size(), equalTo(numberOfAsyncOps));
// validate all responses
for (Object response : responses) {
if (response instanceof BulkResponse) {
BulkResponse bulkResponse = (BulkResponse) response;
for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
Throwable rootCause = ExceptionsHelper.unwrapCause(failure.getCause());
if (rootCause instanceof EsRejectedExecutionException) {
if (rejectedExecutionExpected == false) {
// we're not expecting that we overwhelmed it even once
throw new AssertionError("Unexpected failure reason", rootCause);
} else {
throw new AssertionError("Unexpected failure", rootCause);
} else {
Throwable t = (Throwable) response;
// we're not expecting any other errors
throw new AssertionError("Unexpected failure", t);
client().admin().indices().refresh(new RefreshRequest()).get();
// validate we did not create any duplicates due to retries
Matcher<Long> searchResultCount;
if (rejectedExecutionExpected) {
// it is ok if we lost some index operations to rejected executions
searchResultCount = lessThanOrEqualTo((long) numberOfAsyncOps);
} else {
searchResultCount = equalTo((long) numberOfAsyncOps);
SearchResponse results = client()
assertThat(results.getHits().totalHits(), searchResultCount);
private static void indexDocs(BulkProcessor processor, int numDocs) {
for (int i = 1; i <= numDocs; i++) {
.setSource("field", randomRealisticUnicodeOfLengthBetween(1, 30))
Normal file
Normal file
@ -0,0 +1,201 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.rest.NoOpClient;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.junit.Before;
import java.util.concurrent.CountDownLatch;
import static org.hamcrest.Matchers.*;
public class RetryTests extends ESTestCase {
// no need to wait fof a long time in tests
private static final TimeValue DELAY = TimeValue.timeValueMillis(1L);
private static final int CALLS_TO_FAIL = 5;
private MockBulkClient bulkClient;
public void setUp() throws Exception {
this.bulkClient = new MockBulkClient(getTestName(), CALLS_TO_FAIL);
public void tearDown() throws Exception {
private BulkRequest createBulkRequest() {
BulkRequest request = new BulkRequest();
request.add(new UpdateRequest("shop", "products", "1"));
request.add(new UpdateRequest("shop", "products", "2"));
request.add(new UpdateRequest("shop", "products", "3"));
request.add(new UpdateRequest("shop", "products", "4"));
request.add(new UpdateRequest("shop", "products", "5"));
return request;
public void testSyncRetryBacksOff() throws Exception {
BackoffPolicy backoff = BackoffPolicy.constantBackoff(DELAY, CALLS_TO_FAIL);
BulkRequest bulkRequest = createBulkRequest();
BulkResponse response = Retry
.withSyncBackoff(bulkClient, bulkRequest);
assertThat(response.getItems().length, equalTo(bulkRequest.numberOfActions()));
public void testSyncRetryFailsAfterBackoff() throws Exception {
BackoffPolicy backoff = BackoffPolicy.constantBackoff(DELAY, CALLS_TO_FAIL - 1);
BulkRequest bulkRequest = createBulkRequest();
BulkResponse response = Retry
.withSyncBackoff(bulkClient, bulkRequest);
assertThat(response.getItems().length, equalTo(bulkRequest.numberOfActions()));
public void testAsyncRetryBacksOff() throws Exception {
BackoffPolicy backoff = BackoffPolicy.constantBackoff(DELAY, CALLS_TO_FAIL);
AssertingListener listener = new AssertingListener();
BulkRequest bulkRequest = createBulkRequest();
.withAsyncBackoff(bulkClient, bulkRequest, listener);
public void testAsyncRetryFailsAfterBacksOff() throws Exception {
BackoffPolicy backoff = BackoffPolicy.constantBackoff(DELAY, CALLS_TO_FAIL - 1);
AssertingListener listener = new AssertingListener();
BulkRequest bulkRequest = createBulkRequest();
.withAsyncBackoff(bulkClient, bulkRequest, listener);
private static class AssertingListener implements ActionListener<BulkResponse> {
private final CountDownLatch latch;
private int countOnResponseCalled = 0;
private Throwable lastFailure;
private BulkResponse response;
private AssertingListener() {
latch = new CountDownLatch(1);
public void awaitCallbacksCalled() throws InterruptedException {
public void onResponse(BulkResponse bulkItemResponses) {
this.response = bulkItemResponses;
public void onFailure(Throwable e) {
this.lastFailure = e;
public void assertOnResponseCalled() {
assertThat(countOnResponseCalled, equalTo(1));
public void assertResponseWithNumberOfItems(int numItems) {
assertThat(response.getItems().length, equalTo(numItems));
public void assertResponseWithoutFailures() {
assertThat(response, notNullValue());
assertFalse("Response should not have failures", response.hasFailures());
public void assertResponseWithFailures() {
assertThat(response, notNullValue());
assertTrue("Response should have failures", response.hasFailures());
public void assertOnFailureNeverCalled() {
assertThat(lastFailure, nullValue());
private static class MockBulkClient extends NoOpClient {
private int numberOfCallsToFail;
private MockBulkClient(String testName, int numberOfCallsToFail) {
this.numberOfCallsToFail = numberOfCallsToFail;
public ActionFuture<BulkResponse> bulk(BulkRequest request) {
PlainActionFuture<BulkResponse> responseFuture = new PlainActionFuture<>();
bulk(request, responseFuture);
return responseFuture;
public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
// do everything synchronously, that's fine for a test
boolean shouldFail = numberOfCallsToFail > 0;
BulkItemResponse[] itemResponses = new BulkItemResponse[request.requests().size()];
// if we have to fail, we need to fail at least once "reliably", the rest can be random
int itemToFail = randomInt(request.requests().size() - 1);
for (int idx = 0; idx < request.requests().size(); idx++) {
if (shouldFail && (randomBoolean() || idx == itemToFail)) {
itemResponses[idx] = failedResponse();
} else {
itemResponses[idx] = successfulResponse();
listener.onResponse(new BulkResponse(itemResponses, 1000L));
private BulkItemResponse successfulResponse() {
return new BulkItemResponse(1, "update", new DeleteResponse());
private BulkItemResponse failedResponse() {
return new BulkItemResponse(1, "update", new BulkItemResponse.Failure("test", "test", "1", new EsRejectedExecutionException("pool full")));
Reference in New Issue
Block a user