From 8caf7d4ff8738131ae65cca86869c904d0e5f19b Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 25 Oct 2017 10:30:23 +0200 Subject: [PATCH] Decouple BulkProcessor from ThreadPool (#26727) Introduce minimal thread scheduler as a base class for `ThreadPool`. Such a class can be used from the `BulkProcessor` to schedule retries and the flush task. This allows to remove the `ThreadPool` dependency from `BulkProcessor`, which requires to provide settings that contain `node.name` and also needed log4j for logging. Instead, it needs now a `Scheduler` that is much lighter and gets automatically created and shut down on close. Closes #26028 --- .../java/org/elasticsearch/client/CrudIT.java | 22 +- .../documentation/CRUDDocumentationIT.java | 43 ++-- .../action/bulk/BulkProcessor.java | 53 +++-- .../action/bulk/BulkRequestHandler.java | 7 +- .../org/elasticsearch/action/bulk/Retry.java | 26 ++- .../indices/IndexingMemoryController.java | 2 +- .../monitor/jvm/JvmGcMonitorService.java | 2 +- .../elasticsearch/search/SearchService.java | 2 +- .../elasticsearch/threadpool/Scheduler.java | 209 ++++++++++++++++++ .../elasticsearch/threadpool/ThreadPool.java | 171 +++----------- .../watcher/ResourceWatcherService.java | 2 +- .../action/bulk/BulkProcessorTests.java | 4 +- .../index/shard/RefreshListenersTests.java | 2 +- .../IndexingMemoryControllerTests.java | 2 +- .../jvm/JvmGcMonitorServiceSettingsTests.java | 2 +- .../ScheduleWithFixedDelayTests.java | 10 +- docs/java-rest/high-level/apis/bulk.asciidoc | 22 +- .../org/elasticsearch/test/ESTestCase.java | 4 +- 18 files changed, 344 insertions(+), 241 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/threadpool/Scheduler.java diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index b078a983357..e36c445082e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -39,7 +39,6 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -50,7 +49,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collections; @@ -614,14 +612,14 @@ public class CrudIT extends ESRestHighLevelClientTestCase { } }; - ThreadPool threadPool = new ThreadPool(Settings.builder().put("node.name", getClass().getName()).build()); // Pull the client to a variable to work around https://bugs.eclipse.org/bugs/show_bug.cgi?id=514884 RestHighLevelClient hlClient = highLevelClient(); - try(BulkProcessor processor = new BulkProcessor.Builder(hlClient::bulkAsync, listener, threadPool) - .setConcurrentRequests(0) - .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.GB)) - .setBulkActions(nbItems + 1) - .build()) { + + try (BulkProcessor processor = BulkProcessor.builder(hlClient::bulkAsync, listener) + .setConcurrentRequests(0) + .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.GB)) + .setBulkActions(nbItems + 1) + .build()) { for (int i = 0; i < nbItems; i++) { String id = String.valueOf(i); boolean erroneous = randomBoolean(); @@ -631,7 +629,7 @@ public class CrudIT extends ESRestHighLevelClientTestCase { if (opType == DocWriteRequest.OpType.DELETE) { if (erroneous == false) { assertEquals(RestStatus.CREATED, - highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status()); + highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status()); } DeleteRequest deleteRequest = new DeleteRequest("index", "test", id); processor.add(deleteRequest); @@ -653,10 +651,10 @@ public class CrudIT extends ESRestHighLevelClientTestCase { } else if (opType == DocWriteRequest.OpType.UPDATE) { UpdateRequest updateRequest = new UpdateRequest("index", "test", id) - .doc(new IndexRequest().source(xContentType, "id", i)); + .doc(new IndexRequest().source(xContentType, "id", i)); if (erroneous == false) { assertEquals(RestStatus.CREATED, - highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status()); + highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status()); } processor.add(updateRequest); } @@ -676,8 +674,6 @@ public class CrudIT extends ESRestHighLevelClientTestCase { assertNull(error.get()); validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest); - - terminate(threadPool); } private void validateBulkResponses(int nbItems, boolean[] errors, BulkResponse bulkResponse, BulkRequest bulkRequest) { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java index 13b8bf0ea6b..b32351656ca 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java @@ -19,13 +19,11 @@ package org.elasticsearch.client.documentation; -import org.elasticsearch.Build; import org.apache.http.HttpEntity; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.ContentType; import org.apache.http.nio.entity.NStringEntity; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; @@ -40,7 +38,6 @@ import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.main.MainResponse; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -49,9 +46,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -64,7 +59,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; -import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.Scheduler; import java.io.IOException; import java.util.Collections; @@ -868,31 +863,27 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase { } public void testBulkProcessor() throws InterruptedException, IOException { - Settings settings = Settings.builder().put("node.name", "my-application").build(); RestHighLevelClient client = highLevelClient(); { // tag::bulk-processor-init - ThreadPool threadPool = new ThreadPool(settings); // <1> - - BulkProcessor.Listener listener = new BulkProcessor.Listener() { // <2> + BulkProcessor.Listener listener = new BulkProcessor.Listener() { // <1> @Override public void beforeBulk(long executionId, BulkRequest request) { - // <3> + // <2> } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - // <4> + // <3> } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - // <5> + // <4> } }; - BulkProcessor bulkProcessor = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool) - .build(); // <6> + BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener).build(); // <5> // end::bulk-processor-init assertNotNull(bulkProcessor); @@ -917,7 +908,6 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase { // tag::bulk-processor-close bulkProcessor.close(); // end::bulk-processor-close - terminate(threadPool); } { // tag::bulk-processor-listener @@ -944,19 +934,14 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase { }; // end::bulk-processor-listener - ThreadPool threadPool = new ThreadPool(settings); - try { - // tag::bulk-processor-options - BulkProcessor.Builder builder = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool); - builder.setBulkActions(500); // <1> - builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // <2> - builder.setConcurrentRequests(0); // <3> - builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // <4> - builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // <5> - // end::bulk-processor-options - } finally { - terminate(threadPool); - } + // tag::bulk-processor-options + BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener); + builder.setBulkActions(500); // <1> + builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // <2> + builder.setConcurrentRequests(0); // <3> + builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // <4> + builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // <5> + // end::bulk-processor-options } } } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index a1aae5f8602..372837521dd 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -26,14 +26,17 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.util.Objects; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; @@ -78,22 +81,20 @@ public class BulkProcessor implements Closeable { private final BiConsumer> consumer; private final Listener listener; - private final ThreadPool threadPool; - + private final Scheduler scheduler; + private final Runnable onClose; private int concurrentRequests = 1; private int bulkActions = 1000; private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB); private TimeValue flushInterval = null; private BackoffPolicy backoffPolicy = BackoffPolicy.exponentialBackoff(); - /** - * Creates a builder of bulk processor with the client to use and the listener that will be used - * to be notified on the completion of bulk requests. - */ - public Builder(BiConsumer> consumer, Listener listener, ThreadPool threadPool) { + private Builder(BiConsumer> consumer, Listener listener, + Scheduler scheduler, Runnable onClose) { this.consumer = consumer; this.listener = listener; - this.threadPool = threadPool; + this.scheduler = scheduler; + this.onClose = onClose; } /** @@ -155,39 +156,51 @@ public class BulkProcessor implements Closeable { * Builds a new bulk processor. */ public BulkProcessor build() { - return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval, threadPool); + return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval, + scheduler, onClose); } } public static Builder builder(Client client, Listener listener) { Objects.requireNonNull(client, "client"); Objects.requireNonNull(listener, "listener"); + return new Builder(client::bulk, listener, client.threadPool(), () -> {}); + } - return new Builder(client::bulk, listener, client.threadPool()); + public static Builder builder(BiConsumer> consumer, Listener listener) { + Objects.requireNonNull(consumer, "consumer"); + Objects.requireNonNull(listener, "listener"); + final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); + return new Builder(consumer, listener, + (delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS), + () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS)); } private final int bulkActions; private final long bulkSize; - private final ThreadPool.Cancellable cancellableFlushTask; + private final Scheduler.Cancellable cancellableFlushTask; private final AtomicLong executionIdGen = new AtomicLong(); private BulkRequest bulkRequest; private final BulkRequestHandler bulkRequestHandler; + private final Scheduler scheduler; + private final Runnable onClose; private volatile boolean closed = false; BulkProcessor(BiConsumer> consumer, BackoffPolicy backoffPolicy, Listener listener, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval, - ThreadPool threadPool) { + Scheduler scheduler, Runnable onClose) { this.bulkActions = bulkActions; this.bulkSize = bulkSize.getBytes(); this.bulkRequest = new BulkRequest(); - this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, threadPool, concurrentRequests); - + this.scheduler = scheduler; + this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests); // Start period flushing task after everything is setup - this.cancellableFlushTask = startFlushTask(flushInterval, threadPool); + this.cancellableFlushTask = startFlushTask(flushInterval, scheduler); + this.onClose = onClose; } /** @@ -200,6 +213,7 @@ public class BulkProcessor implements Closeable { } catch (InterruptedException exc) { Thread.currentThread().interrupt(); } + onClose.run(); } /** @@ -289,9 +303,9 @@ public class BulkProcessor implements Closeable { return this; } - private ThreadPool.Cancellable startFlushTask(TimeValue flushInterval, ThreadPool threadPool) { + private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) { if (flushInterval == null) { - return new ThreadPool.Cancellable() { + return new Scheduler.Cancellable() { @Override public void cancel() {} @@ -301,9 +315,8 @@ public class BulkProcessor implements Closeable { } }; } - - final Runnable flushRunnable = threadPool.getThreadContext().preserveContext(new Flush()); - return threadPool.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC); + final Runnable flushRunnable = scheduler.preserveContext(new Flush()); + return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC); } private void executeIfNeeded() { diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java index 52a83b00483..423648bbb71 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java @@ -25,7 +25,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.Scheduler; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; @@ -44,14 +44,13 @@ public final class BulkRequestHandler { private final int concurrentRequests; BulkRequestHandler(BiConsumer> consumer, BackoffPolicy backoffPolicy, - BulkProcessor.Listener listener, ThreadPool threadPool, - int concurrentRequests) { + BulkProcessor.Listener listener, Scheduler scheduler, int concurrentRequests) { assert concurrentRequests >= 0; this.logger = Loggers.getLogger(getClass()); this.consumer = consumer; this.listener = listener; this.concurrentRequests = concurrentRequests; - this.retry = new Retry(EsRejectedExecutionException.class, backoffPolicy, threadPool); + this.retry = new Retry(EsRejectedExecutionException.class, backoffPolicy, scheduler); this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1); } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/Retry.java b/core/src/main/java/org/elasticsearch/action/bulk/Retry.java index 8a9ef245f36..9985d23b9ba 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/Retry.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/Retry.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -41,13 +42,12 @@ import java.util.function.Predicate; public class Retry { private final Class retryOnThrowable; private final BackoffPolicy backoffPolicy; - private final ThreadPool threadPool; + private final Scheduler scheduler; - - public Retry(Class retryOnThrowable, BackoffPolicy backoffPolicy, ThreadPool threadPool) { + public Retry(Class retryOnThrowable, BackoffPolicy backoffPolicy, Scheduler scheduler) { this.retryOnThrowable = retryOnThrowable; this.backoffPolicy = backoffPolicy; - this.threadPool = threadPool; + this.scheduler = scheduler; } /** @@ -58,8 +58,9 @@ public class Retry { * @param listener A listener that is invoked when the bulk request finishes or completes with an exception. The listener is not * @param settings settings */ - public void withBackoff(BiConsumer> consumer, BulkRequest bulkRequest, ActionListener listener, Settings settings) { - RetryHandler r = new RetryHandler(retryOnThrowable, backoffPolicy, consumer, listener, settings, threadPool); + public void withBackoff(BiConsumer> consumer, BulkRequest bulkRequest, + ActionListener listener, Settings settings) { + RetryHandler r = new RetryHandler(retryOnThrowable, backoffPolicy, consumer, listener, settings, scheduler); r.execute(bulkRequest); } @@ -72,7 +73,8 @@ public class Retry { * @param settings settings * @return a future representing the bulk response returned by the client. */ - public PlainActionFuture withBackoff(BiConsumer> consumer, BulkRequest bulkRequest, Settings settings) { + public PlainActionFuture withBackoff(BiConsumer> consumer, + BulkRequest bulkRequest, Settings settings) { PlainActionFuture future = PlainActionFuture.newFuture(); withBackoff(consumer, bulkRequest, future, settings); return future; @@ -80,7 +82,7 @@ public class Retry { static class RetryHandler implements ActionListener { private final Logger logger; - private final ThreadPool threadPool; + private final Scheduler scheduler; private final BiConsumer> consumer; private final ActionListener listener; private final Iterator backoff; @@ -95,13 +97,13 @@ public class Retry { RetryHandler(Class retryOnThrowable, BackoffPolicy backoffPolicy, BiConsumer> consumer, ActionListener listener, - Settings settings, ThreadPool threadPool) { + Settings settings, Scheduler scheduler) { this.retryOnThrowable = retryOnThrowable; this.backoff = backoffPolicy.iterator(); this.consumer = consumer; this.listener = listener; this.logger = Loggers.getLogger(getClass(), settings); - this.threadPool = threadPool; + this.scheduler = scheduler; // in contrast to System.currentTimeMillis(), nanoTime() uses a monotonic clock under the hood this.startTimestampNanos = System.nanoTime(); } @@ -136,8 +138,8 @@ public class Retry { assert backoff.hasNext(); TimeValue next = backoff.next(); logger.trace("Retry of bulk request scheduled in {} ms.", next.millis()); - Runnable command = threadPool.getThreadContext().preserveContext(() -> this.execute(bulkRequestForRetry)); - scheduledRequestFuture = threadPool.schedule(next, ThreadPool.Names.SAME, command); + Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry)); + scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command); } private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) { diff --git a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java index 1b960bb1599..e1c88fff3be 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java @@ -36,7 +36,7 @@ import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexingOperationListener; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPool.Cancellable; +import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool.Names; import java.io.Closeable; diff --git a/core/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java b/core/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java index f260d7430e2..2c7235ca899 100644 --- a/core/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java +++ b/core/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java @@ -28,7 +28,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.monitor.jvm.JvmStats.GarbageCollector; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPool.Cancellable; +import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool.Names; import java.util.HashMap; diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index f6dc8a0a39b..4ff3a65553a 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -93,7 +93,7 @@ import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPool.Cancellable; +import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportRequest; diff --git a/core/src/main/java/org/elasticsearch/threadpool/Scheduler.java b/core/src/main/java/org/elasticsearch/threadpool/Scheduler.java new file mode 100644 index 00000000000..2901fc1f7a8 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/threadpool/Scheduler.java @@ -0,0 +1,209 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsAbortPolicy; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * Scheduler that allows to schedule one-shot and periodic commands. + */ +public interface Scheduler { + + static ScheduledThreadPoolExecutor initScheduler(Settings settings) { + ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, + EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy()); + scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + scheduler.setRemoveOnCancelPolicy(true); + return scheduler; + } + + static boolean terminate(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, long timeout, TimeUnit timeUnit) { + scheduledThreadPoolExecutor.shutdown(); + if (awaitTermination(scheduledThreadPoolExecutor, timeout, timeUnit)) { + return true; + } + // last resort + scheduledThreadPoolExecutor.shutdownNow(); + return awaitTermination(scheduledThreadPoolExecutor, timeout, timeUnit); + } + + static boolean awaitTermination(final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, + final long timeout, final TimeUnit timeUnit) { + try { + if (scheduledThreadPoolExecutor.awaitTermination(timeout, timeUnit)) { + return true; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return false; + } + + /** + * Does nothing by default but can be used by subclasses to save the current thread context and wraps the command in a Runnable + * that restores that context before running the command. + */ + default Runnable preserveContext(Runnable command) { + return command; + } + + /** + * Schedules a one-shot command to be run after a given delay. The command is not run in the context of the calling thread. + * To preserve the context of the calling thread you may call {@link #preserveContext(Runnable)} on the runnable before passing + * it to this method. + * The command runs on scheduler thread. Do not run blocking calls on the scheduler thread. Subclasses may allow + * to execute on a different executor, in which case blocking calls are allowed. + * + * @param delay delay before the task executes + * @param executor the name of the executor that has to execute this task. Ignored in the default implementation but can be used + * by subclasses that support multiple executors. + * @param command the command to run + * @return a ScheduledFuture who's get will return when the task has been added to its target thread pool and throws an exception if + * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool + * the ScheduledFuture cannot interact with it. + * @throws EsRejectedExecutionException if the task cannot be scheduled for execution + */ + ScheduledFuture schedule(TimeValue delay, String executor, Runnable command); + + /** + * Schedules a periodic action that runs on scheduler thread. Do not run blocking calls on the scheduler thread. Subclasses may allow + * to execute on a different executor, in which case blocking calls are allowed. + * + * @param command the action to take + * @param interval the delay interval + * @param executor the name of the executor that has to execute this task. Ignored in the default implementation but can be used + * by subclasses that support multiple executors. + * @return a {@link Cancellable} that can be used to cancel the subsequent runs of the command. If the command is running, it will + * not be interrupted. + */ + default Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) { + return new ReschedulingRunnable(command, interval, executor, this, (e) -> {}, (e) -> {}); + } + + /** + * This interface represents an object whose execution may be cancelled during runtime. + */ + interface Cancellable { + + /** + * Cancel the execution of this object. This method is idempotent. + */ + void cancel(); + + /** + * Check if the execution has been cancelled + * @return true if cancelled + */ + boolean isCancelled(); + } + + /** + * This class encapsulates the scheduling of a {@link Runnable} that needs to be repeated on a interval. For example, checking a value + * for cleanup every second could be done by passing in a Runnable that can perform the check and the specified interval between + * executions of this runnable. NOTE: the runnable is only rescheduled to run again after completion of the runnable. + * + * For this class, completion means that the call to {@link Runnable#run()} returned or an exception was thrown and caught. In + * case of an exception, this class will log the exception and reschedule the runnable for its next execution. This differs from the + * {@link ScheduledThreadPoolExecutor#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} semantics as an exception there would + * terminate the rescheduling of the runnable. + */ + final class ReschedulingRunnable extends AbstractRunnable implements Cancellable { + + private final Runnable runnable; + private final TimeValue interval; + private final String executor; + private final Scheduler scheduler; + private final Consumer rejectionConsumer; + private final Consumer failureConsumer; + + private volatile boolean run = true; + + /** + * Creates a new rescheduling runnable and schedules the first execution to occur after the interval specified + * + * @param runnable the {@link Runnable} that should be executed periodically + * @param interval the time interval between executions + * @param executor the executor where this runnable should be scheduled to run + * @param scheduler the {@link Scheduler} instance to use for scheduling + */ + ReschedulingRunnable(Runnable runnable, TimeValue interval, String executor, Scheduler scheduler, + Consumer rejectionConsumer, Consumer failureConsumer) { + this.runnable = runnable; + this.interval = interval; + this.executor = executor; + this.scheduler = scheduler; + this.rejectionConsumer = rejectionConsumer; + this.failureConsumer = failureConsumer; + scheduler.schedule(interval, executor, this); + } + + @Override + public void cancel() { + run = false; + } + + @Override + public boolean isCancelled() { + return run == false; + } + + @Override + public void doRun() { + // always check run here since this may have been cancelled since the last execution and we do not want to run + if (run) { + runnable.run(); + } + } + + @Override + public void onFailure(Exception e) { + failureConsumer.accept(e); + } + + @Override + public void onRejection(Exception e) { + run = false; + rejectionConsumer.accept(e); + } + + @Override + public void onAfter() { + // if this has not been cancelled reschedule it to run again + if (run) { + try { + scheduler.schedule(interval, executor, this); + } catch (final EsRejectedExecutionException e) { + onRejection(e); + } + } + } + } +} diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index a05ffd117ab..e179650aeef 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -33,10 +33,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.EsAbortPolicy; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler; @@ -64,7 +61,7 @@ import java.util.concurrent.TimeUnit; import static java.util.Collections.unmodifiableMap; -public class ThreadPool extends AbstractComponent implements Closeable { +public class ThreadPool extends AbstractComponent implements Scheduler, Closeable { public static class Names { public static final String SAME = "same"; @@ -143,8 +140,6 @@ public class ThreadPool extends AbstractComponent implements Closeable { private Map executors = new HashMap<>(); - private final ScheduledThreadPoolExecutor scheduler; - private final CachedTimeThread cachedTimeThread; static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService(); @@ -153,6 +148,8 @@ public class ThreadPool extends AbstractComponent implements Closeable { private final Map builders; + private final ScheduledThreadPoolExecutor scheduler; + public Collection builders() { return Collections.unmodifiableCollection(builders.values()); } @@ -210,12 +207,7 @@ public class ThreadPool extends AbstractComponent implements Closeable { executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT))); this.executors = unmodifiableMap(executors); - - this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy()); - this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - this.scheduler.setRemoveOnCancelPolicy(true); - + this.scheduler = Scheduler.initScheduler(settings); TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings); this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis()); this.cachedTimeThread.start(); @@ -329,25 +321,6 @@ public class ThreadPool extends AbstractComponent implements Closeable { return holder.executor(); } - public ScheduledExecutorService scheduler() { - return this.scheduler; - } - - /** - * Schedules a periodic action that runs on the specified thread pool. - * - * @param command the action to take - * @param interval the delay interval - * @param executor The name of the thread pool on which to execute this task. {@link Names#SAME} means "execute on the scheduler thread", - * which there is only one of. Executing blocking or long running code on the {@link Names#SAME} thread pool should never - * be done as it can cause issues with the cluster - * @return a {@link Cancellable} that can be used to cancel the subsequent runs of the command. If the command is running, it will - * not be interrupted. - */ - public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) { - return new ReschedulingRunnable(command, interval, executor, this); - } - /** * Schedules a one-shot command to run after a given delay. The command is not run in the context of the calling thread. To preserve the * context of the calling thread you may call threadPool.getThreadContext().preserveContext on the runnable before passing @@ -361,13 +334,30 @@ public class ThreadPool extends AbstractComponent implements Closeable { * @return a ScheduledFuture who's get will return when the task is has been added to its target thread pool and throw an exception if * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool * the ScheduledFuture will cannot interact with it. - * @throws EsRejectedExecutionException if the task cannot be scheduled for execution + * @throws org.elasticsearch.common.util.concurrent.EsRejectedExecutionException if the task cannot be scheduled for execution */ public ScheduledFuture schedule(TimeValue delay, String executor, Runnable command) { if (!Names.SAME.equals(executor)) { command = new ThreadedRunnable(command, executor(executor)); } - return scheduler.schedule(new LoggingRunnable(command), delay.millis(), TimeUnit.MILLISECONDS); + return scheduler.schedule(new ThreadPool.LoggingRunnable(command), delay.millis(), TimeUnit.MILLISECONDS); + } + + @Override + public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) { + return new ReschedulingRunnable(command, interval, executor, this, + (e) -> { + if (logger.isDebugEnabled()) { + logger.debug((Supplier) () -> new ParameterizedMessage("scheduled task [{}] was rejected on thread pool [{}]", + command, executor), e); + } + }, + (e) -> logger.warn((Supplier) () -> new ParameterizedMessage("failed to run scheduled task [{}] on thread pool [{}]", + command, executor), e)); + } + + public Runnable preserveContext(Runnable command) { + return getThreadContext().preserveContext(command); } public void shutdown() { @@ -376,7 +366,7 @@ public class ThreadPool extends AbstractComponent implements Closeable { scheduler.shutdown(); for (ExecutorHolder executor : executors.values()) { if (executor.executor() instanceof ThreadPoolExecutor) { - ((ThreadPoolExecutor) executor.executor()).shutdown(); + executor.executor().shutdown(); } } } @@ -387,7 +377,7 @@ public class ThreadPool extends AbstractComponent implements Closeable { scheduler.shutdownNow(); for (ExecutorHolder executor : executors.values()) { if (executor.executor() instanceof ThreadPoolExecutor) { - ((ThreadPoolExecutor) executor.executor()).shutdownNow(); + executor.executor().shutdownNow(); } } } @@ -396,14 +386,17 @@ public class ThreadPool extends AbstractComponent implements Closeable { boolean result = scheduler.awaitTermination(timeout, unit); for (ExecutorHolder executor : executors.values()) { if (executor.executor() instanceof ThreadPoolExecutor) { - result &= ((ThreadPoolExecutor) executor.executor()).awaitTermination(timeout, unit); + result &= executor.executor().awaitTermination(timeout, unit); } } - cachedTimeThread.join(unit.toMillis(timeout)); return result; } + public ScheduledExecutorService scheduler() { + return this.scheduler; + } + /** * Constrains a value between minimum and maximum values * (inclusive). @@ -726,7 +719,9 @@ public class ThreadPool extends AbstractComponent implements Closeable { if (pool != null) { try { pool.shutdown(); - if (awaitTermination(pool, timeout, timeUnit)) return true; + if (awaitTermination(pool, timeout, timeUnit)) { + return true; + } // last resort pool.shutdownNow(); return awaitTermination(pool, timeout, timeUnit); @@ -738,11 +733,11 @@ public class ThreadPool extends AbstractComponent implements Closeable { } private static boolean awaitTermination( - final ThreadPool pool, + final ThreadPool threadPool, final long timeout, final TimeUnit timeUnit) { try { - if (pool.awaitTermination(timeout, timeUnit)) { + if (threadPool.awaitTermination(timeout, timeUnit)) { return true; } } catch (InterruptedException e) { @@ -760,102 +755,6 @@ public class ThreadPool extends AbstractComponent implements Closeable { return threadContext; } - /** - * This interface represents an object whose execution may be cancelled during runtime. - */ - public interface Cancellable { - - /** - * Cancel the execution of this object. This method is idempotent. - */ - void cancel(); - - /** - * Check if the execution has been cancelled - * @return true if cancelled - */ - boolean isCancelled(); - } - - /** - * This class encapsulates the scheduling of a {@link Runnable} that needs to be repeated on a interval. For example, checking a value - * for cleanup every second could be done by passing in a Runnable that can perform the check and the specified interval between - * executions of this runnable. NOTE: the runnable is only rescheduled to run again after completion of the runnable. - * - * For this class, completion means that the call to {@link Runnable#run()} returned or an exception was thrown and caught. In - * case of an exception, this class will log the exception and reschedule the runnable for its next execution. This differs from the - * {@link ScheduledThreadPoolExecutor#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} semantics as an exception there would - * terminate the rescheduling of the runnable. - */ - static final class ReschedulingRunnable extends AbstractRunnable implements Cancellable { - - private final Runnable runnable; - private final TimeValue interval; - private final String executor; - private final ThreadPool threadPool; - - private volatile boolean run = true; - - /** - * Creates a new rescheduling runnable and schedules the first execution to occur after the interval specified - * - * @param runnable the {@link Runnable} that should be executed periodically - * @param interval the time interval between executions - * @param executor the executor where this runnable should be scheduled to run - * @param threadPool the {@link ThreadPool} instance to use for scheduling - */ - ReschedulingRunnable(Runnable runnable, TimeValue interval, String executor, ThreadPool threadPool) { - this.runnable = runnable; - this.interval = interval; - this.executor = executor; - this.threadPool = threadPool; - threadPool.schedule(interval, executor, this); - } - - @Override - public void cancel() { - run = false; - } - - @Override - public boolean isCancelled() { - return run == false; - } - - @Override - public void doRun() { - // always check run here since this may have been cancelled since the last execution and we do not want to run - if (run) { - runnable.run(); - } - } - - @Override - public void onFailure(Exception e) { - threadPool.logger.warn((Supplier) () -> new ParameterizedMessage("failed to run scheduled task [{}] on thread pool [{}]", runnable.toString(), executor), e); - } - - @Override - public void onRejection(Exception e) { - run = false; - if (threadPool.logger.isDebugEnabled()) { - threadPool.logger.debug((Supplier) () -> new ParameterizedMessage("scheduled task [{}] was rejected on thread pool [{}]", runnable, executor), e); - } - } - - @Override - public void onAfter() { - // if this has not been cancelled reschedule it to run again - if (run) { - try { - threadPool.schedule(interval, executor, this); - } catch (final EsRejectedExecutionException e) { - onRejection(e); - } - } - } - } - public static boolean assertNotScheduleThread(String reason) { assert Thread.currentThread().getName().contains("scheduler") == false : "Expected current thread [" + Thread.currentThread() + "] to not be the scheduler thread. Reason: [" + reason + "]"; diff --git a/core/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java b/core/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java index f897fdfa749..54b24a86cc3 100644 --- a/core/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java +++ b/core/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java @@ -25,7 +25,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPool.Cancellable; +import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool.Names; import java.io.IOException; diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index 9b37a2b9f4c..4ff5b69ad37 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -67,7 +67,7 @@ public class BulkProcessorTests extends ESTestCase { final BulkProcessor bulkProcessor; assertNull(threadPool.getThreadContext().getHeader(headerKey)); assertNull(threadPool.getThreadContext().getTransient(transientKey)); - try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) { + try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { threadPool.getThreadContext().putHeader(headerKey, headerValue); threadPool.getThreadContext().putTransient(transientKey, transientValue); bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), new BulkProcessor.Listener() { @@ -82,7 +82,7 @@ public class BulkProcessorTests extends ESTestCase { @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { } - }, 1, bulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), flushInterval, threadPool); + }, 1, bulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), flushInterval, threadPool, () -> {}); } assertNull(threadPool.getThreadContext().getHeader(headerKey)); assertNull(threadPool.getThreadContext().getTransient(transientKey)); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index da90f8023d2..fcc3c93fc37 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -56,7 +56,7 @@ import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPool.Cancellable; +import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool.Names; import org.junit.After; import org.junit.Before; diff --git a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index d8367b0d6a6..2a490c1dcf9 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -35,7 +35,7 @@ import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPool.Cancellable; +import org.elasticsearch.threadpool.Scheduler.Cancellable; import java.io.IOException; import java.util.ArrayList; diff --git a/core/src/test/java/org/elasticsearch/monitor/jvm/JvmGcMonitorServiceSettingsTests.java b/core/src/test/java/org/elasticsearch/monitor/jvm/JvmGcMonitorServiceSettingsTests.java index 48817e52d56..f3e86c532d5 100644 --- a/core/src/test/java/org/elasticsearch/monitor/jvm/JvmGcMonitorServiceSettingsTests.java +++ b/core/src/test/java/org/elasticsearch/monitor/jvm/JvmGcMonitorServiceSettingsTests.java @@ -22,9 +22,9 @@ package org.elasticsearch.monitor.jvm; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.threadpool.ThreadPool.Cancellable; import java.util.AbstractMap; import java.util.HashSet; diff --git a/core/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java b/core/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java index dd1f4991f95..da0125d6f65 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java @@ -26,9 +26,9 @@ import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool.Cancellable; +import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool.Names; -import org.elasticsearch.threadpool.ThreadPool.ReschedulingRunnable; +import org.elasticsearch.threadpool.Scheduler.ReschedulingRunnable; import org.junit.After; import org.junit.Before; @@ -80,7 +80,8 @@ public class ScheduleWithFixedDelayTests extends ESTestCase { Thread.currentThread().interrupt(); } }; - ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, Names.GENERIC, threadPool); + ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, Names.GENERIC, threadPool, + (e) -> {}, (e) -> {}); // this call was made during construction of the runnable verify(threadPool, times(1)).schedule(delay, Names.GENERIC, reschedulingRunnable); @@ -260,7 +261,8 @@ public class ScheduleWithFixedDelayTests extends ESTestCase { } }; Runnable runnable = () -> {}; - ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, Names.GENERIC, threadPool); + ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, Names.GENERIC, + threadPool, (e) -> {}, (e) -> {}); assertTrue(reschedulingRunnable.isCancelled()); } diff --git a/docs/java-rest/high-level/apis/bulk.asciidoc b/docs/java-rest/high-level/apis/bulk.asciidoc index 9bbc0b31062..3102e96519e 100644 --- a/docs/java-rest/high-level/apis/bulk.asciidoc +++ b/docs/java-rest/high-level/apis/bulk.asciidoc @@ -125,27 +125,24 @@ The `BulkProcessor` simplifies the usage of the Bulk API by providing a utility class that allows index/update/delete operations to be transparently executed as they are added to the processor. -In order to execute the requests, the `BulkProcessor` requires 3 components: +In order to execute the requests, the `BulkProcessor` requires the following +components: `RestHighLevelClient`:: This client is used to execute the `BulkRequest` and to retrieve the `BulkResponse` `BulkProcessor.Listener`:: This listener is called before and after every `BulkRequest` execution or when a `BulkRequest` failed -`ThreadPool`:: The `BulkRequest` executions are done using threads from this -pool, allowing the `BulkProcessor` to work in a non-blocking manner and to -accept new index/update/delete requests while bulk requests are executing. -Then the `BulkProcessor.Builder` class can be used to build a new `BulkProcessor`: +Then the `BulkProcessor.builder` method can be used to build a new `BulkProcessor`: ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-init] -------------------------------------------------- -<1> Create the `ThreadPool` using the given `Settings` -<2> Create the `BulkProcessor.Listener` -<3> This method is called before each execution of a `BulkRequest` -<4> This method is called after each execution of a `BulkRequest` -<5> This method is called when a `BulkRequest` failed -<6> Create the `BulkProcessor` by calling the `build()` method from +<1> Create the `BulkProcessor.Listener` +<2> This method is called before each execution of a `BulkRequest` +<3> This method is called after each execution of a `BulkRequest` +<4> This method is called when a `BulkRequest` failed +<5> Create the `BulkProcessor` by calling the `build()` method from the `BulkProcessor.Builder`. The `RestHighLevelClient.bulkAsync()` method will be used to execute the `BulkRequest` under the hood. @@ -190,7 +187,7 @@ to know if the `BulkResponse` contains errors the failure Once all requests have been added to the `BulkProcessor`, its instance needs to -be closed closed using one of the two available closing methods. +be closed using one of the two available closing methods. The `awaitClose()` method can be used to wait until all requests have been processed or the specified waiting time elapses: @@ -209,3 +206,4 @@ include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-close] Both methods flush the requests added to the processor before closing the processor and also forbid any new request to be added to it. + diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 243ab11e61f..121471bdb07 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -765,8 +765,8 @@ public abstract class ESTestCase extends LuceneTestCase { return terminated; } - public static boolean terminate(ThreadPool service) throws InterruptedException { - return ThreadPool.terminate(service, 10, TimeUnit.SECONDS); + public static boolean terminate(ThreadPool threadPool) throws InterruptedException { + return ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } /**