mirror of
synced 2025-03-09 14:34:43 +00:00
Decouple BulkProcessor from ThreadPool (#26727)
Introduce minimal thread scheduler as a base class for `ThreadPool`. Such a class can be used from the `BulkProcessor` to schedule retries and the flush task. This allows to remove the `ThreadPool` dependency from `BulkProcessor`, which requires to provide settings that contain `node.name` and also needed log4j for logging. Instead, it needs now a `Scheduler` that is much lighter and gets automatically created and shut down on close. Closes #26028
This commit is contained in:
@ -39,7 +39,6 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -50,7 +49,6 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Collections;
@ -614,14 +612,14 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
ThreadPool threadPool = new ThreadPool(Settings.builder().put("node.name", getClass().getName()).build());
// Pull the client to a variable to work around https://bugs.eclipse.org/bugs/show_bug.cgi?id=514884
RestHighLevelClient hlClient = highLevelClient();
try(BulkProcessor processor = new BulkProcessor.Builder(hlClient::bulkAsync, listener, threadPool)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.GB))
.setBulkActions(nbItems + 1)
.build()) {
try (BulkProcessor processor = BulkProcessor.builder(hlClient::bulkAsync, listener)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.GB))
.setBulkActions(nbItems + 1)
.build()) {
for (int i = 0; i < nbItems; i++) {
String id = String.valueOf(i);
boolean erroneous = randomBoolean();
@ -631,7 +629,7 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
if (opType == DocWriteRequest.OpType.DELETE) {
if (erroneous == false) {
highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status());
highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status());
DeleteRequest deleteRequest = new DeleteRequest("index", "test", id);
@ -653,10 +651,10 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
} else if (opType == DocWriteRequest.OpType.UPDATE) {
UpdateRequest updateRequest = new UpdateRequest("index", "test", id)
.doc(new IndexRequest().source(xContentType, "id", i));
.doc(new IndexRequest().source(xContentType, "id", i));
if (erroneous == false) {
highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status());
highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status());
@ -676,8 +674,6 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
private void validateBulkResponses(int nbItems, boolean[] errors, BulkResponse bulkResponse, BulkRequest bulkRequest) {
@ -19,13 +19,11 @@
package org.elasticsearch.client.documentation;
import org.elasticsearch.Build;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
@ -40,7 +38,6 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
@ -49,9 +46,7 @@ import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -64,7 +59,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.Scheduler;
import java.io.IOException;
import java.util.Collections;
@ -868,31 +863,27 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
public void testBulkProcessor() throws InterruptedException, IOException {
Settings settings = Settings.builder().put("node.name", "my-application").build();
RestHighLevelClient client = highLevelClient();
// tag::bulk-processor-init
ThreadPool threadPool = new ThreadPool(settings); // <1>
BulkProcessor.Listener listener = new BulkProcessor.Listener() { // <2>
BulkProcessor.Listener listener = new BulkProcessor.Listener() { // <1>
public void beforeBulk(long executionId, BulkRequest request) {
// <3>
// <2>
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// <4>
// <3>
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// <5>
// <4>
BulkProcessor bulkProcessor = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool)
.build(); // <6>
BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener).build(); // <5>
// end::bulk-processor-init
@ -917,7 +908,6 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
// tag::bulk-processor-close
// end::bulk-processor-close
// tag::bulk-processor-listener
@ -944,19 +934,14 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
// end::bulk-processor-listener
ThreadPool threadPool = new ThreadPool(settings);
try {
// tag::bulk-processor-options
BulkProcessor.Builder builder = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool);
builder.setBulkActions(500); // <1>
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // <2>
builder.setConcurrentRequests(0); // <3>
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // <4>
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // <5>
// end::bulk-processor-options
} finally {
// tag::bulk-processor-options
BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener);
builder.setBulkActions(500); // <1>
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // <2>
builder.setConcurrentRequests(0); // <3>
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // <4>
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // <5>
// end::bulk-processor-options
@ -26,14 +26,17 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
@ -78,22 +81,20 @@ public class BulkProcessor implements Closeable {
private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
private final Listener listener;
private final ThreadPool threadPool;
private final Scheduler scheduler;
private final Runnable onClose;
private int concurrentRequests = 1;
private int bulkActions = 1000;
private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
private TimeValue flushInterval = null;
private BackoffPolicy backoffPolicy = BackoffPolicy.exponentialBackoff();
* Creates a builder of bulk processor with the client to use and the listener that will be used
* to be notified on the completion of bulk requests.
public Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener, ThreadPool threadPool) {
private Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener,
Scheduler scheduler, Runnable onClose) {
this.consumer = consumer;
this.listener = listener;
this.threadPool = threadPool;
this.scheduler = scheduler;
this.onClose = onClose;
@ -155,39 +156,51 @@ public class BulkProcessor implements Closeable {
* Builds a new bulk processor.
public BulkProcessor build() {
return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval, threadPool);
return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval,
scheduler, onClose);
public static Builder builder(Client client, Listener listener) {
Objects.requireNonNull(client, "client");
Objects.requireNonNull(listener, "listener");
return new Builder(client::bulk, listener, client.threadPool(), () -> {});
return new Builder(client::bulk, listener, client.threadPool());
public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
Objects.requireNonNull(consumer, "consumer");
Objects.requireNonNull(listener, "listener");
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
return new Builder(consumer, listener,
(delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),
() -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
private final int bulkActions;
private final long bulkSize;
private final ThreadPool.Cancellable cancellableFlushTask;
private final Scheduler.Cancellable cancellableFlushTask;
private final AtomicLong executionIdGen = new AtomicLong();
private BulkRequest bulkRequest;
private final BulkRequestHandler bulkRequestHandler;
private final Scheduler scheduler;
private final Runnable onClose;
private volatile boolean closed = false;
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
ThreadPool threadPool) {
Scheduler scheduler, Runnable onClose) {
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = new BulkRequest();
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, threadPool, concurrentRequests);
this.scheduler = scheduler;
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
// Start period flushing task after everything is setup
this.cancellableFlushTask = startFlushTask(flushInterval, threadPool);
this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
this.onClose = onClose;
@ -200,6 +213,7 @@ public class BulkProcessor implements Closeable {
} catch (InterruptedException exc) {
@ -289,9 +303,9 @@ public class BulkProcessor implements Closeable {
return this;
private ThreadPool.Cancellable startFlushTask(TimeValue flushInterval, ThreadPool threadPool) {
private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
if (flushInterval == null) {
return new ThreadPool.Cancellable() {
return new Scheduler.Cancellable() {
public void cancel() {}
@ -301,9 +315,8 @@ public class BulkProcessor implements Closeable {
final Runnable flushRunnable = threadPool.getThreadContext().preserveContext(new Flush());
return threadPool.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
final Runnable flushRunnable = scheduler.preserveContext(new Flush());
return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
private void executeIfNeeded() {
@ -25,7 +25,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.Scheduler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
@ -44,14 +44,13 @@ public final class BulkRequestHandler {
private final int concurrentRequests;
BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
BulkProcessor.Listener listener, ThreadPool threadPool,
int concurrentRequests) {
BulkProcessor.Listener listener, Scheduler scheduler, int concurrentRequests) {
assert concurrentRequests >= 0;
this.logger = Loggers.getLogger(getClass());
this.consumer = consumer;
this.listener = listener;
this.concurrentRequests = concurrentRequests;
this.retry = new Retry(EsRejectedExecutionException.class, backoffPolicy, threadPool);
this.retry = new Retry(EsRejectedExecutionException.class, backoffPolicy, scheduler);
this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
@ -26,6 +26,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
@ -41,13 +42,12 @@ import java.util.function.Predicate;
public class Retry {
private final Class<? extends Throwable> retryOnThrowable;
private final BackoffPolicy backoffPolicy;
private final ThreadPool threadPool;
private final Scheduler scheduler;
public Retry(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, ThreadPool threadPool) {
public Retry(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Scheduler scheduler) {
this.retryOnThrowable = retryOnThrowable;
this.backoffPolicy = backoffPolicy;
this.threadPool = threadPool;
this.scheduler = scheduler;
@ -58,8 +58,9 @@ public class Retry {
* @param listener A listener that is invoked when the bulk request finishes or completes with an exception. The listener is not
* @param settings settings
public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest, ActionListener<BulkResponse> listener, Settings settings) {
RetryHandler r = new RetryHandler(retryOnThrowable, backoffPolicy, consumer, listener, settings, threadPool);
public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest,
ActionListener<BulkResponse> listener, Settings settings) {
RetryHandler r = new RetryHandler(retryOnThrowable, backoffPolicy, consumer, listener, settings, scheduler);
@ -72,7 +73,8 @@ public class Retry {
* @param settings settings
* @return a future representing the bulk response returned by the client.
public PlainActionFuture<BulkResponse> withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest, Settings settings) {
public PlainActionFuture<BulkResponse> withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
BulkRequest bulkRequest, Settings settings) {
PlainActionFuture<BulkResponse> future = PlainActionFuture.newFuture();
withBackoff(consumer, bulkRequest, future, settings);
return future;
@ -80,7 +82,7 @@ public class Retry {
static class RetryHandler implements ActionListener<BulkResponse> {
private final Logger logger;
private final ThreadPool threadPool;
private final Scheduler scheduler;
private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
private final ActionListener<BulkResponse> listener;
private final Iterator<TimeValue> backoff;
@ -95,13 +97,13 @@ public class Retry {
RetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy,
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, ActionListener<BulkResponse> listener,
Settings settings, ThreadPool threadPool) {
Settings settings, Scheduler scheduler) {
this.retryOnThrowable = retryOnThrowable;
this.backoff = backoffPolicy.iterator();
this.consumer = consumer;
this.listener = listener;
this.logger = Loggers.getLogger(getClass(), settings);
this.threadPool = threadPool;
this.scheduler = scheduler;
// in contrast to System.currentTimeMillis(), nanoTime() uses a monotonic clock under the hood
this.startTimestampNanos = System.nanoTime();
@ -136,8 +138,8 @@ public class Retry {
assert backoff.hasNext();
TimeValue next = backoff.next();
logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
Runnable command = threadPool.getThreadContext().preserveContext(() -> this.execute(bulkRequestForRetry));
scheduledRequestFuture = threadPool.schedule(next, ThreadPool.Names.SAME, command);
Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command);
private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) {
@ -36,7 +36,7 @@ import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.io.Closeable;
@ -28,7 +28,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.monitor.jvm.JvmStats.GarbageCollector;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.util.HashMap;
@ -93,7 +93,7 @@ import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportRequest;
Normal file
Normal file
@ -0,0 +1,209 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.threadpool;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
* Scheduler that allows to schedule one-shot and periodic commands.
public interface Scheduler {
static ScheduledThreadPoolExecutor initScheduler(Settings settings) {
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
return scheduler;
static boolean terminate(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, long timeout, TimeUnit timeUnit) {
if (awaitTermination(scheduledThreadPoolExecutor, timeout, timeUnit)) {
return true;
// last resort
return awaitTermination(scheduledThreadPoolExecutor, timeout, timeUnit);
static boolean awaitTermination(final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
final long timeout, final TimeUnit timeUnit) {
try {
if (scheduledThreadPoolExecutor.awaitTermination(timeout, timeUnit)) {
return true;
} catch (InterruptedException e) {
return false;
* Does nothing by default but can be used by subclasses to save the current thread context and wraps the command in a Runnable
* that restores that context before running the command.
default Runnable preserveContext(Runnable command) {
return command;
* Schedules a one-shot command to be run after a given delay. The command is not run in the context of the calling thread.
* To preserve the context of the calling thread you may call {@link #preserveContext(Runnable)} on the runnable before passing
* it to this method.
* The command runs on scheduler thread. Do not run blocking calls on the scheduler thread. Subclasses may allow
* to execute on a different executor, in which case blocking calls are allowed.
* @param delay delay before the task executes
* @param executor the name of the executor that has to execute this task. Ignored in the default implementation but can be used
* by subclasses that support multiple executors.
* @param command the command to run
* @return a ScheduledFuture who's get will return when the task has been added to its target thread pool and throws an exception if
* the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool
* the ScheduledFuture cannot interact with it.
* @throws EsRejectedExecutionException if the task cannot be scheduled for execution
ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable command);
* Schedules a periodic action that runs on scheduler thread. Do not run blocking calls on the scheduler thread. Subclasses may allow
* to execute on a different executor, in which case blocking calls are allowed.
* @param command the action to take
* @param interval the delay interval
* @param executor the name of the executor that has to execute this task. Ignored in the default implementation but can be used
* by subclasses that support multiple executors.
* @return a {@link Cancellable} that can be used to cancel the subsequent runs of the command. If the command is running, it will
* not be interrupted.
default Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) {
return new ReschedulingRunnable(command, interval, executor, this, (e) -> {}, (e) -> {});
* This interface represents an object whose execution may be cancelled during runtime.
interface Cancellable {
* Cancel the execution of this object. This method is idempotent.
void cancel();
* Check if the execution has been cancelled
* @return true if cancelled
boolean isCancelled();
* This class encapsulates the scheduling of a {@link Runnable} that needs to be repeated on a interval. For example, checking a value
* for cleanup every second could be done by passing in a Runnable that can perform the check and the specified interval between
* executions of this runnable. <em>NOTE:</em> the runnable is only rescheduled to run again after completion of the runnable.
* For this class, <i>completion</i> means that the call to {@link Runnable#run()} returned or an exception was thrown and caught. In
* case of an exception, this class will log the exception and reschedule the runnable for its next execution. This differs from the
* {@link ScheduledThreadPoolExecutor#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} semantics as an exception there would
* terminate the rescheduling of the runnable.
final class ReschedulingRunnable extends AbstractRunnable implements Cancellable {
private final Runnable runnable;
private final TimeValue interval;
private final String executor;
private final Scheduler scheduler;
private final Consumer<Exception> rejectionConsumer;
private final Consumer<Exception> failureConsumer;
private volatile boolean run = true;
* Creates a new rescheduling runnable and schedules the first execution to occur after the interval specified
* @param runnable the {@link Runnable} that should be executed periodically
* @param interval the time interval between executions
* @param executor the executor where this runnable should be scheduled to run
* @param scheduler the {@link Scheduler} instance to use for scheduling
ReschedulingRunnable(Runnable runnable, TimeValue interval, String executor, Scheduler scheduler,
Consumer<Exception> rejectionConsumer, Consumer<Exception> failureConsumer) {
this.runnable = runnable;
this.interval = interval;
this.executor = executor;
this.scheduler = scheduler;
this.rejectionConsumer = rejectionConsumer;
this.failureConsumer = failureConsumer;
scheduler.schedule(interval, executor, this);
public void cancel() {
run = false;
public boolean isCancelled() {
return run == false;
public void doRun() {
// always check run here since this may have been cancelled since the last execution and we do not want to run
if (run) {
public void onFailure(Exception e) {
public void onRejection(Exception e) {
run = false;
public void onAfter() {
// if this has not been cancelled reschedule it to run again
if (run) {
try {
scheduler.schedule(interval, executor, this);
} catch (final EsRejectedExecutionException e) {
@ -33,10 +33,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
@ -64,7 +61,7 @@ import java.util.concurrent.TimeUnit;
import static java.util.Collections.unmodifiableMap;
public class ThreadPool extends AbstractComponent implements Closeable {
public class ThreadPool extends AbstractComponent implements Scheduler, Closeable {
public static class Names {
public static final String SAME = "same";
@ -143,8 +140,6 @@ public class ThreadPool extends AbstractComponent implements Closeable {
private Map<String, ExecutorHolder> executors = new HashMap<>();
private final ScheduledThreadPoolExecutor scheduler;
private final CachedTimeThread cachedTimeThread;
static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService();
@ -153,6 +148,8 @@ public class ThreadPool extends AbstractComponent implements Closeable {
private final Map<String, ExecutorBuilder> builders;
private final ScheduledThreadPoolExecutor scheduler;
public Collection<ExecutorBuilder> builders() {
return Collections.unmodifiableCollection(builders.values());
@ -210,12 +207,7 @@ public class ThreadPool extends AbstractComponent implements Closeable {
executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
this.executors = unmodifiableMap(executors);
this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
this.scheduler = Scheduler.initScheduler(settings);
TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
@ -329,25 +321,6 @@ public class ThreadPool extends AbstractComponent implements Closeable {
return holder.executor();
public ScheduledExecutorService scheduler() {
return this.scheduler;
* Schedules a periodic action that runs on the specified thread pool.
* @param command the action to take
* @param interval the delay interval
* @param executor The name of the thread pool on which to execute this task. {@link Names#SAME} means "execute on the scheduler thread",
* which there is only one of. Executing blocking or long running code on the {@link Names#SAME} thread pool should never
* be done as it can cause issues with the cluster
* @return a {@link Cancellable} that can be used to cancel the subsequent runs of the command. If the command is running, it will
* not be interrupted.
public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) {
return new ReschedulingRunnable(command, interval, executor, this);
* Schedules a one-shot command to run after a given delay. The command is not run in the context of the calling thread. To preserve the
* context of the calling thread you may call <code>threadPool.getThreadContext().preserveContext</code> on the runnable before passing
@ -361,13 +334,30 @@ public class ThreadPool extends AbstractComponent implements Closeable {
* @return a ScheduledFuture who's get will return when the task is has been added to its target thread pool and throw an exception if
* the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool
* the ScheduledFuture will cannot interact with it.
* @throws EsRejectedExecutionException if the task cannot be scheduled for execution
* @throws org.elasticsearch.common.util.concurrent.EsRejectedExecutionException if the task cannot be scheduled for execution
public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable command) {
if (!Names.SAME.equals(executor)) {
command = new ThreadedRunnable(command, executor(executor));
return scheduler.schedule(new LoggingRunnable(command), delay.millis(), TimeUnit.MILLISECONDS);
return scheduler.schedule(new ThreadPool.LoggingRunnable(command), delay.millis(), TimeUnit.MILLISECONDS);
public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) {
return new ReschedulingRunnable(command, interval, executor, this,
(e) -> {
if (logger.isDebugEnabled()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("scheduled task [{}] was rejected on thread pool [{}]",
command, executor), e);
(e) -> logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to run scheduled task [{}] on thread pool [{}]",
command, executor), e));
public Runnable preserveContext(Runnable command) {
return getThreadContext().preserveContext(command);
public void shutdown() {
@ -376,7 +366,7 @@ public class ThreadPool extends AbstractComponent implements Closeable {
for (ExecutorHolder executor : executors.values()) {
if (executor.executor() instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) executor.executor()).shutdown();
@ -387,7 +377,7 @@ public class ThreadPool extends AbstractComponent implements Closeable {
for (ExecutorHolder executor : executors.values()) {
if (executor.executor() instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) executor.executor()).shutdownNow();
@ -396,14 +386,17 @@ public class ThreadPool extends AbstractComponent implements Closeable {
boolean result = scheduler.awaitTermination(timeout, unit);
for (ExecutorHolder executor : executors.values()) {
if (executor.executor() instanceof ThreadPoolExecutor) {
result &= ((ThreadPoolExecutor) executor.executor()).awaitTermination(timeout, unit);
result &= executor.executor().awaitTermination(timeout, unit);
return result;
public ScheduledExecutorService scheduler() {
return this.scheduler;
* Constrains a value between minimum and maximum values
* (inclusive).
@ -726,7 +719,9 @@ public class ThreadPool extends AbstractComponent implements Closeable {
if (pool != null) {
try {
if (awaitTermination(pool, timeout, timeUnit)) return true;
if (awaitTermination(pool, timeout, timeUnit)) {
return true;
// last resort
return awaitTermination(pool, timeout, timeUnit);
@ -738,11 +733,11 @@ public class ThreadPool extends AbstractComponent implements Closeable {
private static boolean awaitTermination(
final ThreadPool pool,
final ThreadPool threadPool,
final long timeout,
final TimeUnit timeUnit) {
try {
if (pool.awaitTermination(timeout, timeUnit)) {
if (threadPool.awaitTermination(timeout, timeUnit)) {
return true;
} catch (InterruptedException e) {
@ -760,102 +755,6 @@ public class ThreadPool extends AbstractComponent implements Closeable {
return threadContext;
* This interface represents an object whose execution may be cancelled during runtime.
public interface Cancellable {
* Cancel the execution of this object. This method is idempotent.
void cancel();
* Check if the execution has been cancelled
* @return true if cancelled
boolean isCancelled();
* This class encapsulates the scheduling of a {@link Runnable} that needs to be repeated on a interval. For example, checking a value
* for cleanup every second could be done by passing in a Runnable that can perform the check and the specified interval between
* executions of this runnable. <em>NOTE:</em> the runnable is only rescheduled to run again after completion of the runnable.
* For this class, <i>completion</i> means that the call to {@link Runnable#run()} returned or an exception was thrown and caught. In
* case of an exception, this class will log the exception and reschedule the runnable for its next execution. This differs from the
* {@link ScheduledThreadPoolExecutor#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} semantics as an exception there would
* terminate the rescheduling of the runnable.
static final class ReschedulingRunnable extends AbstractRunnable implements Cancellable {
private final Runnable runnable;
private final TimeValue interval;
private final String executor;
private final ThreadPool threadPool;
private volatile boolean run = true;
* Creates a new rescheduling runnable and schedules the first execution to occur after the interval specified
* @param runnable the {@link Runnable} that should be executed periodically
* @param interval the time interval between executions
* @param executor the executor where this runnable should be scheduled to run
* @param threadPool the {@link ThreadPool} instance to use for scheduling
ReschedulingRunnable(Runnable runnable, TimeValue interval, String executor, ThreadPool threadPool) {
this.runnable = runnable;
this.interval = interval;
this.executor = executor;
this.threadPool = threadPool;
threadPool.schedule(interval, executor, this);
public void cancel() {
run = false;
public boolean isCancelled() {
return run == false;
public void doRun() {
// always check run here since this may have been cancelled since the last execution and we do not want to run
if (run) {
public void onFailure(Exception e) {
threadPool.logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to run scheduled task [{}] on thread pool [{}]", runnable.toString(), executor), e);
public void onRejection(Exception e) {
run = false;
if (threadPool.logger.isDebugEnabled()) {
threadPool.logger.debug((Supplier<?>) () -> new ParameterizedMessage("scheduled task [{}] was rejected on thread pool [{}]", runnable, executor), e);
public void onAfter() {
// if this has not been cancelled reschedule it to run again
if (run) {
try {
threadPool.schedule(interval, executor, this);
} catch (final EsRejectedExecutionException e) {
public static boolean assertNotScheduleThread(String reason) {
assert Thread.currentThread().getName().contains("scheduler") == false :
"Expected current thread [" + Thread.currentThread() + "] to not be the scheduler thread. Reason: [" + reason + "]";
@ -25,7 +25,7 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.io.IOException;
@ -67,7 +67,7 @@ public class BulkProcessorTests extends ESTestCase {
final BulkProcessor bulkProcessor;
try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) {
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
threadPool.getThreadContext().putHeader(headerKey, headerValue);
threadPool.getThreadContext().putTransient(transientKey, transientValue);
bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), new BulkProcessor.Listener() {
@ -82,7 +82,7 @@ public class BulkProcessorTests extends ESTestCase {
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
}, 1, bulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), flushInterval, threadPool);
}, 1, bulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), flushInterval, threadPool, () -> {});
@ -56,7 +56,7 @@ import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.junit.After;
import org.junit.Before;
@ -35,7 +35,7 @@ import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import java.io.IOException;
import java.util.ArrayList;
@ -22,9 +22,9 @@ package org.elasticsearch.monitor.jvm;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import java.util.AbstractMap;
import java.util.HashSet;
@ -26,9 +26,9 @@ import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.threadpool.ThreadPool.ReschedulingRunnable;
import org.elasticsearch.threadpool.Scheduler.ReschedulingRunnable;
import org.junit.After;
import org.junit.Before;
@ -80,7 +80,8 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, Names.GENERIC, threadPool);
ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, Names.GENERIC, threadPool,
(e) -> {}, (e) -> {});
// this call was made during construction of the runnable
verify(threadPool, times(1)).schedule(delay, Names.GENERIC, reschedulingRunnable);
@ -260,7 +261,8 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
Runnable runnable = () -> {};
ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, Names.GENERIC, threadPool);
ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, Names.GENERIC,
threadPool, (e) -> {}, (e) -> {});
@ -125,27 +125,24 @@ The `BulkProcessor` simplifies the usage of the Bulk API by providing
a utility class that allows index/update/delete operations to be
transparently executed as they are added to the processor.
In order to execute the requests, the `BulkProcessor` requires 3 components:
In order to execute the requests, the `BulkProcessor` requires the following
`RestHighLevelClient`:: This client is used to execute the `BulkRequest`
and to retrieve the `BulkResponse`
`BulkProcessor.Listener`:: This listener is called before and after
every `BulkRequest` execution or when a `BulkRequest` failed
`ThreadPool`:: The `BulkRequest` executions are done using threads from this
pool, allowing the `BulkProcessor` to work in a non-blocking manner and to
accept new index/update/delete requests while bulk requests are executing.
Then the `BulkProcessor.Builder` class can be used to build a new `BulkProcessor`:
Then the `BulkProcessor.builder` method can be used to build a new `BulkProcessor`:
<1> Create the `ThreadPool` using the given `Settings`
<2> Create the `BulkProcessor.Listener`
<3> This method is called before each execution of a `BulkRequest`
<4> This method is called after each execution of a `BulkRequest`
<5> This method is called when a `BulkRequest` failed
<6> Create the `BulkProcessor` by calling the `build()` method from
<1> Create the `BulkProcessor.Listener`
<2> This method is called before each execution of a `BulkRequest`
<3> This method is called after each execution of a `BulkRequest`
<4> This method is called when a `BulkRequest` failed
<5> Create the `BulkProcessor` by calling the `build()` method from
the `BulkProcessor.Builder`. The `RestHighLevelClient.bulkAsync()`
method will be used to execute the `BulkRequest` under the hood.
@ -190,7 +187,7 @@ to know if the `BulkResponse` contains errors
the failure
Once all requests have been added to the `BulkProcessor`, its instance needs to
be closed closed using one of the two available closing methods.
be closed using one of the two available closing methods.
The `awaitClose()` method can be used to wait until all requests have been processed
or the specified waiting time elapses:
@ -209,3 +206,4 @@ include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-close]
Both methods flush the requests added to the processor before closing the processor
and also forbid any new request to be added to it.
@ -765,8 +765,8 @@ public abstract class ESTestCase extends LuceneTestCase {
return terminated;
public static boolean terminate(ThreadPool service) throws InterruptedException {
return ThreadPool.terminate(service, 10, TimeUnit.SECONDS);
public static boolean terminate(ThreadPool threadPool) throws InterruptedException {
return ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
Reference in New Issue
Block a user