Simplify BulkProcessor handling and retry logic (#24051)

This commit collapses the SyncBulkRequestHandler and
AsyncBulkRequestHandler into a single BulkRequestHandler. The new
handler executes a bulk request and awaits for the completion if the
BulkProcessor was configured with a concurrentRequests setting of 0.
Otherwise the execution happens asynchronously.

As part of this change the Retry class has been refactored.
withSyncBackoff and withAsyncBackoff have been replaced with two
versions of withBackoff. One method takes a listener that will be
called on completion. The other method returns a future that will been
complete on request completion.
This commit is contained in:
Tim Brooks 2017-04-13 14:48:52 -05:00 committed by GitHub
parent 99e0268e0a
commit ffaac5a08a
6 changed files with 97 additions and 203 deletions

View File

@ -184,12 +184,7 @@ public class BulkProcessor implements Closeable {
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = new BulkRequest();
if (concurrentRequests == 0) {
this.bulkRequestHandler = BulkRequestHandler.syncHandler(consumer, backoffPolicy, listener, threadPool);
} else {
this.bulkRequestHandler = BulkRequestHandler.asyncHandler(consumer, backoffPolicy, listener, threadPool, concurrentRequests);
}
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, threadPool, concurrentRequests);
// Start period flushing task after everything is setup
this.cancellableFlushTask = startFlushTask(flushInterval, threadPool);

View File

@ -27,155 +27,86 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
/**
* Abstracts the low-level details of bulk request handling
* Implements the low-level details of bulk request handling
*/
abstract class BulkRequestHandler {
protected final Logger logger;
protected final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
protected final ThreadPool threadPool;
public final class BulkRequestHandler {
private final Logger logger;
private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
private final BulkProcessor.Listener listener;
private final Semaphore semaphore;
private final Retry retry;
private final int concurrentRequests;
protected BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, ThreadPool threadPool) {
BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
BulkProcessor.Listener listener, ThreadPool threadPool,
int concurrentRequests) {
assert concurrentRequests >= 0;
this.logger = Loggers.getLogger(getClass());
this.consumer = consumer;
this.threadPool = threadPool;
this.listener = listener;
this.concurrentRequests = concurrentRequests;
this.retry = new Retry(EsRejectedExecutionException.class, backoffPolicy, threadPool);
this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
}
public abstract void execute(BulkRequest bulkRequest, long executionId);
public abstract boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
public static BulkRequestHandler syncHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
BackoffPolicy backoffPolicy, BulkProcessor.Listener listener,
ThreadPool threadPool) {
return new SyncBulkRequestHandler(consumer, backoffPolicy, listener, threadPool);
}
public static BulkRequestHandler asyncHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
BackoffPolicy backoffPolicy, BulkProcessor.Listener listener,
ThreadPool threadPool, int concurrentRequests) {
return new AsyncBulkRequestHandler(consumer, backoffPolicy, listener, threadPool, concurrentRequests);
}
private static class SyncBulkRequestHandler extends BulkRequestHandler {
private final BulkProcessor.Listener listener;
private final BackoffPolicy backoffPolicy;
SyncBulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
BulkProcessor.Listener listener, ThreadPool threadPool) {
super(consumer, threadPool);
this.backoffPolicy = backoffPolicy;
this.listener = listener;
}
@Override
public void execute(BulkRequest bulkRequest, long executionId) {
boolean afterCalled = false;
try {
listener.beforeBulk(executionId, bulkRequest);
BulkResponse bulkResponse = Retry
.on(EsRejectedExecutionException.class)
.policy(backoffPolicy)
.using(threadPool)
.withSyncBackoff(consumer, bulkRequest, Settings.EMPTY);
afterCalled = true;
listener.afterBulk(executionId, bulkRequest, bulkResponse);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info((Supplier<?>) () -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
if (!afterCalled) {
listener.afterBulk(executionId, bulkRequest, e);
public void execute(BulkRequest bulkRequest, long executionId) {
Runnable toRelease = () -> {};
boolean bulkRequestSetupSuccessful = false;
try {
listener.beforeBulk(executionId, bulkRequest);
semaphore.acquire();
toRelease = semaphore::release;
CountDownLatch latch = new CountDownLatch(1);
retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
latch.countDown();
}
}
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
if (!afterCalled) {
listener.afterBulk(executionId, bulkRequest, e);
@Override
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
latch.countDown();
}
}
}, Settings.EMPTY);
bulkRequestSetupSuccessful = true;
if (concurrentRequests == 0) {
latch.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info((Supplier<?>) () -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (bulkRequestSetupSuccessful == false) { // if we fail on client.bulk() release the semaphore
toRelease.run();
}
}
}
@Override
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
// we are "closed" immediately as there is no request in flight
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
semaphore.release(this.concurrentRequests);
return true;
}
}
private static class AsyncBulkRequestHandler extends BulkRequestHandler {
private final BackoffPolicy backoffPolicy;
private final BulkProcessor.Listener listener;
private final Semaphore semaphore;
private final int concurrentRequests;
private AsyncBulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
BulkProcessor.Listener listener, ThreadPool threadPool,
int concurrentRequests) {
super(consumer, threadPool);
this.backoffPolicy = backoffPolicy;
assert concurrentRequests > 0;
this.listener = listener;
this.concurrentRequests = concurrentRequests;
this.semaphore = new Semaphore(concurrentRequests);
}
@Override
public void execute(BulkRequest bulkRequest, long executionId) {
boolean bulkRequestSetupSuccessful = false;
boolean acquired = false;
try {
listener.beforeBulk(executionId, bulkRequest);
semaphore.acquire();
acquired = true;
Retry.on(EsRejectedExecutionException.class)
.policy(backoffPolicy)
.using(threadPool)
.withAsyncBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
}
}
@Override
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
}
}
}, Settings.EMPTY);
bulkRequestSetupSuccessful = true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info((Supplier<?>) () -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore
semaphore.release();
}
}
}
@Override
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
semaphore.release(this.concurrentRequests);
return true;
}
return false;
}
return false;
}
}

