diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java index 02ba33f19c4..99bde679f79 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java @@ -19,11 +19,6 @@ package org.elasticsearch.threadpool; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.core.LogEvent; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -31,7 +26,6 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.MockLogAppender; import org.junit.After; import org.junit.Before; @@ -44,7 +38,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; @@ -367,63 +360,28 @@ public class EvilThreadPoolTests extends ESTestCase { uncaughtExceptionHandlerLatch.countDown(); }); - final CountDownLatch supplierLatch = new CountDownLatch(1); - Runnable job = () -> { - try { - runnable.run(); - } finally { - supplierLatch.countDown(); - } - }; - - // snoop on logging to also handle the cases where exceptions are simply logged in Scheduler. - final Logger schedulerLogger = LogManager.getLogger(Scheduler.SafeScheduledThreadPoolExecutor.class); - final MockLogAppender appender = new MockLogAppender(); - appender.addExpectation( - new MockLogAppender.LoggingExpectation() { - @Override - public void match(LogEvent event) { - if (event.getLevel() == Level.WARN) { - assertThat("no other warnings than those expected", - event.getMessage().getFormattedMessage(), - equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]")); - assertTrue(expectThrowable); - assertNotNull(event.getThrown()); - assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown())); - uncaughtExceptionHandlerLatch.countDown(); - } - } - - @Override - public void assertMatched() { + try { + runner.accept(() -> { + try { + runnable.run(); + } finally { + supplierLatch.countDown(); } }); + } catch (Throwable t) { + consumer.accept(Optional.of(t)); + return; + } - appender.start(); - Loggers.addAppender(schedulerLogger, appender); - try { - try { - runner.accept(job); - } catch (Throwable t) { - consumer.accept(Optional.of(t)); - return; - } + supplierLatch.await(); - supplierLatch.await(); - - if (expectThrowable) { - uncaughtExceptionHandlerLatch.await(); - } - } finally { - Loggers.removeAppender(schedulerLogger, appender); - appender.stop(); + if (expectThrowable) { + uncaughtExceptionHandlerLatch.await(); } consumer.accept(Optional.ofNullable(throwableReference.get())); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); } finally { Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index eb9e36eeec0..d45a77ddb22 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -135,7 +135,7 @@ public class GlobalCheckpointListeners implements Closeable { * before we could be cancelled by the notification. In this case, our listener here would * not be in the map and we should not fire the timeout logic. */ - removed = listeners.remove(listener).v2() != null; + removed = listeners.remove(listener) != null; } if (removed) { final TimeoutException e = new TimeoutException(timeout.getStringRep()); diff --git a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java index 4c1ad6a3715..588495dd27d 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java +++ b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java @@ -21,7 +21,7 @@ package org.elasticsearch.threadpool; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -47,8 +47,8 @@ public interface Scheduler { /** * Create a scheduler that can be used client side. Server side, please use ThreadPool.schedule instead. * - * Notice that if any scheduled jobs fail with an exception, they will be logged as a warning. This includes jobs started - * using execute, submit and schedule. + * Notice that if any scheduled jobs fail with an exception, these will bubble up to the uncaught exception handler where they will + * be logged as a warning. This includes jobs started using execute, submit and schedule. * @param settings the settings to use * @return executor */ @@ -250,7 +250,8 @@ public interface Scheduler { } /** - * This subclass ensures to properly bubble up Throwable instances of type Error and logs exceptions thrown in submitted/scheduled tasks + * This subclass ensures to properly bubble up Throwable instances of both type Error and Exception thrown in submitted/scheduled + * tasks to the uncaught exception handler */ class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(SafeScheduledThreadPoolExecutor.class); @@ -272,12 +273,10 @@ public interface Scheduler { @Override protected void afterExecute(Runnable r, Throwable t) { - Throwable exception = EsExecutors.rethrowErrors(r); - if (exception != null) { - logger.warn(() -> - new ParameterizedMessage("uncaught exception in scheduled thread [{}]", Thread.currentThread().getName()), - exception); - } + if (t != null) return; + // Scheduler only allows Runnable's so we expect no checked exceptions here. If anyone uses submit directly on `this`, we + // accept the wrapped exception in the output. + ExceptionsHelper.reThrowIfNotNull(EsExecutors.rethrowErrors(r)); } } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index d42abf6b4e9..5ca2b15d6ff 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -501,7 +501,16 @@ public class ThreadPool implements Scheduler, Closeable { @Override public void run() { - executor.execute(runnable); + try { + executor.execute(runnable); + } catch (EsRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + logger.debug(new ParameterizedMessage("could not schedule execution of [{}] on [{}] as executor is shut down", + runnable, executor), e); + } else { + throw e; + } + } } @Override