From 5b1fbe5e6c31b0a529cfa632956609469c5d74a0 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 5 Apr 2017 12:12:43 -0500 Subject: [PATCH] Decouple BulkProcessor from client implementation (#23373) This commit modifies the BulkProcessor to be decoupled from the client implementation. Instead it just takes a BiConsumer> that executes the BulkRequest. --- .../java/org/elasticsearch/client/CrudIT.java | 103 ++++++++++++++++ .../action/bulk/BulkProcessor.java | 85 +++++++------- .../action/bulk/BulkRequestHandler.java | 79 +++++++------ .../org/elasticsearch/action/bulk/Retry.java | 110 +++++++++--------- .../AbstractAsyncBulkByScrollAction.java | 18 ++- .../action/bulk/BulkProcessorIT.java | 6 +- .../action/bulk/BulkProcessorRetryIT.java | 1 - .../elasticsearch/action/bulk/RetryTests.java | 12 +- .../AsyncBulkByScrollActionTests.java | 2 +- .../index/reindex/TransportReindexAction.java | 10 +- .../reindex/TransportUpdateByQueryAction.java | 10 +- .../index/reindex/ReindexMetadataTests.java | 3 +- .../index/reindex/ReindexScriptTests.java | 3 +- .../index/reindex/RetryTests.java | 4 +- .../reindex/UpdateByQueryMetadataTests.java | 4 +- .../reindex/UpdateByQueryWithScriptTests.java | 3 +- 16 files changed, 297 insertions(+), 156 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 2a7dfebd284..5f738ce1eee 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -38,6 +39,9 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; @@ -46,10 +50,13 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonMap; @@ -577,6 +584,102 @@ public class CrudIT extends ESRestHighLevelClientTestCase { assertTrue(bulkResponse.getTookInMillis() > 0); assertEquals(nbItems, bulkResponse.getItems().length); + validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest); + } + + public void testBulkProcessorIntegration() throws IOException, InterruptedException { + int nbItems = randomIntBetween(10, 100); + boolean[] errors = new boolean[nbItems]; + + XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE); + + AtomicReference responseRef = new AtomicReference<>(); + AtomicReference requestRef = new AtomicReference<>(); + AtomicReference error = new AtomicReference<>(); + + BulkProcessor.Listener listener = new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + responseRef.set(response); + requestRef.set(request); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + error.set(failure); + } + }; + + ThreadPool threadPool = new ThreadPool(Settings.builder().put("node.name", getClass().getName()).build()); + try(BulkProcessor processor = new BulkProcessor.Builder(highLevelClient()::bulkAsync, listener, threadPool) + .setConcurrentRequests(0) + .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.GB)) + .setBulkActions(nbItems + 1) + .build()) { + for (int i = 0; i < nbItems; i++) { + String id = String.valueOf(i); + boolean erroneous = randomBoolean(); + errors[i] = erroneous; + + DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values()); + if (opType == DocWriteRequest.OpType.DELETE) { + if (erroneous == false) { + assertEquals(RestStatus.CREATED, + highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status()); + } + DeleteRequest deleteRequest = new DeleteRequest("index", "test", id); + processor.add(deleteRequest); + + } else { + if (opType == DocWriteRequest.OpType.INDEX) { + IndexRequest indexRequest = new IndexRequest("index", "test", id).source(xContentType, "id", i); + if (erroneous) { + indexRequest.version(12L); + } + processor.add(indexRequest); + + } else if (opType == DocWriteRequest.OpType.CREATE) { + IndexRequest createRequest = new IndexRequest("index", "test", id).source(xContentType, "id", i).create(true); + if (erroneous) { + assertEquals(RestStatus.CREATED, highLevelClient().index(createRequest).status()); + } + processor.add(createRequest); + + } else if (opType == DocWriteRequest.OpType.UPDATE) { + UpdateRequest updateRequest = new UpdateRequest("index", "test", id) + .doc(new IndexRequest().source(xContentType, "id", i)); + if (erroneous == false) { + assertEquals(RestStatus.CREATED, + highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status()); + } + processor.add(updateRequest); + } + } + } + assertNull(responseRef.get()); + assertNull(requestRef.get()); + } + + + BulkResponse bulkResponse = responseRef.get(); + BulkRequest bulkRequest = requestRef.get(); + + assertEquals(RestStatus.OK, bulkResponse.status()); + assertTrue(bulkResponse.getTookInMillis() > 0); + assertEquals(nbItems, bulkResponse.getItems().length); + assertNull(error.get()); + + validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest); + + terminate(threadPool); + } + + private void validateBulkResponses(int nbItems, boolean[] errors, BulkResponse bulkResponse, BulkRequest bulkRequest) { for (int i = 0; i < nbItems; i++) { BulkItemResponse bulkItemResponse = bulkResponse.getItems()[i]; diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index fdb2ef3aba2..0e448de686f 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; @@ -28,17 +29,14 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.util.Objects; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; /** * A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request @@ -66,7 +64,7 @@ public class BulkProcessor implements Closeable { /** * Callback after a failed execution of bulk request. - * + *

* Note that in case an instance of InterruptedException is passed, which means that request processing has been * cancelled externally, the thread's interruption status has been restored prior to calling this method. */ @@ -78,10 +76,10 @@ public class BulkProcessor implements Closeable { */ public static class Builder { - private final Client client; + private final BiConsumer> consumer; private final Listener listener; + private final ThreadPool threadPool; - private String name; private int concurrentRequests = 1; private int bulkActions = 1000; private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB); @@ -92,17 +90,10 @@ public class BulkProcessor implements Closeable { * Creates a builder of bulk processor with the client to use and the listener that will be used * to be notified on the completion of bulk requests. */ - public Builder(Client client, Listener listener) { - this.client = client; + public Builder(BiConsumer> consumer, Listener listener, ThreadPool threadPool) { + this.consumer = consumer; this.listener = listener; - } - - /** - * Sets an optional name to identify this bulk processor. - */ - public Builder setName(String name) { - this.name = name; - return this; + this.threadPool = threadPool; } /** @@ -164,7 +155,7 @@ public class BulkProcessor implements Closeable { * Builds a new bulk processor. */ public BulkProcessor build() { - return new BulkProcessor(client, backoffPolicy, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval); + return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval, threadPool); } } @@ -172,15 +163,13 @@ public class BulkProcessor implements Closeable { Objects.requireNonNull(client, "client"); Objects.requireNonNull(listener, "listener"); - return new Builder(client, listener); + return new Builder(client::bulk, listener, client.threadPool()); } private final int bulkActions; private final long bulkSize; - - private final ScheduledThreadPoolExecutor scheduler; - private final ScheduledFuture scheduledFuture; + private final ThreadPool.Cancellable cancellableFlushTask; private final AtomicLong executionIdGen = new AtomicLong(); @@ -189,22 +178,21 @@ public class BulkProcessor implements Closeable { private volatile boolean closed = false; - BulkProcessor(Client client, BackoffPolicy backoffPolicy, Listener listener, @Nullable String name, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) { + BulkProcessor(BiConsumer> consumer, BackoffPolicy backoffPolicy, Listener listener, + int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval, + ThreadPool threadPool) { this.bulkActions = bulkActions; this.bulkSize = bulkSize.getBytes(); - this.bulkRequest = new BulkRequest(); - this.bulkRequestHandler = (concurrentRequests == 0) ? BulkRequestHandler.syncHandler(client, backoffPolicy, listener) : BulkRequestHandler.asyncHandler(client, backoffPolicy, listener, concurrentRequests); - if (flushInterval != null) { - this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(client.settings(), (name != null ? "[" + name + "]" : "") + "bulk_processor")); - this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS); + if (concurrentRequests == 0) { + this.bulkRequestHandler = BulkRequestHandler.syncHandler(consumer, backoffPolicy, listener, threadPool); } else { - this.scheduler = null; - this.scheduledFuture = null; + this.bulkRequestHandler = BulkRequestHandler.asyncHandler(consumer, backoffPolicy, listener, threadPool, concurrentRequests); } + + // Start period flushing task after everything is setup + this.cancellableFlushTask = startFlushTask(flushInterval, threadPool); } /** @@ -214,20 +202,20 @@ public class BulkProcessor implements Closeable { public void close() { try { awaitClose(0, TimeUnit.NANOSECONDS); - } catch(InterruptedException exc) { + } catch (InterruptedException exc) { Thread.currentThread().interrupt(); } } /** * Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed. - * + *

* If concurrent requests are not enabled, returns {@code true} immediately. * If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then returns {@code true}, * If the specified waiting time elapses before all bulk requests complete, {@code false} is returned. * * @param timeout The maximum time to wait for the bulk requests to complete - * @param unit The time unit of the {@code timeout} argument + * @param unit The time unit of the {@code timeout} argument * @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the bulk requests completed * @throws InterruptedException If the current thread is interrupted */ @@ -236,10 +224,9 @@ public class BulkProcessor implements Closeable { return true; } closed = true; - if (this.scheduledFuture != null) { - FutureUtils.cancel(this.scheduledFuture); - this.scheduler.shutdown(); - } + + this.cancellableFlushTask.cancel(); + if (bulkRequest.numberOfActions() > 0) { execute(); } @@ -301,12 +288,28 @@ public class BulkProcessor implements Closeable { * Adds the data from the bytes to be processed by the bulk processor */ public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, - @Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception { + @Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception { bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType); executeIfNeeded(); return this; } + private ThreadPool.Cancellable startFlushTask(TimeValue flushInterval, ThreadPool threadPool) { + if (flushInterval == null) { + return new ThreadPool.Cancellable() { + @Override + public void cancel() {} + + @Override + public boolean isCancelled() { + return true; + } + }; + } + + return threadPool.scheduleWithFixedDelay(new Flush(), flushInterval, ThreadPool.Names.GENERIC); + } + private void executeIfNeeded() { ensureOpen(); if (!isOverTheLimit()) { diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java index 5d9910d9179..e1755bfb8bf 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java @@ -22,23 +22,27 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; /** * Abstracts the low-level details of bulk request handling */ abstract class BulkRequestHandler { protected final Logger logger; - protected final Client client; + protected final BiConsumer> consumer; + protected final ThreadPool threadPool; - protected BulkRequestHandler(Client client) { - this.client = client; - this.logger = Loggers.getLogger(getClass(), client.settings()); + protected BulkRequestHandler(BiConsumer> consumer, ThreadPool threadPool) { + this.logger = Loggers.getLogger(getClass()); + this.consumer = consumer; + this.threadPool = threadPool; } @@ -47,20 +51,25 @@ abstract class BulkRequestHandler { public abstract boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException; - public static BulkRequestHandler syncHandler(Client client, BackoffPolicy backoffPolicy, BulkProcessor.Listener listener) { - return new SyncBulkRequestHandler(client, backoffPolicy, listener); + public static BulkRequestHandler syncHandler(BiConsumer> consumer, + BackoffPolicy backoffPolicy, BulkProcessor.Listener listener, + ThreadPool threadPool) { + return new SyncBulkRequestHandler(consumer, backoffPolicy, listener, threadPool); } - public static BulkRequestHandler asyncHandler(Client client, BackoffPolicy backoffPolicy, BulkProcessor.Listener listener, int concurrentRequests) { - return new AsyncBulkRequestHandler(client, backoffPolicy, listener, concurrentRequests); + public static BulkRequestHandler asyncHandler(BiConsumer> consumer, + BackoffPolicy backoffPolicy, BulkProcessor.Listener listener, + ThreadPool threadPool, int concurrentRequests) { + return new AsyncBulkRequestHandler(consumer, backoffPolicy, listener, threadPool, concurrentRequests); } private static class SyncBulkRequestHandler extends BulkRequestHandler { private final BulkProcessor.Listener listener; private final BackoffPolicy backoffPolicy; - SyncBulkRequestHandler(Client client, BackoffPolicy backoffPolicy, BulkProcessor.Listener listener) { - super(client); + SyncBulkRequestHandler(BiConsumer> consumer, BackoffPolicy backoffPolicy, + BulkProcessor.Listener listener, ThreadPool threadPool) { + super(consumer, threadPool); this.backoffPolicy = backoffPolicy; this.listener = listener; } @@ -71,9 +80,10 @@ abstract class BulkRequestHandler { try { listener.beforeBulk(executionId, bulkRequest); BulkResponse bulkResponse = Retry - .on(EsRejectedExecutionException.class) - .policy(backoffPolicy) - .withSyncBackoff(client, bulkRequest); + .on(EsRejectedExecutionException.class) + .policy(backoffPolicy) + .using(threadPool) + .withSyncBackoff(consumer, bulkRequest, Settings.EMPTY); afterCalled = true; listener.afterBulk(executionId, bulkRequest, bulkResponse); } catch (InterruptedException e) { @@ -103,8 +113,10 @@ abstract class BulkRequestHandler { private final Semaphore semaphore; private final int concurrentRequests; - private AsyncBulkRequestHandler(Client client, BackoffPolicy backoffPolicy, BulkProcessor.Listener listener, int concurrentRequests) { - super(client); + private AsyncBulkRequestHandler(BiConsumer> consumer, BackoffPolicy backoffPolicy, + BulkProcessor.Listener listener, ThreadPool threadPool, + int concurrentRequests) { + super(consumer, threadPool); this.backoffPolicy = backoffPolicy; assert concurrentRequests > 0; this.listener = listener; @@ -121,26 +133,27 @@ abstract class BulkRequestHandler { semaphore.acquire(); acquired = true; Retry.on(EsRejectedExecutionException.class) - .policy(backoffPolicy) - .withAsyncBackoff(client, bulkRequest, new ActionListener() { - @Override - public void onResponse(BulkResponse response) { - try { - listener.afterBulk(executionId, bulkRequest, response); - } finally { - semaphore.release(); - } + .policy(backoffPolicy) + .using(threadPool) + .withAsyncBackoff(consumer, bulkRequest, new ActionListener() { + @Override + public void onResponse(BulkResponse response) { + try { + listener.afterBulk(executionId, bulkRequest, response); + } finally { + semaphore.release(); } + } - @Override - public void onFailure(Exception e) { - try { - listener.afterBulk(executionId, bulkRequest, e); - } finally { - semaphore.release(); - } + @Override + public void onFailure(Exception e) { + try { + listener.afterBulk(executionId, bulkRequest, e); + } finally { + semaphore.release(); } - }); + } + }, Settings.EMPTY); bulkRequestSetupSuccessful = true; } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/Retry.java b/core/src/main/java/org/elasticsearch/action/bulk/Retry.java index c746894b78e..e1ba1a6bee1 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/Retry.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/Retry.java @@ -20,19 +20,25 @@ package org.elasticsearch.action.bulk; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Predicate; /** @@ -42,11 +48,16 @@ public class Retry { private final Class retryOnThrowable; private BackoffPolicy backoffPolicy; + private ThreadPool threadPool; public static Retry on(Class retryOnThrowable) { return new Retry(retryOnThrowable); } + Retry(Class retryOnThrowable) { + this.retryOnThrowable = retryOnThrowable; + } + /** * @param backoffPolicy The backoff policy that defines how long and how often to wait for retries. */ @@ -55,42 +66,48 @@ public class Retry { return this; } - Retry(Class retryOnThrowable) { - this.retryOnThrowable = retryOnThrowable; - } - /** - * Invokes #bulk(BulkRequest, ActionListener) on the provided client. Backs off on the provided exception and delegates results to the - * provided listener. - * - * @param client Client invoking the bulk request. - * @param bulkRequest The bulk request that should be executed. - * @param listener A listener that is invoked when the bulk request finishes or completes with an exception. The listener is not + * @param threadPool The threadPool that will be used to schedule retries. */ - public void withAsyncBackoff(Client client, BulkRequest bulkRequest, ActionListener listener) { - AsyncRetryHandler r = new AsyncRetryHandler(retryOnThrowable, backoffPolicy, client, listener); - r.execute(bulkRequest); - + public Retry using(ThreadPool threadPool) { + this.threadPool = threadPool; + return this; } /** - * Invokes #bulk(BulkRequest) on the provided client. Backs off on the provided exception. - * - * @param client Client invoking the bulk request. + * Invokes #apply(BulkRequest, ActionListener). Backs off on the provided exception and delegates results to the + * provided listener. Retries will be attempted using the provided schedule function + * @param consumer The consumer to which apply the request and listener * @param bulkRequest The bulk request that should be executed. + * @param listener A listener that is invoked when the bulk request finishes or completes with an exception. The listener is not + * @param settings settings + */ + public void withAsyncBackoff(BiConsumer> consumer, BulkRequest bulkRequest, ActionListener listener, Settings settings) { + RetryHandler r = new RetryHandler(retryOnThrowable, backoffPolicy, consumer, listener, settings, threadPool); + r.execute(bulkRequest); + } + + /** + * Invokes #apply(BulkRequest, ActionListener). Backs off on the provided exception. Retries will be attempted using + * the provided schedule function. + * + * @param consumer The consumer to which apply the request and listener + * @param bulkRequest The bulk request that should be executed. + * @param settings settings * @return the bulk response as returned by the client. * @throws Exception Any exception thrown by the callable. */ - public BulkResponse withSyncBackoff(Client client, BulkRequest bulkRequest) throws Exception { - return SyncRetryHandler - .create(retryOnThrowable, backoffPolicy, client) - .executeBlocking(bulkRequest) - .actionGet(); + public BulkResponse withSyncBackoff(BiConsumer> consumer, BulkRequest bulkRequest, Settings settings) throws Exception { + PlainActionFuture actionFuture = PlainActionFuture.newFuture(); + RetryHandler r = new RetryHandler(retryOnThrowable, backoffPolicy, consumer, actionFuture, settings, threadPool); + r.execute(bulkRequest); + return actionFuture.actionGet(); } - static class AbstractRetryHandler implements ActionListener { + static class RetryHandler implements ActionListener { private final Logger logger; - private final Client client; + private final ThreadPool threadPool; + private final BiConsumer> consumer; private final ActionListener listener; private final Iterator backoff; private final Class retryOnThrowable; @@ -102,12 +119,15 @@ public class Retry { private volatile BulkRequest currentBulkRequest; private volatile ScheduledFuture scheduledRequestFuture; - AbstractRetryHandler(Class retryOnThrowable, BackoffPolicy backoffPolicy, Client client, ActionListener listener) { + RetryHandler(Class retryOnThrowable, BackoffPolicy backoffPolicy, + BiConsumer> consumer, ActionListener listener, + Settings settings, ThreadPool threadPool) { this.retryOnThrowable = retryOnThrowable; this.backoff = backoffPolicy.iterator(); - this.client = client; + this.consumer = consumer; this.listener = listener; - this.logger = Loggers.getLogger(getClass(), client.settings()); + this.logger = Loggers.getLogger(getClass(), settings); + this.threadPool = threadPool; // in contrast to System.currentTimeMillis(), nanoTime() uses a monotonic clock under the hood this.startTimestampNanos = System.nanoTime(); } @@ -142,9 +162,8 @@ public class Retry { assert backoff.hasNext(); TimeValue next = backoff.next(); logger.trace("Retry of bulk request scheduled in {} ms.", next.millis()); - Runnable retry = () -> this.execute(bulkRequestForRetry); - retry = client.threadPool().getThreadContext().preserveContext(retry); - scheduledRequestFuture = client.threadPool().schedule(next, ThreadPool.Names.SAME, retry); + Runnable command = threadPool.getThreadContext().preserveContext(() -> this.execute(bulkRequestForRetry)); + scheduledRequestFuture = threadPool.schedule(next, ThreadPool.Names.SAME, command); } private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) { @@ -208,32 +227,7 @@ public class Retry { public void execute(BulkRequest bulkRequest) { this.currentBulkRequest = bulkRequest; - client.bulk(bulkRequest, this); - } - } - - static class AsyncRetryHandler extends AbstractRetryHandler { - AsyncRetryHandler(Class retryOnThrowable, BackoffPolicy backoffPolicy, Client client, ActionListener listener) { - super(retryOnThrowable, backoffPolicy, client, listener); - } - } - - static class SyncRetryHandler extends AbstractRetryHandler { - private final PlainActionFuture actionFuture; - - public static SyncRetryHandler create(Class retryOnThrowable, BackoffPolicy backoffPolicy, Client client) { - PlainActionFuture actionFuture = PlainActionFuture.newFuture(); - return new SyncRetryHandler(retryOnThrowable, backoffPolicy, client, actionFuture); - } - - SyncRetryHandler(Class retryOnThrowable, BackoffPolicy backoffPolicy, Client client, PlainActionFuture actionFuture) { - super(retryOnThrowable, backoffPolicy, client, actionFuture); - this.actionFuture = actionFuture; - } - - public ActionFuture executeBlocking(BulkRequest bulkRequest) { - super.execute(bulkRequest); - return actionFuture; + consumer.accept(bulkRequest, this); } } } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java index 72d39c038ed..c984dbd8677 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -74,7 +75,6 @@ import java.util.function.BiFunction; import static java.lang.Math.max; import static java.lang.Math.min; import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableList; import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff; import static org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES; @@ -106,6 +106,7 @@ public abstract class AbstractAsyncBulkByScrollAction listener; private final Retry bulkRetry; private final ScrollableHitSource scrollSource; + private final Settings settings; /** * This BiFunction is used to apply various changes depending of the Reindex action and the search hit, @@ -114,19 +115,26 @@ public abstract class AbstractAsyncBulkByScrollAction, ScrollableHitSource.Hit, RequestWrapper> scriptApplier; + public AbstractAsyncBulkByScrollAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + ThreadPool threadPool, Request mainRequest, ScriptService scriptService, ClusterState clusterState, + ActionListener listener) { + this(task, logger, client, threadPool, mainRequest, scriptService, clusterState, listener, client.settings()); + } + public AbstractAsyncBulkByScrollAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, Request mainRequest, ScriptService scriptService, ClusterState clusterState, - ActionListener listener) { + ActionListener listener, Settings settings) { this.task = task; this.logger = logger; this.client = client; + this.settings = settings; this.threadPool = threadPool; this.scriptService = scriptService; this.clusterState = clusterState; this.mainRequest = mainRequest; this.listener = listener; BackoffPolicy backoffPolicy = buildBackoffPolicy(); - bulkRetry = Retry.on(EsRejectedExecutionException.class).policy(BackoffPolicy.wrap(backoffPolicy, task::countBulkRetry)); + bulkRetry = Retry.on(EsRejectedExecutionException.class).policy(BackoffPolicy.wrap(backoffPolicy, task::countBulkRetry)).using(threadPool); scrollSource = buildScrollableResultSource(backoffPolicy); scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null"); /* @@ -329,7 +337,7 @@ public abstract class AbstractAsyncBulkByScrollAction() { + bulkRetry.withAsyncBackoff(client::bulk, request, new ActionListener() { @Override public void onResponse(BulkResponse response) { onBulkResponse(thisBatchStartTime, response); @@ -339,7 +347,7 @@ public abstract class AbstractAsyncBulkByScrollAction { DummyAsyncBulkByScrollAction() { super(testTask, AsyncBulkByScrollActionTests.this.logger, new ParentTaskAssigningClient(client, localNode, testTask), - client.threadPool(), testRequest, null, null, listener); + client.threadPool(), testRequest, null, null, listener, Settings.EMPTY); } @Override diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java index deee6ae1160..b232c50c2b2 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java @@ -251,10 +251,16 @@ public class TransportReindexAction extends HandledTransportAction createdThreads = emptyList(); + AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + ThreadPool threadPool, ReindexRequest request, ScriptService scriptService, ClusterState clusterState, + ActionListener listener) { + this(task, logger, client, threadPool, request, scriptService, clusterState, listener, client.settings()); + } + AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, ReindexRequest request, ScriptService scriptService, ClusterState clusterState, - ActionListener listener) { - super(task, logger, client, threadPool, request, scriptService, clusterState, listener); + ActionListener listener, Settings settings) { + super(task, logger, client, threadPool, request, scriptService, clusterState, listener, settings); } @Override diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java index ec9cb94687a..12d8696319f 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java @@ -90,10 +90,16 @@ public class TransportUpdateByQueryAction extends HandledTransportAction { + AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + ThreadPool threadPool, UpdateByQueryRequest request, ScriptService scriptService, ClusterState clusterState, + ActionListener listener) { + this(task, logger, client, threadPool, request, scriptService, clusterState, listener, client.settings()); + } + AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, UpdateByQueryRequest request, ScriptService scriptService, ClusterState clusterState, - ActionListener listener) { - super(task, logger, client, threadPool, request, scriptService, clusterState, listener); + ActionListener listener, Settings settings) { + super(task, logger, client, threadPool, request, scriptService, clusterState, listener, settings); } @Override diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java index 172047bfef2..490480e1ac5 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.Hit; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.common.settings.Settings; /** * Index-by-search test for ttl, timestamp, and routing. @@ -81,7 +82,7 @@ public class ReindexMetadataTests extends AbstractAsyncBulkByScrollActionMetadat private class TestAction extends TransportReindexAction.AsyncIndexBySearchAction { TestAction() { super(ReindexMetadataTests.this.task, ReindexMetadataTests.this.logger, null, ReindexMetadataTests.this.threadPool, request(), - null, null, listener()); + null, null, listener(), Settings.EMPTY); } public ReindexRequest mainRequest() { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java index 4a61d733a80..8ef99bd18d6 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.script.ScriptService; import java.util.Map; @@ -112,6 +113,6 @@ public class ReindexScriptTests extends AbstractAsyncBulkByScrollActionScriptTes @Override protected TransportReindexAction.AsyncIndexBySearchAction action(ScriptService scriptService, ReindexRequest request) { return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, scriptService, null, - listener()); + listener(), Settings.EMPTY); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java index 6e8da59eee3..c1ef1142e1d 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java @@ -76,8 +76,8 @@ public class RetryTests extends ESSingleNodeTestCase { for (int i = 0; i < DOC_COUNT; i++) { bulk.add(client().prepareIndex("source", "test").setSource("foo", "bar " + i)); } - Retry retry = Retry.on(EsRejectedExecutionException.class).policy(BackoffPolicy.exponentialBackoff()); - BulkResponse response = retry.withSyncBackoff(client(), bulk.request()); + Retry retry = Retry.on(EsRejectedExecutionException.class).policy(BackoffPolicy.exponentialBackoff()).using(client().threadPool()); + BulkResponse response = retry.withSyncBackoff(client()::bulk, bulk.request(), client().settings()); assertFalse(response.buildFailureMessage(), response.hasFailures()); client().admin().indices().prepareRefresh("source").get(); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java index c2b1dc50871..174fd671537 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.Hit; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.common.settings.Settings; public class UpdateByQueryMetadataTests extends AbstractAsyncBulkByScrollActionMetadataTestCase { @@ -47,7 +48,8 @@ public class UpdateByQueryMetadataTests private class TestAction extends TransportUpdateByQueryAction.AsyncIndexBySearchAction { TestAction() { super(UpdateByQueryMetadataTests.this.task, UpdateByQueryMetadataTests.this.logger, null, - UpdateByQueryMetadataTests.this.threadPool, request(), null, null, listener()); + UpdateByQueryMetadataTests.this.threadPool, request(), null, null, listener(), + Settings.EMPTY); } @Override diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java index 394669f00ec..3d478094e8b 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.script.ScriptService; import java.util.Date; @@ -56,6 +57,6 @@ public class UpdateByQueryWithScriptTests @Override protected TransportUpdateByQueryAction.AsyncIndexBySearchAction action(ScriptService scriptService, UpdateByQueryRequest request) { return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, scriptService, null, - listener()); + listener(), Settings.EMPTY); } }