Handle scheduler exceptions (#38014)

Scheduler.schedule(...) would previously assume that caller handles
exception by calling get() on the returned ScheduledFuture.
schedule() now returns a ScheduledCancellable that no longer gives
access to the exception. Instead, any exception thrown out of a
scheduled Runnable is logged as a warning.

This is a continuation of #28667, #36137 and also fixes #37708.
This commit is contained in:
Henning Andersen 2019-01-31 17:51:45 +01:00 committed by GitHub
parent 7f738e8541
commit 68ed72b923
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
59 changed files with 618 additions and 261 deletions

View File

@ -20,10 +20,9 @@ package org.elasticsearch.grok;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction; import java.util.function.BiConsumer;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
/** /**
@ -68,7 +67,7 @@ public interface ThreadWatchdog {
static ThreadWatchdog newInstance(long interval, static ThreadWatchdog newInstance(long interval,
long maxExecutionTime, long maxExecutionTime,
LongSupplier relativeTimeSupplier, LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) { BiConsumer<Long, Runnable> scheduler) {
return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler); return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler);
} }
@ -105,7 +104,7 @@ public interface ThreadWatchdog {
private final long interval; private final long interval;
private final long maxExecutionTime; private final long maxExecutionTime;
private final LongSupplier relativeTimeSupplier; private final LongSupplier relativeTimeSupplier;
private final BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler; private final BiConsumer<Long, Runnable> scheduler;
private final AtomicInteger registered = new AtomicInteger(0); private final AtomicInteger registered = new AtomicInteger(0);
private final AtomicBoolean running = new AtomicBoolean(false); private final AtomicBoolean running = new AtomicBoolean(false);
final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>(); final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();
@ -113,7 +112,7 @@ public interface ThreadWatchdog {
private Default(long interval, private Default(long interval,
long maxExecutionTime, long maxExecutionTime,
LongSupplier relativeTimeSupplier, LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) { BiConsumer<Long, Runnable> scheduler) {
this.interval = interval; this.interval = interval;
this.maxExecutionTime = maxExecutionTime; this.maxExecutionTime = maxExecutionTime;
this.relativeTimeSupplier = relativeTimeSupplier; this.relativeTimeSupplier = relativeTimeSupplier;
@ -124,7 +123,7 @@ public interface ThreadWatchdog {
registered.getAndIncrement(); registered.getAndIncrement();
Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong()); Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong());
if (running.compareAndSet(false, true) == true) { if (running.compareAndSet(false, true) == true) {
scheduler.apply(interval, this::interruptLongRunningExecutions); scheduler.accept(interval, this::interruptLongRunningExecutions);
} }
assert previousValue == null; assert previousValue == null;
} }
@ -149,7 +148,7 @@ public interface ThreadWatchdog {
} }
} }
if (registered.get() > 0) { if (registered.get() > 0) {
scheduler.apply(interval, this::interruptLongRunningExecutions); scheduler.accept(interval, this::interruptLongRunningExecutions);
} else { } else {
running.set(false); running.set(false);
} }

View File

@ -27,9 +27,8 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction; import java.util.function.BiConsumer;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
@ -418,7 +417,7 @@ public class GrokTests extends ESTestCase {
"Zustand->ABGESCHLOSSEN Kassennummer->%{WORD:param9} Bonnummer->%{WORD:param10} Datum->%{DATESTAMP_OTHER:param11}"; "Zustand->ABGESCHLOSSEN Kassennummer->%{WORD:param9} Bonnummer->%{WORD:param10} Datum->%{DATESTAMP_OTHER:param11}";
String logLine = "Bonsuche mit folgender Anfrage: Belegart->[EINGESCHRAENKTER_VERKAUF, VERKAUF, NACHERFASSUNG] " + String logLine = "Bonsuche mit folgender Anfrage: Belegart->[EINGESCHRAENKTER_VERKAUF, VERKAUF, NACHERFASSUNG] " +
"Zustand->ABGESCHLOSSEN Kassennummer->2 Bonnummer->6362 Datum->Mon Jan 08 00:00:00 UTC 2018"; "Zustand->ABGESCHLOSSEN Kassennummer->2 Bonnummer->6362 Datum->Mon Jan 08 00:00:00 UTC 2018";
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler = (delay, command) -> { BiConsumer<Long, Runnable> scheduler = (delay, command) -> {
try { try {
Thread.sleep(delay); Thread.sleep(delay);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -430,7 +429,6 @@ public class GrokTests extends ESTestCase {
} }
}); });
t.start(); t.start();
return null;
}; };
Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler)); Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler));
Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine)); Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine));

View File

@ -51,7 +51,6 @@ public class ThreadWatchdogTests extends ESTestCase {
} }
}); });
thread.start(); thread.start();
return null;
}); });
Map<?, ?> registry = ((ThreadWatchdog.Default) watchdog).registry; Map<?, ?> registry = ((ThreadWatchdog.Default) watchdog).registry;

View File

@ -111,7 +111,8 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl
private static ThreadWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) { private static ThreadWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) {
long intervalMillis = WATCHDOG_INTERVAL.get(parameters.env.settings()).getMillis(); long intervalMillis = WATCHDOG_INTERVAL.get(parameters.env.settings()).getMillis();
long maxExecutionTimeMillis = WATCHDOG_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis(); long maxExecutionTimeMillis = WATCHDOG_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis();
return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis, parameters.relativeTimeSupplier, parameters.scheduler); return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis,
parameters.relativeTimeSupplier, parameters.scheduler::apply);
} }
} }

View File

@ -215,7 +215,7 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
logger.trace( logger.trace(
(Supplier<?>) () -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e); (Supplier<?>) () -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e);
countSearchRetry.run(); countSearchRetry.run();
threadPool.schedule(delay, ThreadPool.Names.SAME, RetryHelper.this); threadPool.schedule(RetryHelper.this, delay, ThreadPool.Names.SAME);
return; return;
} }
} }

View File

@ -93,7 +93,6 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -323,7 +322,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
worker.rethrottle(1); worker.rethrottle(1);
setupClient(new TestThreadPool(getTestName()) { setupClient(new TestThreadPool(getTestName()) {
@Override @Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) { public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
// While we're here we can check that the sleep made it through // While we're here we can check that the sleep made it through
assertThat(delay.nanos(), greaterThan(0L)); assertThat(delay.nanos(), greaterThan(0L));
assertThat(delay.seconds(), lessThanOrEqualTo(10L)); assertThat(delay.seconds(), lessThanOrEqualTo(10L));
@ -442,7 +441,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
AtomicReference<Runnable> capturedCommand = new AtomicReference<>(); AtomicReference<Runnable> capturedCommand = new AtomicReference<>();
setupClient(new TestThreadPool(getTestName()) { setupClient(new TestThreadPool(getTestName()) {
@Override @Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) { public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
capturedDelay.set(delay); capturedDelay.set(delay);
capturedCommand.set(command); capturedCommand.set(command);
return null; return null;
@ -618,7 +617,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
*/ */
setupClient(new TestThreadPool(getTestName()) { setupClient(new TestThreadPool(getTestName()) {
@Override @Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) { public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
/* /*
* This is called twice: * This is called twice:
* 1. To schedule the throttling. When that happens we immediately cancel the task. * 1. To schedule the throttling. When that happens we immediately cancel the task.
@ -629,7 +628,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
if (delay.nanos() > 0) { if (delay.nanos() > 0) {
generic().execute(() -> taskManager.cancel(testTask, reason, () -> {})); generic().execute(() -> taskManager.cancel(testTask, reason, () -> {}));
} }
return super.schedule(delay, name, command); return super.schedule(command, delay, name);
} }
}); });

View File

@ -69,7 +69,6 @@ import java.net.URL;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -104,7 +103,7 @@ public class RemoteScrollableHitSourceTests extends ESTestCase {
} }
@Override @Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) { public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
command.run(); command.run();
return null; return null;
} }

View File

@ -19,6 +19,11 @@
package org.elasticsearch.threadpool; package org.elasticsearch.threadpool;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -26,6 +31,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -38,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
@ -108,7 +115,12 @@ public class EvilThreadPoolTests extends ESTestCase {
try { try {
checkExecutionError(getExecuteRunner(prioritizedExecutor)); checkExecutionError(getExecuteRunner(prioritizedExecutor));
checkExecutionError(getSubmitRunner(prioritizedExecutor)); checkExecutionError(getSubmitRunner(prioritizedExecutor));
// bias towards timeout
checkExecutionError(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r));
// race whether timeout or success (but typically biased towards success)
checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r)); checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r));
// bias towards no timeout.
checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r));
} finally { } finally {
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS); ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
} }
@ -170,10 +182,7 @@ public class EvilThreadPoolTests extends ESTestCase {
final boolean expectExceptionOnSchedule = final boolean expectExceptionOnSchedule =
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable // fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener // TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE;
// scheduler just swallows the exception here
// TODO: bubble these exceptions up
&& ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.DIRECT;
checkExecutionException(getScheduleRunner(executor), expectExceptionOnSchedule); checkExecutionException(getScheduleRunner(executor), expectExceptionOnSchedule);
} }
} }
@ -219,14 +228,19 @@ public class EvilThreadPoolTests extends ESTestCase {
} }
} }
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37708")
public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException { public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test", final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler()); EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
try { try {
checkExecutionException(getExecuteRunner(prioritizedExecutor), true); checkExecutionException(getExecuteRunner(prioritizedExecutor), true);
checkExecutionException(getSubmitRunner(prioritizedExecutor), false); checkExecutionException(getSubmitRunner(prioritizedExecutor), false);
// bias towards timeout
checkExecutionException(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r), true);
// race whether timeout or success (but typically biased towards success)
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true); checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true);
// bias towards no timeout.
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r), true);
} finally { } finally {
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS); ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
} }
@ -235,26 +249,39 @@ public class EvilThreadPoolTests extends ESTestCase {
public void testExecutionExceptionOnScheduler() throws InterruptedException { public void testExecutionExceptionOnScheduler() throws InterruptedException {
final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
try { try {
// scheduler just swallows the exceptions checkExecutionException(getExecuteRunner(scheduler), true);
// TODO: bubble these exceptions up // while submit does return a Future, we choose to log exceptions anyway,
checkExecutionException(getExecuteRunner(scheduler), false); // since this is the semi-internal SafeScheduledThreadPoolExecutor that is being used,
checkExecutionException(getSubmitRunner(scheduler), false); // which also logs exceptions for schedule calls.
checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), false); checkExecutionException(getSubmitRunner(scheduler), true);
checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), true);
} finally { } finally {
Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS); Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS);
} }
} }
private Runnable delayMillis(Runnable r, int ms) {
return () -> {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
r.run();
};
}
private void checkExecutionException(Consumer<Runnable> runner, boolean expectException) throws InterruptedException { private void checkExecutionException(Consumer<Runnable> runner, boolean expectException) throws InterruptedException {
logger.info("checking exception for {}", runner);
final Runnable runnable; final Runnable runnable;
final boolean willThrow; final boolean willThrow;
if (randomBoolean()) { if (randomBoolean()) {
logger.info("checking direct exception for {}", runner);
runnable = () -> { runnable = () -> {
throw new IllegalStateException("future exception"); throw new IllegalStateException("future exception");
}; };
willThrow = expectException; willThrow = expectException;
} else { } else {
logger.info("checking abstract runnable exception for {}", runner);
runnable = new AbstractRunnable() { runnable = new AbstractRunnable() {
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
@ -275,6 +302,7 @@ public class EvilThreadPoolTests extends ESTestCase {
o -> { o -> {
assertEquals(willThrow, o.isPresent()); assertEquals(willThrow, o.isPresent());
if (willThrow) { if (willThrow) {
if (o.get() instanceof Error) throw (Error) o.get();
assertThat(o.get(), instanceOf(IllegalStateException.class)); assertThat(o.get(), instanceOf(IllegalStateException.class));
assertThat(o.get(), hasToString(containsString("future exception"))); assertThat(o.get(), hasToString(containsString("future exception")));
} }
@ -313,7 +341,7 @@ public class EvilThreadPoolTests extends ESTestCase {
return new Consumer<Runnable>() { return new Consumer<Runnable>() {
@Override @Override
public void accept(Runnable runnable) { public void accept(Runnable runnable) {
threadPool.schedule(randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor, runnable); threadPool.schedule(runnable, randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor);
} }
@Override @Override
@ -335,20 +363,49 @@ public class EvilThreadPoolTests extends ESTestCase {
try { try {
Thread.setDefaultUncaughtExceptionHandler((t, e) -> { Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
assertTrue(expectThrowable); assertTrue(expectThrowable);
throwableReference.set(e); assertTrue("Only one message allowed", throwableReference.compareAndSet(null, e));
uncaughtExceptionHandlerLatch.countDown(); uncaughtExceptionHandlerLatch.countDown();
}); });
final CountDownLatch supplierLatch = new CountDownLatch(1); final CountDownLatch supplierLatch = new CountDownLatch(1);
try { Runnable job = () -> {
runner.accept(() -> {
try { try {
runnable.run(); runnable.run();
} finally { } finally {
supplierLatch.countDown(); supplierLatch.countDown();
} }
};
// snoop on logging to also handle the cases where exceptions are simply logged in Scheduler.
final Logger schedulerLogger = LogManager.getLogger(Scheduler.SafeScheduledThreadPoolExecutor.class);
final MockLogAppender appender = new MockLogAppender();
appender.addExpectation(
new MockLogAppender.LoggingExpectation() {
@Override
public void match(LogEvent event) {
if (event.getLevel() == Level.WARN) {
assertThat("no other warnings than those expected",
event.getMessage().getFormattedMessage(),
equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]"));
assertTrue(expectThrowable);
assertNotNull(event.getThrown());
assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown()));
uncaughtExceptionHandlerLatch.countDown();
}
}
@Override
public void assertMatched() {
}
}); });
appender.start();
Loggers.addAppender(schedulerLogger, appender);
try {
try {
runner.accept(job);
} catch (Throwable t) { } catch (Throwable t) {
consumer.accept(Optional.of(t)); consumer.accept(Optional.of(t));
return; return;
@ -359,7 +416,14 @@ public class EvilThreadPoolTests extends ESTestCase {
if (expectThrowable) { if (expectThrowable) {
uncaughtExceptionHandlerLatch.await(); uncaughtExceptionHandlerLatch.await();
} }
} finally {
Loggers.removeAppender(schedulerLogger, appender);
appender.stop();
}
consumer.accept(Optional.ofNullable(throwableReference.get())); consumer.accept(Optional.ofNullable(throwableReference.get()));
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} finally { } finally {
Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler); Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
} }

View File

@ -203,10 +203,15 @@ public class BulkProcessor implements Closeable {
Objects.requireNonNull(listener, "listener"); Objects.requireNonNull(listener, "listener");
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
return new Builder(consumer, listener, return new Builder(consumer, listener,
(delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS), buildScheduler(scheduledThreadPoolExecutor),
() -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS)); () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
} }
private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
return (command, delay, executor) ->
Scheduler.wrapAsScheduledCancellable(scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS));
}
private final int bulkActions; private final int bulkActions;
private final long bulkSize; private final long bulkSize;
@ -343,7 +348,9 @@ public class BulkProcessor implements Closeable {
if (flushInterval == null) { if (flushInterval == null) {
return new Scheduler.Cancellable() { return new Scheduler.Cancellable() {
@Override @Override
public void cancel() {} public boolean cancel() {
return false;
}
@Override @Override
public boolean isCancelled() { public boolean isCancelled() {

View File

@ -23,7 +23,6 @@ import org.apache.logging.log4j.LogManager;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -31,7 +30,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -89,7 +87,7 @@ public class Retry {
// needed to construct the next bulk request based on the response to the previous one // needed to construct the next bulk request based on the response to the previous one
// volatile as we're called from a scheduled thread // volatile as we're called from a scheduled thread
private volatile BulkRequest currentBulkRequest; private volatile BulkRequest currentBulkRequest;
private volatile ScheduledFuture<?> scheduledRequestFuture; private volatile Scheduler.Cancellable retryCancellable;
RetryHandler(BackoffPolicy backoffPolicy, BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, RetryHandler(BackoffPolicy backoffPolicy, BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
ActionListener<BulkResponse> listener, Scheduler scheduler) { ActionListener<BulkResponse> listener, Scheduler scheduler) {
@ -123,7 +121,9 @@ public class Retry {
try { try {
listener.onFailure(e); listener.onFailure(e);
} finally { } finally {
FutureUtils.cancel(scheduledRequestFuture); if (retryCancellable != null) {
retryCancellable.cancel();
}
} }
} }
@ -132,7 +132,7 @@ public class Retry {
TimeValue next = backoff.next(); TimeValue next = backoff.next();
logger.trace("Retry of bulk request scheduled in {} ms.", next.millis()); logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry)); Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command); retryCancellable = scheduler.schedule(command, next, ThreadPool.Names.SAME);
} }
private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) { private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) {
@ -166,7 +166,9 @@ public class Retry {
try { try {
listener.onResponse(getAccumulatedResponse()); listener.onResponse(getAccumulatedResponse());
} finally { } finally {
FutureUtils.cancel(scheduledRequestFuture); if (retryCancellable != null) {
retryCancellable.cancel();
}
} }
} }

View File

@ -42,7 +42,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.ConnectionProfile;
@ -68,7 +68,6 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
final class TransportClientNodesService implements Closeable { final class TransportClientNodesService implements Closeable {
@ -100,7 +99,7 @@ final class TransportClientNodesService implements Closeable {
private final NodeSampler nodesSampler; private final NodeSampler nodesSampler;
private volatile ScheduledFuture nodesSamplerFuture; private volatile Scheduler.Cancellable nodesSamplerCancellable;
private final AtomicInteger randomNodeGenerator = new AtomicInteger(Randomness.get().nextInt()); private final AtomicInteger randomNodeGenerator = new AtomicInteger(Randomness.get().nextInt());
@ -146,7 +145,7 @@ final class TransportClientNodesService implements Closeable {
this.nodesSampler = new SimpleNodeSampler(); this.nodesSampler = new SimpleNodeSampler();
} }
this.hostFailureListener = hostFailureListener; this.hostFailureListener = hostFailureListener;
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler()); this.nodesSamplerCancellable = threadPool.schedule(new ScheduledNodeSampler(), nodesSamplerInterval, ThreadPool.Names.GENERIC);
} }
public List<TransportAddress> transportAddresses() { public List<TransportAddress> transportAddresses() {
@ -325,7 +324,9 @@ final class TransportClientNodesService implements Closeable {
return; return;
} }
closed = true; closed = true;
FutureUtils.cancel(nodesSamplerFuture); if (nodesSamplerCancellable != null) {
nodesSamplerCancellable.cancel();
}
for (DiscoveryNode node : nodes) { for (DiscoveryNode node : nodes) {
transportService.disconnectFromNode(node); transportService.disconnectFromNode(node);
} }
@ -392,7 +393,7 @@ final class TransportClientNodesService implements Closeable {
try { try {
nodesSampler.sample(); nodesSampler.sample();
if (!closed) { if (!closed) {
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this); nodesSamplerCancellable = threadPool.schedule(this, nodesSamplerInterval, ThreadPool.Names.GENERIC);
} }
} catch (Exception e) { } catch (Exception e) {
logger.warn("failed to sample", e); logger.warn("failed to sample", e);

View File

@ -31,10 +31,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.discovery.zen.MasterFaultDetection; import org.elasticsearch.discovery.zen.MasterFaultDetection;
import org.elasticsearch.discovery.zen.NodesFaultDetection; import org.elasticsearch.discovery.zen.NodesFaultDetection;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -42,7 +42,6 @@ import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import static org.elasticsearch.common.settings.Setting.Property; import static org.elasticsearch.common.settings.Setting.Property;
import static org.elasticsearch.common.settings.Setting.positiveTimeSetting; import static org.elasticsearch.common.settings.Setting.positiveTimeSetting;
@ -71,7 +70,7 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
private final TimeValue reconnectInterval; private final TimeValue reconnectInterval;
private volatile ScheduledFuture<?> backgroundFuture = null; private volatile Scheduler.Cancellable backgroundCancellable = null;
@Inject @Inject
public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) { public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) {
@ -187,19 +186,21 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
@Override @Override
public void onAfter() { public void onAfter() {
if (lifecycle.started()) { if (lifecycle.started()) {
backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, this); backgroundCancellable = threadPool.schedule(this, reconnectInterval, ThreadPool.Names.GENERIC);
} }
} }
} }
@Override @Override
protected void doStart() { protected void doStart() {
backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ConnectionChecker()); backgroundCancellable = threadPool.schedule(new ConnectionChecker(), reconnectInterval, ThreadPool.Names.GENERIC);
} }
@Override @Override
protected void doStop() { protected void doStop() {
FutureUtils.cancel(backgroundFuture); if (backgroundCancellable != null) {
backgroundCancellable.cancel();
}
} }
@Override @Override

View File

@ -976,7 +976,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
new ListenableFuture<>(), ackListener, publishListener); new ListenableFuture<>(), ackListener, publishListener);
currentPublication = Optional.of(publication); currentPublication = Optional.of(publication);
transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() { transportService.getThreadPool().schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
synchronized (mutex) { synchronized (mutex) {
@ -988,7 +988,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
public String toString() { public String toString() {
return "scheduled timeout for " + publication; return "scheduled timeout for " + publication;
} }
}); }, publishTimeout, Names.GENERIC);
final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes(); final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
leaderChecker.setCurrentNodes(publishNodes); leaderChecker.setCurrentNodes(publishNodes);

View File

@ -392,7 +392,7 @@ public class FollowersChecker {
} }
private void scheduleNextWakeUp() { private void scheduleNextWakeUp() {
transportService.getThreadPool().schedule(followerCheckInterval, Names.SAME, new Runnable() { transportService.getThreadPool().schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
handleWakeUp(); handleWakeUp();
@ -402,7 +402,7 @@ public class FollowersChecker {
public String toString() { public String toString() {
return FollowerChecker.this + "::handleWakeUp"; return FollowerChecker.this + "::handleWakeUp";
} }
}); }, followerCheckInterval, Names.SAME);
} }
@Override @Override

View File

@ -301,7 +301,7 @@ public class LeaderChecker {
private void scheduleNextWakeUp() { private void scheduleNextWakeUp() {
logger.trace("scheduling next check of {} for [{}] = {}", leader, LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckInterval); logger.trace("scheduling next check of {} for [{}] = {}", leader, LEADER_CHECK_INTERVAL_SETTING.getKey(), leaderCheckInterval);
transportService.getThreadPool().schedule(leaderCheckInterval, Names.SAME, new Runnable() { transportService.getThreadPool().schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
handleWakeUp(); handleWakeUp();
@ -311,7 +311,7 @@ public class LeaderChecker {
public String toString() { public String toString() {
return "scheduled check of leader " + leader; return "scheduled check of leader " + leader;
} }
}); }, leaderCheckInterval, Names.SAME);
} }
} }

View File

@ -32,10 +32,9 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -69,7 +68,7 @@ public class DelayedAllocationService extends AbstractLifecycleComponent impleme
class DelayedRerouteTask extends ClusterStateUpdateTask { class DelayedRerouteTask extends ClusterStateUpdateTask {
final TimeValue nextDelay; // delay until submitting the reroute command final TimeValue nextDelay; // delay until submitting the reroute command
final long baseTimestampNanos; // timestamp (in nanos) upon which delay was calculated final long baseTimestampNanos; // timestamp (in nanos) upon which delay was calculated
volatile ScheduledFuture<?> future; volatile Scheduler.Cancellable cancellable;
final AtomicBoolean cancelScheduling = new AtomicBoolean(); final AtomicBoolean cancelScheduling = new AtomicBoolean();
DelayedRerouteTask(TimeValue nextDelay, long baseTimestampNanos) { DelayedRerouteTask(TimeValue nextDelay, long baseTimestampNanos) {
@ -83,12 +82,14 @@ public class DelayedAllocationService extends AbstractLifecycleComponent impleme
public void cancelScheduling() { public void cancelScheduling() {
cancelScheduling.set(true); cancelScheduling.set(true);
FutureUtils.cancel(future); if (cancellable != null) {
cancellable.cancel();
}
removeIfSameTask(this); removeIfSameTask(this);
} }
public void schedule() { public void schedule() {
future = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() { cancellable = threadPool.schedule(new AbstractRunnable() {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
if (cancelScheduling.get()) { if (cancelScheduling.get()) {
@ -102,7 +103,7 @@ public class DelayedAllocationService extends AbstractLifecycleComponent impleme
logger.warn("failed to submit schedule/execute reroute post unassigned shard", e); logger.warn("failed to submit schedule/execute reroute post unassigned shard", e);
removeIfSameTask(DelayedRerouteTask.this); removeIfSameTask(DelayedRerouteTask.this);
} }
}); }, nextDelay, ThreadPool.Names.SAME);
} }
@Override @Override

View File

@ -42,9 +42,9 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collection; import java.util.Collection;
@ -55,7 +55,6 @@ import java.util.Objects;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -281,7 +280,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
public void run() { public void run() {
if (timeout != null) { if (timeout != null) {
NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout); NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.GENERIC, notifyTimeout); notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, ThreadPool.Names.GENERIC);
onGoingTimeouts.add(notifyTimeout); onGoingTimeouts.add(notifyTimeout);
} }
timeoutClusterStateListeners.add(listener); timeoutClusterStateListeners.add(listener);
@ -526,7 +525,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
class NotifyTimeout implements Runnable { class NotifyTimeout implements Runnable {
final TimeoutClusterStateListener listener; final TimeoutClusterStateListener listener;
final TimeValue timeout; final TimeValue timeout;
volatile ScheduledFuture future; volatile Scheduler.Cancellable cancellable;
NotifyTimeout(TimeoutClusterStateListener listener, TimeValue timeout) { NotifyTimeout(TimeoutClusterStateListener listener, TimeValue timeout) {
this.listener = listener; this.listener = listener;
@ -534,12 +533,14 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
} }
public void cancel() { public void cancel() {
FutureUtils.cancel(future); if (cancellable != null) {
cancellable.cancel();
}
} }
@Override @Override
public void run() { public void run() {
if (future != null && future.isCancelled()) { if (cancellable != null && cancellable.isCancelled()) {
return; return;
} }
if (lifecycle.stoppedOrClosed()) { if (lifecycle.stoppedOrClosed()) {

View File

@ -52,6 +52,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher; import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays; import java.util.Arrays;
@ -60,7 +61,6 @@ import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -606,7 +606,7 @@ public class MasterService extends AbstractLifecycleComponent {
private final DiscoveryNode masterNode; private final DiscoveryNode masterNode;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final long clusterStateVersion; private final long clusterStateVersion;
private volatile Future<?> ackTimeoutCallback; private volatile Scheduler.Cancellable ackTimeoutCallback;
private Exception lastFailure; private Exception lastFailure;
AckCountDownListener(AckedClusterStateTaskListener ackedTaskListener, long clusterStateVersion, DiscoveryNodes nodes, AckCountDownListener(AckedClusterStateTaskListener ackedTaskListener, long clusterStateVersion, DiscoveryNodes nodes,
@ -638,10 +638,10 @@ public class MasterService extends AbstractLifecycleComponent {
} else if (countDown.countDown()) { } else if (countDown.countDown()) {
finish(); finish();
} else { } else {
this.ackTimeoutCallback = threadPool.schedule(timeLeft, ThreadPool.Names.GENERIC, this::onTimeout); this.ackTimeoutCallback = threadPool.schedule(this::onTimeout, timeLeft, ThreadPool.Names.GENERIC);
// re-check if onNodeAck has not completed while we were scheduling the timeout // re-check if onNodeAck has not completed while we were scheduling the timeout
if (countDown.isCountedDown()) { if (countDown.isCountedDown()) {
FutureUtils.cancel(ackTimeoutCallback); ackTimeoutCallback.cancel();
} }
} }
} }
@ -666,7 +666,9 @@ public class MasterService extends AbstractLifecycleComponent {
private void finish() { private void finish() {
logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion); logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion);
FutureUtils.cancel(ackTimeoutCallback); if (ackTimeoutCallback != null) {
ackTimeoutCallback.cancel();
}
ackedTaskListener.onAllNodesAcked(lastFailure); ackedTaskListener.onAllNodesAcked(lastFailure);
} }

View File

@ -21,11 +21,11 @@ package org.elasticsearch.common.util.concurrent;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable; import java.io.Closeable;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
@ -37,7 +37,7 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable {
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false);
private final boolean autoReschedule; private final boolean autoReschedule;
private volatile ScheduledFuture<?> scheduledFuture; private volatile Scheduler.Cancellable cancellable;
private volatile boolean isScheduledOrRunning; private volatile boolean isScheduledOrRunning;
private volatile Exception lastThrownException; private volatile Exception lastThrownException;
private volatile TimeValue interval; private volatile TimeValue interval;
@ -56,7 +56,7 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable {
*/ */
public synchronized void setInterval(TimeValue interval) { public synchronized void setInterval(TimeValue interval) {
this.interval = interval; this.interval = interval;
if (scheduledFuture != null) { if (cancellable != null) {
rescheduleIfNecessary(); rescheduleIfNecessary();
} }
} }
@ -84,18 +84,18 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable {
if (isClosed()) { if (isClosed()) {
return; return;
} }
if (scheduledFuture != null) { if (cancellable != null) {
FutureUtils.cancel(scheduledFuture); cancellable.cancel();
} }
if (interval.millis() > 0 && mustReschedule()) { if (interval.millis() > 0 && mustReschedule()) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("scheduling {} every {}", toString(), interval); logger.trace("scheduling {} every {}", toString(), interval);
} }
scheduledFuture = threadPool.schedule(interval, getThreadPool(), this); cancellable = threadPool.schedule(this, interval, getThreadPool());
isScheduledOrRunning = true; isScheduledOrRunning = true;
} else { } else {
logger.trace("scheduled {} disabled", toString()); logger.trace("scheduled {} disabled", toString());
scheduledFuture = null; cancellable = null;
isScheduledOrRunning = false; isScheduledOrRunning = false;
} }
} }
@ -110,8 +110,10 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable {
* Cancel any scheduled run, but do not prevent subsequent restarts. * Cancel any scheduled run, but do not prevent subsequent restarts.
*/ */
public synchronized void cancel() { public synchronized void cancel() {
FutureUtils.cancel(scheduledFuture); if (cancellable != null) {
scheduledFuture = null; cancellable.cancel();
cancellable = null;
}
isScheduledOrRunning = false; isScheduledOrRunning = false;
} }
@ -132,7 +134,7 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable {
@Override @Override
public final void run() { public final void run() {
synchronized (this) { synchronized (this) {
scheduledFuture = null; cancellable = null;
isScheduledOrRunning = autoReschedule; isScheduledOrRunning = autoReschedule;
} }
try { try {

View File

@ -118,8 +118,11 @@ public class EsExecutors {
* Checks if the runnable arose from asynchronous submission of a task to an executor. If an uncaught exception was thrown * Checks if the runnable arose from asynchronous submission of a task to an executor. If an uncaught exception was thrown
* during the execution of this task, we need to inspect this runnable and see if it is an error that should be propagated * during the execution of this task, we need to inspect this runnable and see if it is an error that should be propagated
* to the uncaught exception handler. * to the uncaught exception handler.
*
* @param runnable the runnable to inspect, should be a RunnableFuture
* @return non fatal exception or null if no exception.
*/ */
public static void rethrowErrors(Runnable runnable) { public static Throwable rethrowErrors(Runnable runnable) {
if (runnable instanceof RunnableFuture) { if (runnable instanceof RunnableFuture) {
try { try {
((RunnableFuture) runnable).get(); ((RunnableFuture) runnable).get();
@ -143,10 +146,15 @@ public class EsExecutors {
// restore the interrupt status // restore the interrupt status
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
if (e instanceof ExecutionException) {
return e.getCause();
} }
} }
} }
return null;
}
private static final class DirectExecutorService extends AbstractExecutorService { private static final class DirectExecutorService extends AbstractExecutorService {
@SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")

View File

@ -128,7 +128,7 @@ public class MasterFaultDetection extends FaultDetection {
this.masterPinger = new MasterPinger(); this.masterPinger = new MasterPinger();
// we start pinging slightly later to allow the chosen master to complete it's own master election // we start pinging slightly later to allow the chosen master to complete it's own master election
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger); threadPool.schedule(masterPinger, pingInterval, ThreadPool.Names.SAME);
} }
public void stop(String reason) { public void stop(String reason) {
@ -174,7 +174,7 @@ public class MasterFaultDetection extends FaultDetection {
} }
this.masterPinger = new MasterPinger(); this.masterPinger = new MasterPinger();
// we use schedule with a 0 time value to run the pinger on the pool as it will run on later // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger); threadPool.schedule(masterPinger, TimeValue.timeValueMillis(0), ThreadPool.Names.SAME);
} catch (Exception e) { } catch (Exception e) {
logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode); logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
notifyMasterFailure(masterNode, null, "transport disconnected (with verified connect)"); notifyMasterFailure(masterNode, null, "transport disconnected (with verified connect)");
@ -218,7 +218,7 @@ public class MasterFaultDetection extends FaultDetection {
final DiscoveryNode masterToPing = masterNode; final DiscoveryNode masterToPing = masterNode;
if (masterToPing == null) { if (masterToPing == null) {
// master is null, should not happen, but we are still running, so reschedule // master is null, should not happen, but we are still running, so reschedule
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this); threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME);
return; return;
} }
@ -243,7 +243,7 @@ public class MasterFaultDetection extends FaultDetection {
// check if the master node did not get switched on us..., if it did, we simply return with no reschedule // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
if (masterToPing.equals(MasterFaultDetection.this.masterNode())) { if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
// we don't stop on disconnection from master, we keep pinging it // we don't stop on disconnection from master, we keep pinging it
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this); threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME);
} }
} }

View File

@ -132,7 +132,7 @@ public class NodesFaultDetection extends FaultDetection {
// it's OK to overwrite an existing nodeFD - it will just stop and the new one will pick things up. // it's OK to overwrite an existing nodeFD - it will just stop and the new one will pick things up.
nodesFD.put(node, fd); nodesFD.put(node, fd);
// we use schedule with a 0 time value to run the pinger on the pool as it will run on later // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, fd); threadPool.schedule(fd, TimeValue.timeValueMillis(0), ThreadPool.Names.SAME);
} }
} }
} }
@ -161,7 +161,7 @@ public class NodesFaultDetection extends FaultDetection {
transportService.connectToNode(node); transportService.connectToNode(node);
nodesFD.put(node, fd); nodesFD.put(node, fd);
// we use schedule with a 0 time value to run the pinger on the pool as it will run on later // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, fd); threadPool.schedule(fd, TimeValue.timeValueMillis(0), ThreadPool.Names.SAME);
} catch (Exception e) { } catch (Exception e) {
logger.trace("[node ] [{}] transport disconnected (with verified connect)", node); logger.trace("[node ] [{}] transport disconnected (with verified connect)", node);
// clean up if needed, just to be safe.. // clean up if needed, just to be safe..
@ -240,7 +240,7 @@ public class NodesFaultDetection extends FaultDetection {
return; return;
} }
retryCount = 0; retryCount = 0;
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, NodeFD.this); threadPool.schedule(NodeFD.this, pingInterval, ThreadPool.Names.SAME);
} }
@Override @Override

View File

@ -304,9 +304,9 @@ public class UnicastZenPing implements ZenPing {
} }
}; };
threadPool.generic().execute(pingSender); threadPool.generic().execute(pingSender);
threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC, pingSender); threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC);
threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC, pingSender); threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC);
threadPool.schedule(scheduleDuration, ThreadPool.Names.GENERIC, new AbstractRunnable() { threadPool.schedule(new AbstractRunnable() {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
finishPingingRound(pingingRound); finishPingingRound(pingingRound);
@ -316,7 +316,7 @@ public class UnicastZenPing implements ZenPing {
public void onFailure(Exception e) { public void onFailure(Exception e) {
logger.warn("unexpected error while finishing pinging round", e); logger.warn("unexpected error while finishing pinging round", e);
} }
}); }, scheduleDuration, ThreadPool.Names.GENERIC);
} }
// for testing // for testing
@ -557,8 +557,8 @@ public class UnicastZenPing implements ZenPing {
temporalResponses.add(request.pingResponse); temporalResponses.add(request.pingResponse);
// add to any ongoing pinging // add to any ongoing pinging
activePingingRounds.values().forEach(p -> p.addPingResponseToCollection(request.pingResponse)); activePingingRounds.values().forEach(p -> p.addPingResponseToCollection(request.pingResponse));
threadPool.schedule(TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME, threadPool.schedule(() -> temporalResponses.remove(request.pingResponse),
() -> temporalResponses.remove(request.pingResponse)); TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME);
List<PingResponse> pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses); List<PingResponse> pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses);
pingResponses.add(createPingResponse(contextProvider.clusterState())); pingResponses.add(createPingResponse(contextProvider.clusterState()));

View File

@ -206,12 +206,12 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
if (enforceRecoverAfterTime && recoverAfterTime != null) { if (enforceRecoverAfterTime && recoverAfterTime != null) {
if (scheduledRecovery.compareAndSet(false, true)) { if (scheduledRecovery.compareAndSet(false, true)) {
logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason); logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason);
threadPool.schedule(recoverAfterTime, ThreadPool.Names.GENERIC, () -> { threadPool.schedule(() -> {
if (recovered.compareAndSet(false, true)) { if (recovered.compareAndSet(false, true)) {
logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime); logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
recoveryRunnable.run(); recoveryRunnable.run();
} }
}); }, recoverAfterTime, ThreadPool.Names.GENERIC);
} }
} else { } else {
if (recovered.compareAndSet(false, true)) { if (recovered.compareAndSet(false, true)) {

View File

@ -155,7 +155,7 @@ public class ClientScrollableHitSource extends ScrollableHitSource {
TimeValue delay = retries.next(); TimeValue delay = retries.next();
logger.trace(() -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e); logger.trace(() -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e);
countSearchRetry.run(); countSearchRetry.run();
threadPool.schedule(delay, ThreadPool.Names.SAME, retryWithContext); threadPool.schedule(retryWithContext, delay, ThreadPool.Names.SAME);
} else { } else {
logger.warn(() -> new ParameterizedMessage( logger.warn(() -> new ParameterizedMessage(
"giving up on search because we retried [{}] times without success", retryCount), e); "giving up on search because we retried [{}] times without success", retryCount), e);

View File

@ -24,11 +24,10 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -173,10 +172,10 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
if (delayed == null) { if (delayed == null) {
return timeValueNanos(0); return timeValueNanos(0);
} }
if (delayed.future == null) { if (delayed.scheduled == null) {
return timeValueNanos(0); return timeValueNanos(0);
} }
return timeValueNanos(max(0, delayed.future.getDelay(TimeUnit.NANOSECONDS))); return timeValueNanos(max(0, delayed.scheduled.getDelay(TimeUnit.NANOSECONDS)));
} }
/** /**
@ -249,16 +248,16 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final Runnable command; private final Runnable command;
private final float requestsPerSecond; private final float requestsPerSecond;
private final ScheduledFuture<?> future; private final Scheduler.ScheduledCancellable scheduled;
DelayedPrepareBulkRequest(ThreadPool threadPool, float requestsPerSecond, TimeValue delay, Runnable command) { DelayedPrepareBulkRequest(ThreadPool threadPool, float requestsPerSecond, TimeValue delay, Runnable command) {
this.threadPool = threadPool; this.threadPool = threadPool;
this.requestsPerSecond = requestsPerSecond; this.requestsPerSecond = requestsPerSecond;
this.command = command; this.command = command;
this.future = threadPool.schedule(delay, ThreadPool.Names.GENERIC, () -> { this.scheduled = threadPool.schedule(() -> {
throttledNanos.addAndGet(delay.nanos()); throttledNanos.addAndGet(delay.nanos());
command.run(); command.run();
}); }, delay, ThreadPool.Names.GENERIC);
} }
DelayedPrepareBulkRequest rethrottle(float newRequestsPerSecond) { DelayedPrepareBulkRequest rethrottle(float newRequestsPerSecond) {
@ -272,9 +271,9 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
return this; return this;
} }
long remainingDelay = future.getDelay(TimeUnit.NANOSECONDS); long remainingDelay = scheduled.getDelay(TimeUnit.NANOSECONDS);
// Actually reschedule the task // Actually reschedule the task
if (false == FutureUtils.cancel(future)) { if (scheduled == null || false == scheduled.cancel()) {
// Couldn't cancel, probably because the task has finished or been scheduled. Either way we have nothing to do here. // Couldn't cancel, probably because the task has finished or been scheduled. Either way we have nothing to do here.
logger.debug("[{}]: skipping rescheduling because we couldn't cancel the task", task.getId()); logger.debug("[{}]: skipping rescheduling because we couldn't cancel the task", task.getId());
return this; return this;

View File

@ -198,7 +198,7 @@ public class IndicesService extends AbstractLifecycleComponent
@Override @Override
protected void doStart() { protected void doStart() {
// Start thread that will manage cleaning the field data cache periodically // Start thread that will manage cleaning the field data cache periodically
threadPool.schedule(this.cleanInterval, ThreadPool.Names.SAME, this.cacheCleaner); threadPool.schedule(this.cacheCleaner, this.cleanInterval, ThreadPool.Names.SAME);
} }
public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvironment nodeEnv, NamedXContentRegistry xContentRegistry, public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvironment nodeEnv, NamedXContentRegistry xContentRegistry,
@ -1164,7 +1164,7 @@ public class IndicesService extends AbstractLifecycleComponent
} }
// Reschedule itself to run again if not closed // Reschedule itself to run again if not closed
if (closed.get() == false) { if (closed.get() == false) {
threadPool.schedule(interval, ThreadPool.Names.SAME, this); threadPool.schedule(this, interval, ThreadPool.Names.SAME);
} }
} }

View File

@ -164,7 +164,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
private void retryRecovery(final long recoveryId, final TimeValue retryAfter, final TimeValue activityTimeout) { private void retryRecovery(final long recoveryId, final TimeValue retryAfter, final TimeValue activityTimeout) {
RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout); RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout);
if (newTarget != null) { if (newTarget != null) {
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.recoveryId())); threadPool.schedule(new RecoveryRunner(newTarget.recoveryId()), retryAfter, ThreadPool.Names.GENERIC);
} }
} }

View File

@ -76,8 +76,8 @@ public class RecoveriesCollection {
assert existingTarget == null : "found two RecoveryStatus instances with the same id"; assert existingTarget == null : "found two RecoveryStatus instances with the same id";
logger.trace("{} started recovery from {}, id [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode(), logger.trace("{} started recovery from {}, id [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode(),
recoveryTarget.recoveryId()); recoveryTarget.recoveryId());
threadPool.schedule(activityTimeout, ThreadPool.Names.GENERIC, threadPool.schedule(new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout),
new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout)); activityTimeout, ThreadPool.Names.GENERIC);
} }
/** /**
@ -289,7 +289,7 @@ public class RecoveriesCollection {
} }
lastSeenAccessTime = accessTime; lastSeenAccessTime = accessTime;
logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", recoveryId, lastSeenAccessTime); logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", recoveryId, lastSeenAccessTime);
threadPool.schedule(checkInterval, ThreadPool.Names.GENERIC, this); threadPool.schedule(this, checkInterval, ThreadPool.Names.GENERIC);
} }
} }

View File

@ -95,7 +95,7 @@ public class IngestService implements ClusterStateApplier {
env, scriptService, analysisRegistry, env, scriptService, analysisRegistry,
threadPool.getThreadContext(), threadPool::relativeTimeInMillis, threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
(delay, command) -> threadPool.schedule( (delay, command) -> threadPool.schedule(
TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC
), this ), this
) )
); );

View File

@ -23,9 +23,9 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.Scheduler;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
@ -105,10 +105,10 @@ public interface Processor {
/** /**
* Provides scheduler support * Provides scheduler support
*/ */
public final BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler; public final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler;
public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler, LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
IngestService ingestService) { IngestService ingestService) {
this.env = env; this.env = env;
this.scriptService = scriptService; this.scriptService = scriptService;

View File

@ -190,7 +190,7 @@ public class TaskResultsService {
} else { } else {
TimeValue wait = backoff.next(); TimeValue wait = backoff.next();
logger.warn(() -> new ParameterizedMessage("failed to store task result, retrying in [{}]", wait), e); logger.warn(() -> new ParameterizedMessage("failed to store task result, retrying in [{}]", wait), e);
threadPool.schedule(wait, ThreadPool.Names.SAME, () -> doStoreResult(backoff, index, listener)); threadPool.schedule(() -> doStoreResult(backoff, index, listener), wait, ThreadPool.Names.SAME);
} }
} }
}); });

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import java.util.concurrent.Future;
class CancellableAdapter implements Scheduler.Cancellable {
private Future<?> future;
CancellableAdapter(Future<?> future) {
assert future != null;
this.future = future;
}
@Override
public boolean cancel() {
return FutureUtils.cancel(future);
}
@Override
public boolean isCancelled() {
return future.isCancelled();
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import java.util.concurrent.Delayed;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
class ScheduledCancellableAdapter implements Scheduler.ScheduledCancellable {
private final ScheduledFuture<?> scheduledFuture;
ScheduledCancellableAdapter(ScheduledFuture<?> scheduledFuture) {
assert scheduledFuture != null;
this.scheduledFuture = scheduledFuture;
}
@Override
public long getDelay(TimeUnit unit) {
return scheduledFuture.getDelay(unit);
}
@Override
public int compareTo(Delayed other) {
// unwrap other by calling on it.
return -other.compareTo(scheduledFuture);
}
@Override
public boolean cancel() {
return FutureUtils.cancel(scheduledFuture);
}
@Override
public boolean isCancelled() {
return scheduledFuture.isCancelled();
}
}

View File

@ -19,6 +19,9 @@
package org.elasticsearch.threadpool; package org.elasticsearch.threadpool;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -27,6 +30,8 @@ import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import java.util.concurrent.Delayed;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -39,6 +44,14 @@ import java.util.function.Consumer;
*/ */
public interface Scheduler { public interface Scheduler {
/**
* Create a scheduler that can be used client side. Server side, please use <code>ThreadPool.schedule</code> instead.
*
* Notice that if any scheduled jobs fail with an exception, they will be logged as a warning. This includes jobs started
* using execute, submit and schedule.
* @param settings the settings to use
* @return executor
*/
static ScheduledThreadPoolExecutor initScheduler(Settings settings) { static ScheduledThreadPoolExecutor initScheduler(Settings settings) {
final ScheduledThreadPoolExecutor scheduler = new SafeScheduledThreadPoolExecutor(1, final ScheduledThreadPoolExecutor scheduler = new SafeScheduledThreadPoolExecutor(1,
EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy()); EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
@ -85,16 +98,16 @@ public interface Scheduler {
* The command runs on scheduler thread. Do not run blocking calls on the scheduler thread. Subclasses may allow * The command runs on scheduler thread. Do not run blocking calls on the scheduler thread. Subclasses may allow
* to execute on a different executor, in which case blocking calls are allowed. * to execute on a different executor, in which case blocking calls are allowed.
* *
* @param command the command to run
* @param delay delay before the task executes * @param delay delay before the task executes
* @param executor the name of the executor that has to execute this task. Ignored in the default implementation but can be used * @param executor the name of the executor that has to execute this task. Ignored in the default implementation but can be used
* by subclasses that support multiple executors. * by subclasses that support multiple executors.
* @param command the command to run
* @return a ScheduledFuture who's get will return when the task has been added to its target thread pool and throws an exception if * @return a ScheduledFuture who's get will return when the task has been added to its target thread pool and throws an exception if
* the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool
* the ScheduledFuture cannot interact with it. * the ScheduledFuture cannot interact with it.
* @throws EsRejectedExecutionException if the task cannot be scheduled for execution * @throws EsRejectedExecutionException if the task cannot be scheduled for execution
*/ */
ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable command); ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor);
/** /**
* Schedules a periodic action that runs on scheduler thread. Do not run blocking calls on the scheduler thread. Subclasses may allow * Schedules a periodic action that runs on scheduler thread. Do not run blocking calls on the scheduler thread. Subclasses may allow
@ -111,6 +124,25 @@ public interface Scheduler {
return new ReschedulingRunnable(command, interval, executor, this, (e) -> {}, (e) -> {}); return new ReschedulingRunnable(command, interval, executor, this, (e) -> {}, (e) -> {});
} }
/**
* Utility method to wrap a <code>Future</code> as a <code>Cancellable</code>
* @param future the future to wrap
* @return a cancellable delegating to the future
*/
static Cancellable wrapAsCancellable(Future<?> future) {
return new CancellableAdapter(future);
}
/**
* Utility method to wrap a <code>ScheduledFuture</code> as a <code>ScheduledCancellable</code>
* @param scheduledFuture the scheduled future to wrap
* @return a SchedulecCancellable delegating to the scheduledFuture
*/
static ScheduledCancellable wrapAsScheduledCancellable(ScheduledFuture<?> scheduledFuture) {
return new ScheduledCancellableAdapter(scheduledFuture);
}
/** /**
* This interface represents an object whose execution may be cancelled during runtime. * This interface represents an object whose execution may be cancelled during runtime.
*/ */
@ -119,7 +151,7 @@ public interface Scheduler {
/** /**
* Cancel the execution of this object. This method is idempotent. * Cancel the execution of this object. This method is idempotent.
*/ */
void cancel(); boolean cancel();
/** /**
* Check if the execution has been cancelled * Check if the execution has been cancelled
@ -128,6 +160,11 @@ public interface Scheduler {
boolean isCancelled(); boolean isCancelled();
} }
/**
* A scheduled cancellable allow cancelling and reading the remaining delay of a scheduled task.
*/
interface ScheduledCancellable extends Delayed, Cancellable { }
/** /**
* This class encapsulates the scheduling of a {@link Runnable} that needs to be repeated on a interval. For example, checking a value * This class encapsulates the scheduling of a {@link Runnable} that needs to be repeated on a interval. For example, checking a value
* for cleanup every second could be done by passing in a Runnable that can perform the check and the specified interval between * for cleanup every second could be done by passing in a Runnable that can perform the check and the specified interval between
@ -165,12 +202,14 @@ public interface Scheduler {
this.scheduler = scheduler; this.scheduler = scheduler;
this.rejectionConsumer = rejectionConsumer; this.rejectionConsumer = rejectionConsumer;
this.failureConsumer = failureConsumer; this.failureConsumer = failureConsumer;
scheduler.schedule(interval, executor, this); scheduler.schedule(this, interval, executor);
} }
@Override @Override
public void cancel() { public boolean cancel() {
final boolean result = run;
run = false; run = false;
return result;
} }
@Override @Override
@ -202,7 +241,7 @@ public interface Scheduler {
// if this has not been cancelled reschedule it to run again // if this has not been cancelled reschedule it to run again
if (run) { if (run) {
try { try {
scheduler.schedule(interval, executor, this); scheduler.schedule(this, interval, executor);
} catch (final EsRejectedExecutionException e) { } catch (final EsRejectedExecutionException e) {
onRejection(e); onRejection(e);
} }
@ -211,9 +250,10 @@ public interface Scheduler {
} }
/** /**
* This subclass ensures to properly bubble up Throwable instances of type Error. * This subclass ensures to properly bubble up Throwable instances of type Error and logs exceptions thrown in submitted/scheduled tasks
*/ */
class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
private static final Logger logger = LogManager.getLogger(SafeScheduledThreadPoolExecutor.class);
@SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
public SafeScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { public SafeScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
@ -232,7 +272,12 @@ public interface Scheduler {
@Override @Override
protected void afterExecute(Runnable r, Throwable t) { protected void afterExecute(Runnable r, Throwable t) {
EsExecutors.rethrowErrors(r); Throwable exception = EsExecutors.rethrowErrors(r);
if (exception != null) {
logger.warn(() ->
new ParameterizedMessage("uncaught exception in scheduled thread [{}]", Thread.currentThread().getName()),
exception);
}
} }
} }
} }

View File

@ -55,7 +55,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -336,27 +335,27 @@ public class ThreadPool implements Scheduler, Closeable {
* context of the calling thread you may call <code>threadPool.getThreadContext().preserveContext</code> on the runnable before passing * context of the calling thread you may call <code>threadPool.getThreadContext().preserveContext</code> on the runnable before passing
* it to this method. * it to this method.
* *
* @param command the command to run
* @param delay delay before the task executes * @param delay delay before the task executes
* @param executor the name of the thread pool on which to execute this task. SAME means "execute on the scheduler thread" which changes * @param executor the name of the thread pool on which to execute this task. SAME means "execute on the scheduler thread" which changes
* the meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the * the meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the
* command completes. * command completes.
* @param command the command to run
* @return a ScheduledFuture who's get will return when the task is has been added to its target thread pool and throw an exception if * @return a ScheduledFuture who's get will return when the task is has been added to its target thread pool and throw an exception if
* the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool
* the ScheduledFuture will cannot interact with it. * the ScheduledFuture will cannot interact with it.
* @throws org.elasticsearch.common.util.concurrent.EsRejectedExecutionException if the task cannot be scheduled for execution * @throws org.elasticsearch.common.util.concurrent.EsRejectedExecutionException if the task cannot be scheduled for execution
*/ */
@Override @Override
public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable command) { public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) {
if (!Names.SAME.equals(executor)) { if (!Names.SAME.equals(executor)) {
command = new ThreadedRunnable(command, executor(executor)); command = new ThreadedRunnable(command, executor(executor));
} }
return scheduler.schedule(new ThreadPool.LoggingRunnable(command), delay.millis(), TimeUnit.MILLISECONDS); return new ScheduledCancellableAdapter(scheduler.schedule(command, delay.millis(), TimeUnit.MILLISECONDS));
} }
public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) { public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) {
try { try {
schedule(delay, executor, command); schedule(command, delay, executor);
} catch (EsRejectedExecutionException e) { } catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) { if (e.isExecutorShutdown()) {
logger.debug(new ParameterizedMessage("could not schedule execution of [{}] after [{}] on [{}] as executor is shut down", logger.debug(new ParameterizedMessage("could not schedule execution of [{}] after [{}] on [{}] as executor is shut down",

View File

@ -326,7 +326,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
} }
TimeValue connectTimeout = connectionProfile.getConnectTimeout(); TimeValue connectTimeout = connectionProfile.getConnectTimeout();
threadPool.schedule(connectTimeout, ThreadPool.Names.GENERIC, channelsConnectedListener::onTimeout); threadPool.schedule(channelsConnectedListener::onTimeout, connectTimeout, ThreadPool.Names.GENERIC);
return channels; return channels;
} }

View File

@ -73,8 +73,10 @@ final class TransportHandshaker {
final Version minCompatVersion = version.minimumCompatibilityVersion(); final Version minCompatVersion = version.minimumCompatibilityVersion();
handshakeRequestSender.sendRequest(node, channel, requestId, minCompatVersion); handshakeRequestSender.sendRequest(node, channel, requestId, minCompatVersion);
threadPool.schedule(timeout, ThreadPool.Names.GENERIC, threadPool.schedule(
() -> handler.handleLocalException(new ConnectTransportException(node, "handshake_timeout[" + timeout + "]"))); () -> handler.handleLocalException(new ConnectTransportException(node, "handshake_timeout[" + timeout + "]")),
timeout,
ThreadPool.Names.GENERIC);
success = true; success = true;
} catch (Exception e) { } catch (Exception e) {
handler.handleLocalException(new ConnectTransportException(node, "failure to send " + HANDSHAKE_ACTION_NAME, e)); handler.handleLocalException(new ConnectTransportException(node, "failure to send " + HANDSHAKE_ACTION_NAME, e));

View File

@ -157,7 +157,7 @@ final class TransportKeepAlive implements Closeable {
void ensureStarted() { void ensureStarted() {
if (isStarted.get() == false && isStarted.compareAndSet(false, true)) { if (isStarted.get() == false && isStarted.compareAndSet(false, true)) {
threadPool.schedule(pingInterval, ThreadPool.Names.GENERIC, this); threadPool.schedule(this, pingInterval, ThreadPool.Names.GENERIC);
} }
} }

View File

@ -42,14 +42,15 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
@ -65,7 +66,6 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -622,7 +622,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
} }
if (timeoutHandler != null) { if (timeoutHandler != null) {
assert options.timeout() != null; assert options.timeout() != null;
timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler); timeoutHandler.scheduleTimeout(options.timeout());
} }
connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
} catch (final Exception e) { } catch (final Exception e) {
@ -989,7 +989,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
private final long sentTime = threadPool.relativeTimeInMillis(); private final long sentTime = threadPool.relativeTimeInMillis();
private final String action; private final String action;
private final DiscoveryNode node; private final DiscoveryNode node;
volatile ScheduledFuture future; volatile Scheduler.Cancellable cancellable;
TimeoutHandler(long requestId, DiscoveryNode node, String action) { TimeoutHandler(long requestId, DiscoveryNode node, String action) {
this.requestId = requestId; this.requestId = requestId;
@ -1024,13 +1024,19 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
public void cancel() { public void cancel() {
assert responseHandlers.contains(requestId) == false : assert responseHandlers.contains(requestId) == false :
"cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers"; "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers";
FutureUtils.cancel(future); if (cancellable != null) {
cancellable.cancel();
}
} }
@Override @Override
public String toString() { public String toString() {
return "timeout handler for [" + requestId + "][" + action + "]"; return "timeout handler for [" + requestId + "][" + action + "]";
} }
private void scheduleTimeout(TimeValue timeout) {
this.cancellable = threadPool.schedule(this, timeout, ThreadPool.Names.GENERIC);
}
} }
static class TimeoutInfoHolder { static class TimeoutInfoHolder {

View File

@ -96,7 +96,7 @@ public class BulkProcessorTests extends ESTestCase {
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {}; BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {};
BulkProcessor bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), emptyListener(), BulkProcessor bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), emptyListener(),
0, 10, new ByteSizeValue(1000), null, 0, 10, new ByteSizeValue(1000), null,
(delay, executor, command) -> null, () -> called.set(true), BulkRequest::new); (command, delay, executor) -> null, () -> called.set(true), BulkRequest::new);
assertFalse(called.get()); assertFalse(called.get());
bulkProcessor.awaitClose(100, TimeUnit.MILLISECONDS); bulkProcessor.awaitClose(100, TimeUnit.MILLISECONDS);

View File

@ -33,10 +33,7 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
@ -149,9 +146,9 @@ public class WorkerBulkByScrollTaskStateTests extends ESTestCase {
int batchSizeForMaxDelay = (int) (maxDelay.seconds() * originalRequestsPerSecond); int batchSizeForMaxDelay = (int) (maxDelay.seconds() * originalRequestsPerSecond);
ThreadPool threadPool = new TestThreadPool(getTestName()) { ThreadPool threadPool = new TestThreadPool(getTestName()) {
@Override @Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) { public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
assertThat(delay.nanos(), both(greaterThanOrEqualTo(0L)).and(lessThanOrEqualTo(maxDelay.nanos()))); assertThat(delay.nanos(), both(greaterThanOrEqualTo(0L)).and(lessThanOrEqualTo(maxDelay.nanos())));
return super.schedule(delay, name, command); return super.schedule(command, delay, name);
} }
}; };
try { try {
@ -202,8 +199,8 @@ public class WorkerBulkByScrollTaskStateTests extends ESTestCase {
public void testDelayNeverNegative() throws IOException { public void testDelayNeverNegative() throws IOException {
// Thread pool that returns a ScheduledFuture that claims to have a negative delay // Thread pool that returns a ScheduledFuture that claims to have a negative delay
ThreadPool threadPool = new TestThreadPool("test") { ThreadPool threadPool = new TestThreadPool("test") {
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) { public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
return new ScheduledFuture<Void>() { return new ScheduledCancellable() {
@Override @Override
public long getDelay(TimeUnit unit) { public long getDelay(TimeUnit unit) {
return -1; return -1;
@ -215,7 +212,7 @@ public class WorkerBulkByScrollTaskStateTests extends ESTestCase {
} }
@Override @Override
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@ -223,21 +220,6 @@ public class WorkerBulkByScrollTaskStateTests extends ESTestCase {
public boolean isCancelled() { public boolean isCancelled() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public boolean isDone() {
throw new UnsupportedOperationException();
}
@Override
public Void get() throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException();
}
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
throw new UnsupportedOperationException();
}
}; };
} }
}; };

View File

@ -175,7 +175,8 @@ public class JvmGcMonitorServiceSettingsTests extends ESTestCase {
private static class MockCancellable implements Cancellable { private static class MockCancellable implements Cancellable {
@Override @Override
public void cancel() { public boolean cancel() {
return true;
} }
@Override @Override

View File

@ -33,7 +33,6 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -83,7 +82,7 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, Names.GENERIC, threadPool, ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, Names.GENERIC, threadPool,
(e) -> {}, (e) -> {}); (e) -> {}, (e) -> {});
// this call was made during construction of the runnable // this call was made during construction of the runnable
verify(threadPool, times(1)).schedule(delay, Names.GENERIC, reschedulingRunnable); verify(threadPool, times(1)).schedule(reschedulingRunnable, delay, Names.GENERIC);
// create a thread and start the runnable // create a thread and start the runnable
Thread runThread = new Thread() { Thread runThread = new Thread() {
@ -103,7 +102,7 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
runThread.join(); runThread.join();
// validate schedule was called again // validate schedule was called again
verify(threadPool, times(2)).schedule(delay, Names.GENERIC, reschedulingRunnable); verify(threadPool, times(2)).schedule(reschedulingRunnable, delay, Names.GENERIC);
} }
public void testThatRunnableIsRescheduled() throws Exception { public void testThatRunnableIsRescheduled() throws Exception {
@ -251,7 +250,7 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
terminate(threadPool); terminate(threadPool);
threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "fixed delay tests").build()) { threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "fixed delay tests").build()) {
@Override @Override
public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable command) { public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) {
if (command instanceof ReschedulingRunnable) { if (command instanceof ReschedulingRunnable) {
((ReschedulingRunnable) command).onRejection(new EsRejectedExecutionException()); ((ReschedulingRunnable) command).onRejection(new EsRejectedExecutionException());
} else { } else {

View File

@ -0,0 +1,156 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
public class SchedulerTests extends ESTestCase {
public void testCancelOnThreadPool() {
ThreadPool threadPool = new TestThreadPool("test");
AtomicLong executed = new AtomicLong();
try {
ThreadPool.THREAD_POOL_TYPES.keySet().forEach(type ->
scheduleAndCancel(threadPool, executed, type));
assertEquals(0, executed.get());
} finally {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
private void scheduleAndCancel(ThreadPool threadPool, AtomicLong executed, String type) {
Scheduler.ScheduledCancellable scheduled = threadPool.schedule(executed::incrementAndGet, TimeValue.timeValueSeconds(20), type);
assertEquals(1, schedulerQueueSize(threadPool));
assertFalse(scheduled.isCancelled());
assertTrue(scheduled.cancel());
assertTrue(scheduled.isCancelled());
assertEquals("Cancel must auto-remove", 0, schedulerQueueSize(threadPool));
}
private int schedulerQueueSize(ThreadPool threadPool) {
return ((Scheduler.SafeScheduledThreadPoolExecutor) threadPool.scheduler()).getQueue().size();
}
public void testCancelOnScheduler() {
ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY);
Scheduler scheduler = (command, delay, name) ->
Scheduler.wrapAsScheduledCancellable(executor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS));
AtomicLong executed = new AtomicLong();
try {
Scheduler.ScheduledCancellable scheduled =
scheduler.schedule(executed::incrementAndGet, TimeValue.timeValueSeconds(20), ThreadPool.Names.SAME);
assertEquals(1, executor.getQueue().size());
assertFalse(scheduled.isCancelled());
assertTrue(scheduled.cancel());
assertTrue(scheduled.isCancelled());
assertEquals("Cancel must auto-remove", 0, executor.getQueue().size());
assertEquals(0, executed.get());
} finally {
Scheduler.terminate(executor, 10, TimeUnit.SECONDS);
}
}
public void testDelay() throws InterruptedException {
ThreadPool threadPool = new TestThreadPool("test");
try {
List<Scheduler.ScheduledCancellable> jobs = LongStream.range(20,30)
.mapToObj(delay -> threadPool.schedule(() -> {},
TimeValue.timeValueSeconds(delay),
ThreadPool.Names.SAME))
.collect(Collectors.toCollection(ArrayList::new));
Collections.reverse(jobs);
List<Long> initialDelays = verifyJobDelays(jobs);
Thread.sleep(50);
List<Long> laterDelays = verifyJobDelays(jobs);
assertThat(laterDelays,
Matchers.contains(initialDelays.stream().map(Matchers::lessThan).collect(Collectors.toList())));
} finally {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
private List<Long> verifyJobDelays(List<Scheduler.ScheduledCancellable> jobs) {
List<Long> delays = new ArrayList<>(jobs.size());
Scheduler.ScheduledCancellable previous = null;
for (Scheduler.ScheduledCancellable job : jobs) {
if (previous != null) {
long previousDelay = previous.getDelay(TimeUnit.MILLISECONDS);
long delay = job.getDelay(TimeUnit.MILLISECONDS);
assertThat(delay, Matchers.lessThan(previousDelay));
assertThat(job, Matchers.lessThan(previous));
}
assertThat(job.getDelay(TimeUnit.SECONDS), Matchers.greaterThan(1L));
assertThat(job.getDelay(TimeUnit.SECONDS), Matchers.lessThanOrEqualTo(30L));
delays.add(job.getDelay(TimeUnit.NANOSECONDS));
previous = job;
}
return delays;
}
// simple test for successful scheduling, exceptions tested more thoroughly in EvilThreadPoolTests
public void testScheduledOnThreadPool() throws InterruptedException {
ThreadPool threadPool = new TestThreadPool("test");
CountDownLatch missingExecutions = new CountDownLatch(ThreadPool.THREAD_POOL_TYPES.keySet().size());
try {
ThreadPool.THREAD_POOL_TYPES.keySet()
.forEach(type ->
threadPool.schedule(missingExecutions::countDown, TimeValue.timeValueMillis(randomInt(5)), type));
assertTrue(missingExecutions.await(30, TimeUnit.SECONDS));
} finally {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
// simple test for successful scheduling, exceptions tested more thoroughly in EvilThreadPoolTests
public void testScheduledOnScheduler() throws InterruptedException {
ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY);
Scheduler scheduler = (command, delay, name) ->
Scheduler.wrapAsScheduledCancellable(executor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS));
CountDownLatch missingExecutions = new CountDownLatch(1);
try {
scheduler.schedule(missingExecutions::countDown, TimeValue.timeValueMillis(randomInt(5)), ThreadPool.Names.SAME);
assertTrue(missingExecutions.await(30, TimeUnit.SECONDS));
} finally {
Scheduler.terminate(executor, 10, TimeUnit.SECONDS);
}
}
}

View File

@ -32,7 +32,6 @@ import java.util.ArrayDeque;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Deque; import java.util.Deque;
import java.util.concurrent.ScheduledFuture;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
@ -212,7 +211,7 @@ public class TransportKeepAliveTests extends ESTestCase {
} }
@Override @Override
public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable task) { public ScheduledCancellable schedule(Runnable task, TimeValue delay, String executor) {
scheduledTasks.add(new Tuple<>(delay, task)); scheduledTasks.add(new Tuple<>(delay, task));
return null; return null;
} }

View File

@ -344,7 +344,7 @@ public class DeterministicTaskQueue {
} }
@Override @Override
public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable command) { public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) {
final int NOT_STARTED = 0; final int NOT_STARTED = 0;
final int STARTED = 1; final int STARTED = 1;
final int CANCELLED = 2; final int CANCELLED = 2;
@ -364,7 +364,7 @@ public class DeterministicTaskQueue {
} }
})); }));
return new ScheduledFuture<Object>() { return new ScheduledCancellable() {
@Override @Override
public long getDelay(TimeUnit unit) { public long getDelay(TimeUnit unit) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
@ -376,8 +376,7 @@ public class DeterministicTaskQueue {
} }
@Override @Override
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel() {
assert mayInterruptIfRunning == false;
return taskState.compareAndSet(NOT_STARTED, CANCELLED); return taskState.compareAndSet(NOT_STARTED, CANCELLED);
} }
@ -386,20 +385,6 @@ public class DeterministicTaskQueue {
return taskState.get() == CANCELLED; return taskState.get() == CANCELLED;
} }
@Override
public boolean isDone() {
throw new UnsupportedOperationException();
}
@Override
public Object get() {
throw new UnsupportedOperationException();
}
@Override
public Object get(long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
}; };
} }

View File

@ -368,7 +368,7 @@ public final class MockTransportService extends TransportService {
runnable.run(); runnable.run();
} else { } else {
requestsToSendWhenCleared.add(runnable); requestsToSendWhenCleared.add(runnable);
threadPool.schedule(delay, ThreadPool.Names.GENERIC, runnable); threadPool.schedule(runnable, delay, ThreadPool.Names.GENERIC);
} }
} }
} }

View File

@ -29,7 +29,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -322,14 +321,14 @@ public class DeterministicTaskQueueTests extends ESTestCase {
final ThreadPool threadPool = taskQueue.getThreadPool(); final ThreadPool threadPool = taskQueue.getThreadPool();
final long delayMillis = randomLongBetween(1, 100); final long delayMillis = randomLongBetween(1, 100);
threadPool.schedule(TimeValue.timeValueMillis(delayMillis), GENERIC, () -> strings.add("deferred")); threadPool.schedule(() -> strings.add("deferred"), TimeValue.timeValueMillis(delayMillis), GENERIC);
assertFalse(taskQueue.hasRunnableTasks()); assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks()); assertTrue(taskQueue.hasDeferredTasks());
threadPool.schedule(TimeValue.ZERO, GENERIC, () -> strings.add("runnable")); threadPool.schedule(() -> strings.add("runnable"), TimeValue.ZERO, GENERIC);
assertTrue(taskQueue.hasRunnableTasks()); assertTrue(taskQueue.hasRunnableTasks());
threadPool.schedule(TimeValue.MINUS_ONE, GENERIC, () -> strings.add("also runnable")); threadPool.schedule(() -> strings.add("also runnable"), TimeValue.MINUS_ONE, GENERIC);
taskQueue.runAllTasks(); taskQueue.runAllTasks();
@ -339,8 +338,8 @@ public class DeterministicTaskQueueTests extends ESTestCase {
final long delayMillis1 = randomLongBetween(2, 100); final long delayMillis1 = randomLongBetween(2, 100);
final long delayMillis2 = randomLongBetween(1, delayMillis1 - 1); final long delayMillis2 = randomLongBetween(1, delayMillis1 - 1);
threadPool.schedule(TimeValue.timeValueMillis(delayMillis1), GENERIC, () -> strings.add("further deferred")); threadPool.schedule(() -> strings.add("further deferred"), TimeValue.timeValueMillis(delayMillis1), GENERIC);
threadPool.schedule(TimeValue.timeValueMillis(delayMillis2), GENERIC, () -> strings.add("not quite so deferred")); threadPool.schedule(() -> strings.add("not quite so deferred"), TimeValue.timeValueMillis(delayMillis2), GENERIC);
assertFalse(taskQueue.hasRunnableTasks()); assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks()); assertTrue(taskQueue.hasDeferredTasks());
@ -349,9 +348,10 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis + delayMillis1)); assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis + delayMillis1));
final TimeValue cancelledDelay = TimeValue.timeValueMillis(randomLongBetween(1, 100)); final TimeValue cancelledDelay = TimeValue.timeValueMillis(randomLongBetween(1, 100));
final ScheduledFuture<?> future = threadPool.schedule(cancelledDelay, "", () -> strings.add("cancelled before execution")); final Scheduler.Cancellable cancelledBeforeExecution =
threadPool.schedule(() -> strings.add("cancelled before execution"), cancelledDelay, "");
future.cancel(false); cancelledBeforeExecution.cancel();
taskQueue.runAllTasks(); taskQueue.runAllTasks();
assertThat(strings, containsInAnyOrder("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred")); assertThat(strings, containsInAnyOrder("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred"));

