Do not swallow exceptions in TimedRunnable (#39856)
Executors of type fixed_auto_queue_size (i.e. search / search_throttled) wrap runnables into TimedRunnable, which is an AbstractRunnable. This is dangerous as it might silently swallow exceptions, and possibly miss calling a response listener. While this has not triggered any failures in the tests I have run so far, it might help uncover future problems. Follow-up to #36137
This commit is contained in:
parent
292eb8b001
commit
4f941c6963
|
@ -163,20 +163,12 @@ public class EvilThreadPoolTests extends ESTestCase {
|
||||||
|
|
||||||
public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedException {
|
public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedException {
|
||||||
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
|
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
|
||||||
final boolean expectExceptionOnExecute =
|
checkExecutionException(getExecuteRunner(threadPool.executor(executor)), true);
|
||||||
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
|
|
||||||
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
|
|
||||||
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE;
|
|
||||||
checkExecutionException(getExecuteRunner(threadPool.executor(executor)), expectExceptionOnExecute);
|
|
||||||
|
|
||||||
// here, it's ok for the exception not to bubble up. Accessing the future will yield the exception
|
// here, it's ok for the exception not to bubble up. Accessing the future will yield the exception
|
||||||
checkExecutionException(getSubmitRunner(threadPool.executor(executor)), false);
|
checkExecutionException(getSubmitRunner(threadPool.executor(executor)), false);
|
||||||
|
|
||||||
final boolean expectExceptionOnSchedule =
|
checkExecutionException(getScheduleRunner(executor), true);
|
||||||
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
|
|
||||||
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
|
|
||||||
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE;
|
|
||||||
checkExecutionException(getScheduleRunner(executor), expectExceptionOnSchedule);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,8 +205,7 @@ public class EvilThreadPoolTests extends ESTestCase {
|
||||||
1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
|
1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
|
||||||
try {
|
try {
|
||||||
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
|
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
|
||||||
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
|
checkExecutionException(getExecuteRunner(autoQueueFixedExecutor), true);
|
||||||
checkExecutionException(getExecuteRunner(autoQueueFixedExecutor), false);
|
|
||||||
checkExecutionException(getSubmitRunner(autoQueueFixedExecutor), false);
|
checkExecutionException(getSubmitRunner(autoQueueFixedExecutor), false);
|
||||||
} finally {
|
} finally {
|
||||||
ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS);
|
ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS);
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
|
|
||||||
package org.elasticsearch.common.util.concurrent;
|
package org.elasticsearch.common.util.concurrent;
|
||||||
|
|
||||||
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A class used to wrap a {@code Runnable} that allows capturing the time of the task since creation
|
* A class used to wrap a {@code Runnable} that allows capturing the time of the task since creation
|
||||||
* through execution as well as only execution time.
|
* through execution as well as only execution time.
|
||||||
|
@ -48,6 +50,8 @@ class TimedRunnable extends AbstractRunnable implements WrappedRunnable {
|
||||||
public void onRejection(final Exception e) {
|
public void onRejection(final Exception e) {
|
||||||
if (original instanceof AbstractRunnable) {
|
if (original instanceof AbstractRunnable) {
|
||||||
((AbstractRunnable) original).onRejection(e);
|
((AbstractRunnable) original).onRejection(e);
|
||||||
|
} else {
|
||||||
|
ExceptionsHelper.reThrowIfNotNull(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,6 +66,8 @@ class TimedRunnable extends AbstractRunnable implements WrappedRunnable {
|
||||||
public void onFailure(final Exception e) {
|
public void onFailure(final Exception e) {
|
||||||
if (original instanceof AbstractRunnable) {
|
if (original instanceof AbstractRunnable) {
|
||||||
((AbstractRunnable) original).onFailure(e);
|
((AbstractRunnable) original).onFailure(e);
|
||||||
|
} else {
|
||||||
|
ExceptionsHelper.reThrowIfNotNull(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -114,4 +114,34 @@ public final class TimedRunnableTests extends ESTestCase {
|
||||||
assertTrue(onAfter.get());
|
assertTrue(onAfter.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testTimedRunnableRethrowsExceptionWhenNotAbstractRunnable() {
|
||||||
|
final AtomicBoolean hasRun = new AtomicBoolean();
|
||||||
|
final RuntimeException exception = new RuntimeException();
|
||||||
|
|
||||||
|
final Runnable runnable = () -> {
|
||||||
|
hasRun.set(true);
|
||||||
|
throw exception;
|
||||||
|
};
|
||||||
|
|
||||||
|
final TimedRunnable timedRunnable = new TimedRunnable(runnable);
|
||||||
|
final RuntimeException thrown = expectThrows(RuntimeException.class, () -> timedRunnable.run());
|
||||||
|
assertTrue(hasRun.get());
|
||||||
|
assertSame(exception, thrown);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTimedRunnableRethrowsRejectionWhenNotAbstractRunnable() {
|
||||||
|
final AtomicBoolean hasRun = new AtomicBoolean();
|
||||||
|
final RuntimeException exception = new RuntimeException();
|
||||||
|
|
||||||
|
final Runnable runnable = () -> {
|
||||||
|
hasRun.set(true);
|
||||||
|
throw new AssertionError("should not run");
|
||||||
|
};
|
||||||
|
|
||||||
|
final TimedRunnable timedRunnable = new TimedRunnable(runnable);
|
||||||
|
final RuntimeException thrown = expectThrows(RuntimeException.class, () -> timedRunnable.onRejection(exception));
|
||||||
|
assertFalse(hasRun.get());
|
||||||
|
assertSame(exception, thrown);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue