Bubble-up exceptions from scheduler (#38317)

Instead of logging warnings we now rethrow exceptions thrown inside
scheduled/submitted tasks. This will still log them as warnings in
production but has the added benefit that if they are thrown during
unit/integration test runs, the test will be flagged as an error.

This is a continuation of #38014

Fixed NPE that caused CCR tests (IndexFollowingIT and likely others)
to fail.

schedule could bubble rejected exception to uncaught exception
handler when not using SAME executor if thread pool is terminated.
Now ignore rejected exception silently if executor is shutdown.
This commit is contained in:
Henning Andersen 2019-02-05 21:48:24 +01:00 committed by GitHub
parent a6ce671751
commit 20c66c5a05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 33 additions and 67 deletions

View File

@ -19,11 +19,6 @@
package org.elasticsearch.threadpool;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -31,7 +26,6 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.junit.After;
import org.junit.Before;
@ -44,7 +38,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
@ -367,63 +360,28 @@ public class EvilThreadPoolTests extends ESTestCase {
uncaughtExceptionHandlerLatch.countDown();
});
final CountDownLatch supplierLatch = new CountDownLatch(1);
Runnable job = () -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
}
};
// snoop on logging to also handle the cases where exceptions are simply logged in Scheduler.
final Logger schedulerLogger = LogManager.getLogger(Scheduler.SafeScheduledThreadPoolExecutor.class);
final MockLogAppender appender = new MockLogAppender();
appender.addExpectation(
new MockLogAppender.LoggingExpectation() {
@Override
public void match(LogEvent event) {
if (event.getLevel() == Level.WARN) {
assertThat("no other warnings than those expected",
event.getMessage().getFormattedMessage(),
equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]"));
assertTrue(expectThrowable);
assertNotNull(event.getThrown());
assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown()));
uncaughtExceptionHandlerLatch.countDown();
}
}
@Override
public void assertMatched() {
try {
runner.accept(() -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
}
});
} catch (Throwable t) {
consumer.accept(Optional.of(t));
return;
}
appender.start();
Loggers.addAppender(schedulerLogger, appender);
try {
try {
runner.accept(job);
} catch (Throwable t) {
consumer.accept(Optional.of(t));
return;
}
supplierLatch.await();
supplierLatch.await();
if (expectThrowable) {
uncaughtExceptionHandlerLatch.await();
}
} finally {
Loggers.removeAppender(schedulerLogger, appender);
appender.stop();
if (expectThrowable) {
uncaughtExceptionHandlerLatch.await();
}
consumer.accept(Optional.ofNullable(throwableReference.get()));
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} finally {
Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
}

View File

@ -135,7 +135,7 @@ public class GlobalCheckpointListeners implements Closeable {
* before we could be cancelled by the notification. In this case, our listener here would
* not be in the map and we should not fire the timeout logic.
*/
removed = listeners.remove(listener).v2() != null;
removed = listeners.remove(listener) != null;
}
if (removed) {
final TimeoutException e = new TimeoutException(timeout.getStringRep());

View File

@ -21,7 +21,7 @@ package org.elasticsearch.threadpool;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -47,8 +47,8 @@ public interface Scheduler {
/**
* Create a scheduler that can be used client side. Server side, please use <code>ThreadPool.schedule</code> instead.
*
* Notice that if any scheduled jobs fail with an exception, they will be logged as a warning. This includes jobs started
* using execute, submit and schedule.
* Notice that if any scheduled jobs fail with an exception, these will bubble up to the uncaught exception handler where they will
* be logged as a warning. This includes jobs started using execute, submit and schedule.
* @param settings the settings to use
* @return executor
*/
@ -250,7 +250,8 @@ public interface Scheduler {
}
/**
* This subclass ensures to properly bubble up Throwable instances of type Error and logs exceptions thrown in submitted/scheduled tasks
* This subclass ensures to properly bubble up Throwable instances of both type Error and Exception thrown in submitted/scheduled
* tasks to the uncaught exception handler
*/
class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
private static final Logger logger = LogManager.getLogger(SafeScheduledThreadPoolExecutor.class);
@ -272,12 +273,10 @@ public interface Scheduler {
@Override
protected void afterExecute(Runnable r, Throwable t) {
Throwable exception = EsExecutors.rethrowErrors(r);
if (exception != null) {
logger.warn(() ->
new ParameterizedMessage("uncaught exception in scheduled thread [{}]", Thread.currentThread().getName()),
exception);
}
if (t != null) return;
// Scheduler only allows Runnable's so we expect no checked exceptions here. If anyone uses submit directly on `this`, we
// accept the wrapped exception in the output.
ExceptionsHelper.reThrowIfNotNull(EsExecutors.rethrowErrors(r));
}
}
}

View File

@ -501,7 +501,16 @@ public class ThreadPool implements Scheduler, Closeable {
@Override
public void run() {
executor.execute(runnable);
try {
executor.execute(runnable);
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.debug(new ParameterizedMessage("could not schedule execution of [{}] on [{}] as executor is shut down",
runnable, executor), e);
} else {
throw e;
}
}
}
@Override