View File

@ -300,7 +300,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
if (ShardFollowNodeTask.shouldRetry(params.getRemoteCluster(), e)) { if (ShardFollowNodeTask.shouldRetry(params.getRemoteCluster(), e)) {
logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number", logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number",
shardFollowNodeTask), e); shardFollowNodeTask), e);
threadPool.schedule(params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME, () -> nodeOperation(task, params, state)); threadPool.schedule(() -> nodeOperation(task, params, state), params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME);
} else { } else {
shardFollowNodeTask.markAsFailed(e); shardFollowNodeTask.markAsFailed(e);
} }

View File

@ -98,7 +98,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> { BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> {
assert delay.millis() < 100 : "The delay should be kept to a minimum, so that this test does not take to long to run"; assert delay.millis() < 100 : "The delay should be kept to a minimum, so that this test does not take to long to run";
if (stopped.get() == false) { if (stopped.get() == false) {
threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); threadPool.schedule(task, delay, ThreadPool.Names.GENERIC);
} }
}; };
List<Translog.Operation> receivedOperations = Collections.synchronizedList(new ArrayList<>()); List<Translog.Operation> receivedOperations = Collections.synchronizedList(new ArrayList<>());

View File

@ -380,7 +380,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
); );
final String recordedLeaderIndexHistoryUUID = leaderGroup.getPrimary().getHistoryUUID(); final String recordedLeaderIndexHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(task, delay, ThreadPool.Names.GENERIC);
AtomicBoolean stopped = new AtomicBoolean(false); AtomicBoolean stopped = new AtomicBoolean(false);
LongSet fetchOperations = new LongHashSet(); LongSet fetchOperations = new LongHashSet();
return new ShardFollowNodeTask( return new ShardFollowNodeTask(

View File

@ -13,7 +13,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
@ -21,7 +21,6 @@ import java.time.Clock;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Objects; import java.util.Objects;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Supplier; import java.util.function.Supplier;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@ -45,7 +44,7 @@ public class MlDailyMaintenanceService implements Releasable {
*/ */
private final Supplier<TimeValue> schedulerProvider; private final Supplier<TimeValue> schedulerProvider;
private volatile ScheduledFuture<?> future; private volatile Scheduler.Cancellable cancellable;
MlDailyMaintenanceService(ThreadPool threadPool, Client client, Supplier<TimeValue> scheduleProvider) { MlDailyMaintenanceService(ThreadPool threadPool, Client client, Supplier<TimeValue> scheduleProvider) {
this.threadPool = Objects.requireNonNull(threadPool); this.threadPool = Objects.requireNonNull(threadPool);
@ -87,13 +86,13 @@ public class MlDailyMaintenanceService implements Releasable {
public void stop() { public void stop() {
LOGGER.debug("Stopping ML daily maintenance service"); LOGGER.debug("Stopping ML daily maintenance service");
if (future != null && future.isCancelled() == false) { if (cancellable != null && cancellable.isCancelled() == false) {
FutureUtils.cancel(future); cancellable.cancel();
} }
} }
public boolean isStarted() { public boolean isStarted() {
return future != null; return cancellable != null;
} }
@Override @Override
@ -103,7 +102,7 @@ public class MlDailyMaintenanceService implements Releasable {
private void scheduleNext() { private void scheduleNext() {
try { try {
future = threadPool.schedule(schedulerProvider.get(), ThreadPool.Names.GENERIC, this::triggerTasks); cancellable = threadPool.schedule(this::triggerTasks, schedulerProvider.get(), ThreadPool.Names.GENERIC);
} catch (EsRejectedExecutionException e) { } catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) { if (e.isExecutorShutdown()) {
LOGGER.debug("failed to schedule next maintenance task; shutting down", e); LOGGER.debug("failed to schedule next maintenance task; shutting down", e);

View File

@ -16,11 +16,11 @@ import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
@ -41,7 +41,6 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -153,7 +152,8 @@ public class DatafeedManager {
// otherwise if a stop datafeed call is made immediately after the start datafeed call we could cancel // otherwise if a stop datafeed call is made immediately after the start datafeed call we could cancel
// the DatafeedTask without stopping datafeed, which causes the datafeed to keep on running. // the DatafeedTask without stopping datafeed, which causes the datafeed to keep on running.
private void innerRun(Holder holder, long startTime, Long endTime) { private void innerRun(Holder holder, long startTime, Long endTime) {
holder.future = threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME).submit(new AbstractRunnable() { holder.cancellable =
Scheduler.wrapAsCancellable(threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME).submit(new AbstractRunnable() {
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
@ -204,14 +204,14 @@ public class DatafeedManager {
} }
} }
} }
}); }));
} }
void doDatafeedRealtime(long delayInMsSinceEpoch, String jobId, Holder holder) { void doDatafeedRealtime(long delayInMsSinceEpoch, String jobId, Holder holder) {
if (holder.isRunning() && !holder.isIsolated()) { if (holder.isRunning() && !holder.isIsolated()) {
TimeValue delay = computeNextDelay(delayInMsSinceEpoch); TimeValue delay = computeNextDelay(delayInMsSinceEpoch);
logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId); logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId);
holder.future = threadPool.schedule(delay, MachineLearning.DATAFEED_THREAD_POOL_NAME, new AbstractRunnable() { holder.cancellable = threadPool.schedule(new AbstractRunnable() {
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
@ -248,7 +248,7 @@ public class DatafeedManager {
doDatafeedRealtime(nextDelayInMsSinceEpoch, jobId, holder); doDatafeedRealtime(nextDelayInMsSinceEpoch, jobId, holder);
} }
} }
}); }, delay, MachineLearning.DATAFEED_THREAD_POOL_NAME);
} }
} }
@ -297,7 +297,7 @@ public class DatafeedManager {
private final boolean autoCloseJob; private final boolean autoCloseJob;
private final ProblemTracker problemTracker; private final ProblemTracker problemTracker;
private final Consumer<Exception> finishHandler; private final Consumer<Exception> finishHandler;
volatile Future<?> future; volatile Scheduler.Cancellable cancellable;
private volatile boolean isRelocating; private volatile boolean isRelocating;
Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob, Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob,
@ -341,7 +341,9 @@ public class DatafeedManager {
logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeedId, logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeedId,
datafeedJob.getJobId(), acquired); datafeedJob.getJobId(), acquired);
runningDatafeedsOnThisNode.remove(allocationId); runningDatafeedsOnThisNode.remove(allocationId);
FutureUtils.cancel(future); if (cancellable != null) {
cancellable.cancel();
}
auditor.info(datafeedJob.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); auditor.info(datafeedJob.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
finishHandler.accept(e); finishHandler.accept(e);
logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(), logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(),

View File

@ -10,11 +10,11 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before; import org.junit.Before;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
@ -46,8 +46,8 @@ public class MlInitializationServiceTests extends ESTestCase {
}).when(executorService).execute(any(Runnable.class)); }).when(executorService).execute(any(Runnable.class));
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
ScheduledFuture scheduledFuture = mock(ScheduledFuture.class); Scheduler.ScheduledCancellable scheduledCancellable = mock(Scheduler.ScheduledCancellable.class);
when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledFuture); when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable);
when(clusterService.getClusterName()).thenReturn(CLUSTER_NAME); when(clusterService.getClusterName()).thenReturn(CLUSTER_NAME);
} }

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
@ -48,7 +49,7 @@ import java.net.InetAddress;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -114,7 +115,7 @@ public class DatafeedManagerTests extends ESTestCase {
ExecutorService executorService = mock(ExecutorService.class); ExecutorService executorService = mock(ExecutorService.class);
doAnswer(invocation -> { doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run(); ((Runnable) invocation.getArguments()[0]).run();
return null; return mock(Future.class);
}).when(executorService).submit(any(Runnable.class)); }).when(executorService).submit(any(Runnable.class));
when(threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME)).thenReturn(executorService); when(threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME)).thenReturn(executorService);
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
@ -178,11 +179,11 @@ public class DatafeedManagerTests extends ESTestCase {
int[] counter = new int[] {0}; int[] counter = new int[] {0};
doAnswer(invocationOnMock -> { doAnswer(invocationOnMock -> {
if (counter[0]++ < 10) { if (counter[0]++ < 10) {
Runnable r = (Runnable) invocationOnMock.getArguments()[2]; Runnable r = (Runnable) invocationOnMock.getArguments()[0];
currentTime += 600000; currentTime += 600000;
r.run(); r.run();
} }
return mock(ScheduledFuture.class); return mock(Scheduler.ScheduledCancellable.class);
}).when(threadPool).schedule(any(), any(), any()); }).when(threadPool).schedule(any(), any(), any());
when(datafeedJob.runLookBack(anyLong(), anyLong())).thenThrow(new DatafeedJob.EmptyDataCountException(0L)); when(datafeedJob.runLookBack(anyLong(), anyLong())).thenThrow(new DatafeedJob.EmptyDataCountException(0L));
@ -192,7 +193,7 @@ public class DatafeedManagerTests extends ESTestCase {
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, null); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, null);
datafeedManager.run(task, handler); datafeedManager.run(task, handler);
verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); verify(threadPool, times(11)).schedule(any(), any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME));
verify(auditor, times(1)).warning(eq("job_id"), anyString()); verify(auditor, times(1)).warning(eq("job_id"), anyString());
} }
@ -248,7 +249,7 @@ public class DatafeedManagerTests extends ESTestCase {
verify(handler).accept(null); verify(handler).accept(null);
assertThat(datafeedManager.isRunning(task.getAllocationId()), is(false)); assertThat(datafeedManager.isRunning(task.getAllocationId()), is(false));
} else { } else {
verify(threadPool, times(1)).schedule(eq(new TimeValue(1)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); verify(threadPool, times(1)).schedule(any(), eq(new TimeValue(1)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME));
assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true)); assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true));
} }
} }