View File

@ -25,20 +25,14 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Predicate;
/**
@ -46,62 +40,42 @@ import java.util.function.Predicate;
*/
public class Retry {
private final Class<? extends Throwable> retryOnThrowable;
private final BackoffPolicy backoffPolicy;
private final ThreadPool threadPool;
private BackoffPolicy backoffPolicy;
private ThreadPool threadPool;
public static Retry on(Class<? extends Throwable> retryOnThrowable) {
return new Retry(retryOnThrowable);
}
Retry(Class<? extends Throwable> retryOnThrowable) {
public Retry(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, ThreadPool threadPool) {
this.retryOnThrowable = retryOnThrowable;
}
/**
* @param backoffPolicy The backoff policy that defines how long and how often to wait for retries.
*/
public Retry policy(BackoffPolicy backoffPolicy) {
this.backoffPolicy = backoffPolicy;
return this;
}
/**
* @param threadPool The threadPool that will be used to schedule retries.
*/
public Retry using(ThreadPool threadPool) {
this.threadPool = threadPool;
return this;
}
/**
* Invokes #apply(BulkRequest, ActionListener). Backs off on the provided exception and delegates results to the
* provided listener. Retries will be attempted using the provided schedule function
* Invokes #accept(BulkRequest, ActionListener). Backs off on the provided exception and delegates results to the
* provided listener. Retries will be scheduled using the class's thread pool.
* @param consumer The consumer to which apply the request and listener
* @param bulkRequest The bulk request that should be executed.
* @param listener A listener that is invoked when the bulk request finishes or completes with an exception. The listener is not
* @param settings settings
*/
public void withAsyncBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest, ActionListener<BulkResponse> listener, Settings settings) {
public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest, ActionListener<BulkResponse> listener, Settings settings) {
RetryHandler r = new RetryHandler(retryOnThrowable, backoffPolicy, consumer, listener, settings, threadPool);
r.execute(bulkRequest);
}
/**
* Invokes #apply(BulkRequest, ActionListener). Backs off on the provided exception. Retries will be attempted using
* the provided schedule function.
* Invokes #accept(BulkRequest, ActionListener). Backs off on the provided exception. Retries will be scheduled using
* the class's thread pool.
*
* @param consumer The consumer to which apply the request and listener
* @param bulkRequest The bulk request that should be executed.
* @param settings settings
* @return the bulk response as returned by the client.
* @throws Exception Any exception thrown by the callable.
* @return a future representing the bulk response returned by the client.
*/
public BulkResponse withSyncBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest, Settings settings) throws Exception {
PlainActionFuture<BulkResponse> actionFuture = PlainActionFuture.newFuture();
RetryHandler r = new RetryHandler(retryOnThrowable, backoffPolicy, consumer, actionFuture, settings, threadPool);
r.execute(bulkRequest);
return actionFuture.actionGet();
public PlainActionFuture<BulkResponse> withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest, Settings settings) {
PlainActionFuture<BulkResponse> future = PlainActionFuture.newFuture();
withBackoff(consumer, bulkRequest, future, settings);
return future;
}
static class RetryHandler implements ActionListener<BulkResponse> {

View File

@ -134,7 +134,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
this.mainRequest = mainRequest;
this.listener = listener;
BackoffPolicy backoffPolicy = buildBackoffPolicy();
bulkRetry = Retry.on(EsRejectedExecutionException.class).policy(BackoffPolicy.wrap(backoffPolicy, task::countBulkRetry)).using(threadPool);
bulkRetry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.wrap(backoffPolicy, task::countBulkRetry), threadPool);
scrollSource = buildScrollableResultSource(backoffPolicy);
scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null");
/*
@ -337,7 +337,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
finishHim(null);
return;
}
bulkRetry.withAsyncBackoff(client::bulk, request, new ActionListener<BulkResponse>() {
bulkRetry.withBackoff(client::bulk, request, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
onBulkResponse(thisBatchStartTime, response);

View File

@ -80,43 +80,37 @@ public class RetryTests extends ESTestCase {
return request;
}
public void testSyncRetryBacksOff() throws Exception {
public void testRetryBacksOff() throws Exception {
BackoffPolicy backoff = BackoffPolicy.constantBackoff(DELAY, CALLS_TO_FAIL);
BulkRequest bulkRequest = createBulkRequest();
BulkResponse response = Retry
.on(EsRejectedExecutionException.class)
.policy(backoff)
.using(bulkClient.threadPool())
.withSyncBackoff(bulkClient::bulk, bulkRequest, bulkClient.settings());
BulkResponse response = new Retry(EsRejectedExecutionException.class, backoff, bulkClient.threadPool())
.withBackoff(bulkClient::bulk, bulkRequest, bulkClient.settings())
.actionGet();
assertFalse(response.hasFailures());
assertThat(response.getItems().length, equalTo(bulkRequest.numberOfActions()));
}
public void testSyncRetryFailsAfterBackoff() throws Exception {
public void testRetryFailsAfterBackoff() throws Exception {
BackoffPolicy backoff = BackoffPolicy.constantBackoff(DELAY, CALLS_TO_FAIL - 1);
BulkRequest bulkRequest = createBulkRequest();
BulkResponse response = Retry
.on(EsRejectedExecutionException.class)
.policy(backoff)
.using(bulkClient.threadPool())
.withSyncBackoff(bulkClient::bulk, bulkRequest, bulkClient.settings());
BulkResponse response = new Retry(EsRejectedExecutionException.class, backoff, bulkClient.threadPool())
.withBackoff(bulkClient::bulk, bulkRequest, bulkClient.settings())
.actionGet();
assertTrue(response.hasFailures());
assertThat(response.getItems().length, equalTo(bulkRequest.numberOfActions()));
}
public void testAsyncRetryBacksOff() throws Exception {
public void testRetryWithListenerBacksOff() throws Exception {
BackoffPolicy backoff = BackoffPolicy.constantBackoff(DELAY, CALLS_TO_FAIL);
AssertingListener listener = new AssertingListener();
BulkRequest bulkRequest = createBulkRequest();
Retry.on(EsRejectedExecutionException.class)
.policy(backoff)
.using(bulkClient.threadPool())
.withAsyncBackoff(bulkClient::bulk, bulkRequest, listener, bulkClient.settings());
Retry retry = new Retry(EsRejectedExecutionException.class, backoff, bulkClient.threadPool());
retry.withBackoff(bulkClient::bulk, bulkRequest, listener, bulkClient.settings());
listener.awaitCallbacksCalled();
listener.assertOnResponseCalled();
@ -125,15 +119,13 @@ public class RetryTests extends ESTestCase {
listener.assertOnFailureNeverCalled();
}
public void testAsyncRetryFailsAfterBacksOff() throws Exception {
public void testRetryWithListenerFailsAfterBacksOff() throws Exception {
BackoffPolicy backoff = BackoffPolicy.constantBackoff(DELAY, CALLS_TO_FAIL - 1);
AssertingListener listener = new AssertingListener();
BulkRequest bulkRequest = createBulkRequest();
Retry.on(EsRejectedExecutionException.class)
.policy(backoff)
.using(bulkClient.threadPool())
.withAsyncBackoff(bulkClient::bulk, bulkRequest, listener, bulkClient.settings());
Retry retry = new Retry(EsRejectedExecutionException.class, backoff, bulkClient.threadPool());
retry.withBackoff(bulkClient::bulk, bulkRequest, listener, bulkClient.settings());
listener.awaitCallbacksCalled();

View File

@ -30,6 +30,7 @@ import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestBuilder
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollTask;
import org.elasticsearch.action.bulk.byscroll.BulkIndexByScrollResponseMatcher;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
@ -76,8 +77,9 @@ public class RetryTests extends ESSingleNodeTestCase {
for (int i = 0; i < DOC_COUNT; i++) {
bulk.add(client().prepareIndex("source", "test").setSource("foo", "bar " + i));
}
Retry retry = Retry.on(EsRejectedExecutionException.class).policy(BackoffPolicy.exponentialBackoff()).using(client().threadPool());
BulkResponse response = retry.withSyncBackoff(client()::bulk, bulk.request(), client().settings());
Retry retry = new Retry(EsRejectedExecutionException.class, BackoffPolicy.exponentialBackoff(), client().threadPool());
BulkResponse response = retry.withBackoff(client()::bulk, bulk.request(), client().settings()).actionGet();
assertFalse(response.buildFailureMessage(), response.hasFailures());
client().admin().indices().prepareRefresh("source").get();
}