View File

@ -457,7 +457,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
} }
private void setupScheduleDelayTime(TimeValue delay) { private void setupScheduleDelayTime(TimeValue delay) {
when(threadPool.schedule(any(TimeValue.class), anyString(), any(Runnable.class))) when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString()))
.thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[2], delay.nanos(), TimeUnit.NANOSECONDS)); .thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[0], delay.nanos(), TimeUnit.NANOSECONDS));
} }
} }

View File

@ -13,8 +13,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable; import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.monitoring.MonitoringField; import org.elasticsearch.xpack.core.monitoring.MonitoringField;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -22,7 +22,6 @@ import org.joda.time.chrono.ISOChronology;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
/** /**
* {@code CleanerService} takes care of deleting old monitoring indices. * {@code CleanerService} takes care of deleting old monitoring indices.
@ -57,7 +56,7 @@ public class CleanerService extends AbstractLifecycleComponent {
@Override @Override
protected void doStart() { protected void doStart() {
logger.debug("starting cleaning service"); logger.debug("starting cleaning service");
threadPool.schedule(executionScheduler.nextExecutionDelay(new DateTime(ISOChronology.getInstance())), executorName(), runnable); threadPool.schedule(runnable, executionScheduler.nextExecutionDelay(new DateTime(ISOChronology.getInstance())), executorName());
logger.debug("cleaning service started"); logger.debug("cleaning service started");
} }
@ -153,7 +152,7 @@ public class CleanerService extends AbstractLifecycleComponent {
*/ */
class IndicesCleaner extends AbstractLifecycleRunnable { class IndicesCleaner extends AbstractLifecycleRunnable {
private volatile ScheduledFuture<?> future; private volatile Scheduler.Cancellable cancellable;
/** /**
* Enable automatic logging and stopping of the runnable based on the {@link #lifecycle}. * Enable automatic logging and stopping of the runnable based on the {@link #lifecycle}.
@ -197,7 +196,7 @@ public class CleanerService extends AbstractLifecycleComponent {
logger.debug("scheduling next execution in [{}] seconds", delay.seconds()); logger.debug("scheduling next execution in [{}] seconds", delay.seconds());
try { try {
future = threadPool.schedule(delay, executorName(), this); cancellable = threadPool.schedule(this, delay, executorName());
} catch (EsRejectedExecutionException e) { } catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) { if (e.isExecutorShutdown()) {
logger.debug("couldn't schedule new execution of the cleaner, executor is shutting down", e); logger.debug("couldn't schedule new execution of the cleaner, executor is shutting down", e);
@ -215,13 +214,13 @@ public class CleanerService extends AbstractLifecycleComponent {
/** /**
* Cancel/stop the cleaning service. * Cancel/stop the cleaning service.
* <p> * <p>
* This will kill any scheduled {@link #future} from running. It's possible that this will be executed concurrently with the * This will kill any scheduled {@link #cancellable} from running. It's possible that this will be executed concurrently with the
* {@link #onAfter() rescheduling code}, at which point it will be stopped during the next execution <em>if</em> the service is * {@link #onAfter() rescheduling code}, at which point it will be stopped during the next execution <em>if</em> the service is
* stopped. * stopped.
*/ */
public void cancel() { public void cancel() {
if (future != null && future.isCancelled() == false) { if (cancellable != null && cancellable.isCancelled() == false) {
FutureUtils.cancel(future); cancellable.cancel();
} }
} }
} }

View File

@ -132,7 +132,7 @@ public final class LdapRealm extends CachingUsernamePasswordRealm {
contextPreservingListener(new LdapSessionActionListener("authenticate", token.principal(), listener))), logger contextPreservingListener(new LdapSessionActionListener("authenticate", token.principal(), listener))), logger
); );
threadPool.generic().execute(cancellableLdapRunnable); threadPool.generic().execute(cancellableLdapRunnable);
threadPool.schedule(executionTimeout, Names.SAME, cancellableLdapRunnable::maybeTimeout); threadPool.schedule(cancellableLdapRunnable::maybeTimeout, executionTimeout, Names.SAME);
} }
@Override @Override
@ -147,7 +147,7 @@ public final class LdapRealm extends CachingUsernamePasswordRealm {
() -> sessionFactory.unauthenticatedSession(username, () -> sessionFactory.unauthenticatedSession(username,
contextPreservingListener(new LdapSessionActionListener("lookup", username, sessionListener))), logger); contextPreservingListener(new LdapSessionActionListener("lookup", username, sessionListener))), logger);
threadPool.generic().execute(cancellableLdapRunnable); threadPool.generic().execute(cancellableLdapRunnable);
threadPool.schedule(executionTimeout, Names.SAME, cancellableLdapRunnable::maybeTimeout); threadPool.schedule(cancellableLdapRunnable::maybeTimeout, executionTimeout, Names.SAME);
} else { } else {
userActionListener.onResponse(null); userActionListener.onResponse(null);
} }