Propagate Errors in executors to uncaught exception handler (#36137)

This is a continuation of #28667 and has as goal to convert all executors to propagate errors to the
uncaught exception handler. Notable missing ones were the direct executor and the scheduler. This
commit also makes it the property of the executor, not the runnable, to ensure this property. A big
part of this commit also consists of vastly improving the test coverage in this area.
This commit is contained in:
Yannick Welsch 2019-01-17 17:46:35 +01:00 committed by GitHub
parent a2d9c464b2
commit 6d64a2a901
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 517 additions and 117 deletions

View File

@ -50,3 +50,16 @@ java.nio.channels.SocketChannel#connect(java.net.SocketAddress)
java.lang.Boolean#getBoolean(java.lang.String)
org.apache.lucene.util.IOUtils @ use @org.elasticsearch.core.internal.io instead
@defaultMessage use executors from org.elasticsearch.common.util.concurrent.EsExecutors instead which will properly bubble up Errors
java.util.concurrent.AbstractExecutorService#<init>()
java.util.concurrent.ThreadPoolExecutor#<init>(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue)
java.util.concurrent.ThreadPoolExecutor#<init>(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue, java.util.concurrent.ThreadFactory)
java.util.concurrent.ThreadPoolExecutor#<init>(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue, java.util.concurrent.RejectedExecutionHandler)
java.util.concurrent.ThreadPoolExecutor#<init>(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler)
@defaultMessage extend org.elasticsearch.threadpool.Scheduler.SafeScheduledThreadPoolExecutor instead which will properly bubble up Errors
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.ThreadFactory)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.RejectedExecutionHandler)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler)

View File

@ -19,12 +19,21 @@
package org.elasticsearch.threadpool;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.junit.Before;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@ -42,30 +51,279 @@ public class EvilThreadPoolTests extends ESTestCase {
}
@After
public void tearDownThreadPool() throws InterruptedException {
public void tearDownThreadPool() {
terminate(threadPool);
}
public void testExecutionException() throws InterruptedException {
runExecutionExceptionTest(
() -> {
public void testExecutionErrorOnDefaultThreadPoolTypes() throws InterruptedException {
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
checkExecutionError(getExecuteRunner(threadPool.executor(executor)));
checkExecutionError(getSubmitRunner(threadPool.executor(executor)));
checkExecutionError(getScheduleRunner(executor));
}
}
public void testExecutionErrorOnDirectExecutorService() throws InterruptedException {
final ExecutorService directExecutorService = EsExecutors.newDirectExecutorService();
checkExecutionError(getExecuteRunner(directExecutorService));
checkExecutionError(getSubmitRunner(directExecutorService));
}
public void testExecutionErrorOnFixedESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor fixedExecutor = EsExecutors.newFixed("test", 1, 1,
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
checkExecutionError(getExecuteRunner(fixedExecutor));
checkExecutionError(getSubmitRunner(fixedExecutor));
} finally {
ThreadPool.terminate(fixedExecutor, 10, TimeUnit.SECONDS);
}
}
public void testExecutionErrorOnScalingESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor scalingExecutor = EsExecutors.newScaling("test", 1, 1,
10, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
checkExecutionError(getExecuteRunner(scalingExecutor));
checkExecutionError(getSubmitRunner(scalingExecutor));
} finally {
ThreadPool.terminate(scalingExecutor, 10, TimeUnit.SECONDS);
}
}
public void testExecutionErrorOnAutoQueueFixedESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors.newAutoQueueFixed("test", 1, 1,
1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
checkExecutionError(getExecuteRunner(autoQueueFixedExecutor));
checkExecutionError(getSubmitRunner(autoQueueFixedExecutor));
} finally {
ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS);
}
}
public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
try {
checkExecutionError(getExecuteRunner(prioritizedExecutor));
checkExecutionError(getSubmitRunner(prioritizedExecutor));
checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r));
} finally {
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
}
}
public void testExecutionErrorOnScheduler() throws InterruptedException {
final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
try {
checkExecutionError(getExecuteRunner(scheduler));
checkExecutionError(getSubmitRunner(scheduler));
checkExecutionError(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS));
} finally {
Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS);
}
}
private void checkExecutionError(Consumer<Runnable> runner) throws InterruptedException {
logger.info("checking error for {}", runner);
final Runnable runnable;
if (randomBoolean()) {
runnable = () -> {
throw new Error("future error");
},
};
} else {
runnable = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
}
@Override
protected void doRun() {
throw new Error("future error");
}
};
}
runExecutionTest(
runner,
runnable,
true,
o -> {
assertTrue(o.isPresent());
assertThat(o.get(), instanceOf(Error.class));
assertThat(o.get(), hasToString(containsString("future error")));
});
runExecutionExceptionTest(
() -> {
throw new IllegalStateException("future exception");
},
false,
o -> assertFalse(o.isPresent()));
}
private void runExecutionExceptionTest(
public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedException {
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
final boolean expectExceptionOnExecute =
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE;
checkExecutionException(getExecuteRunner(threadPool.executor(executor)), expectExceptionOnExecute);
// here, it's ok for the exception not to bubble up. Accessing the future will yield the exception
checkExecutionException(getSubmitRunner(threadPool.executor(executor)), false);
final boolean expectExceptionOnSchedule =
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE
// scheduler just swallows the exception here
// TODO: bubble these exceptions up
&& ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.DIRECT;
checkExecutionException(getScheduleRunner(executor), expectExceptionOnSchedule);
}
}
public void testExecutionExceptionOnDirectExecutorService() throws InterruptedException {
final ExecutorService directExecutorService = EsExecutors.newDirectExecutorService();
checkExecutionException(getExecuteRunner(directExecutorService), true);
checkExecutionException(getSubmitRunner(directExecutorService), false);
}
public void testExecutionExceptionOnFixedESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor fixedExecutor = EsExecutors.newFixed("test", 1, 1,
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
checkExecutionException(getExecuteRunner(fixedExecutor), true);
checkExecutionException(getSubmitRunner(fixedExecutor), false);
} finally {
ThreadPool.terminate(fixedExecutor, 10, TimeUnit.SECONDS);
}
}
public void testExecutionExceptionOnScalingESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor scalingExecutor = EsExecutors.newScaling("test", 1, 1,
10, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
checkExecutionException(getExecuteRunner(scalingExecutor), true);
checkExecutionException(getSubmitRunner(scalingExecutor), false);
} finally {
ThreadPool.terminate(scalingExecutor, 10, TimeUnit.SECONDS);
}
}
public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors.newAutoQueueFixed("test", 1, 1,
1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
checkExecutionException(getExecuteRunner(autoQueueFixedExecutor), false);
checkExecutionException(getSubmitRunner(autoQueueFixedExecutor), false);
} finally {
ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS);
}
}
public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
try {
checkExecutionException(getExecuteRunner(prioritizedExecutor), true);
checkExecutionException(getSubmitRunner(prioritizedExecutor), false);
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true);
} finally {
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
}
}
public void testExecutionExceptionOnScheduler() throws InterruptedException {
final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
try {
// scheduler just swallows the exceptions
// TODO: bubble these exceptions up
checkExecutionException(getExecuteRunner(scheduler), false);
checkExecutionException(getSubmitRunner(scheduler), false);
checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), false);
} finally {
Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS);
}
}
private void checkExecutionException(Consumer<Runnable> runner, boolean expectException) throws InterruptedException {
logger.info("checking exception for {}", runner);
final Runnable runnable;
final boolean willThrow;
if (randomBoolean()) {
runnable = () -> {
throw new IllegalStateException("future exception");
};
willThrow = expectException;
} else {
runnable = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
}
@Override
protected void doRun() {
throw new IllegalStateException("future exception");
}
};
willThrow = false;
}
runExecutionTest(
runner,
runnable,
willThrow,
o -> {
assertEquals(willThrow, o.isPresent());
if (willThrow) {
assertThat(o.get(), instanceOf(IllegalStateException.class));
assertThat(o.get(), hasToString(containsString("future exception")));
}
});
}
Consumer<Runnable> getExecuteRunner(ExecutorService executor) {
return new Consumer<Runnable>() {
@Override
public void accept(Runnable runnable) {
executor.execute(runnable);
}
@Override
public String toString() {
return "executor(" + executor + ").execute()";
}
};
}
Consumer<Runnable> getSubmitRunner(ExecutorService executor) {
return new Consumer<Runnable>() {
@Override
public void accept(Runnable runnable) {
executor.submit(runnable);
}
@Override
public String toString() {
return "executor(" + executor + ").submit()";
}
};
}
Consumer<Runnable> getScheduleRunner(String executor) {
return new Consumer<Runnable>() {
@Override
public void accept(Runnable runnable) {
threadPool.schedule(randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor, runnable);
}
@Override
public String toString() {
return "schedule(" + executor + ")";
}
};
}
private void runExecutionTest(
final Consumer<Runnable> runner,
final Runnable runnable,
final boolean expectThrowable,
final Consumer<Optional<Throwable>> consumer) throws InterruptedException {
@ -82,13 +340,18 @@ public class EvilThreadPoolTests extends ESTestCase {
final CountDownLatch supplierLatch = new CountDownLatch(1);
threadPool.generic().submit(() -> {
try {
runner.accept(() -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
}
});
} catch (Throwable t) {
consumer.accept(Optional.of(t));
return;
}
supplierLatch.await();

View File

@ -243,6 +243,13 @@ public final class ExceptionsHelper {
return Optional.empty();
}
/**
* See {@link #maybeError(Throwable, Logger)}. Uses the class-local logger.
*/
public static Optional<Error> maybeError(final Throwable cause) {
return maybeError(cause, logger);
}
/**
* If the specified cause is an unrecoverable error, this method will rethrow the cause on a separate thread so that it can not be
* caught and bubbles up to the uncaught exception handler. Note that the cause tree is examined for any {@link Error}. See

View File

@ -19,6 +19,8 @@
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@ -27,10 +29,14 @@ import org.elasticsearch.node.Node;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@ -108,7 +114,45 @@ public class EsExecutors {
new EsAbortPolicy(), contextHolder);
}
private static final ExecutorService DIRECT_EXECUTOR_SERVICE = new AbstractExecutorService() {
/**
* Checks if the runnable arose from asynchronous submission of a task to an executor. If an uncaught exception was thrown
* during the execution of this task, we need to inspect this runnable and see if it is an error that should be propagated
* to the uncaught exception handler.
*/
public static void rethrowErrors(Runnable runnable) {
if (runnable instanceof RunnableFuture) {
try {
((RunnableFuture) runnable).get();
} catch (final Exception e) {
/*
* In theory, Future#get can only throw a cancellation exception, an interrupted exception, or an execution
* exception. We want to ignore cancellation exceptions, restore the interrupt status on interrupted exceptions, and
* inspect the cause of an execution. We are going to be extra paranoid here though and completely unwrap the
* exception to ensure that there is not a buried error anywhere. We assume that a general exception has been
* handled by the executed task or the task submitter.
*/
assert e instanceof CancellationException
|| e instanceof InterruptedException
|| e instanceof ExecutionException : e;
final Optional<Error> maybeError = ExceptionsHelper.maybeError(e);
if (maybeError.isPresent()) {
// throw this error where it will propagate to the uncaught exception handler
throw maybeError.get();
}
if (e instanceof InterruptedException) {
// restore the interrupt status
Thread.currentThread().interrupt();
}
}
}
}
private static final class DirectExecutorService extends AbstractExecutorService {
@SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
DirectExecutorService() {
super();
}
@Override
public void shutdown() {
@ -131,16 +175,18 @@ public class EsExecutors {
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
public boolean awaitTermination(long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public void execute(Runnable command) {
command.run();
rethrowErrors(command);
}
}
};
private static final ExecutorService DIRECT_EXECUTOR_SERVICE = new DirectExecutorService();
/**
* Returns an {@link ExecutorService} that executes submitted tasks on the current thread. This executor service does not support being

View File

@ -19,6 +19,8 @@
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.SuppressForbidden;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@ -48,6 +50,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder);
}
@SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
ThreadContext contextHolder) {
@ -89,11 +92,8 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
}
@Override
public void execute(final Runnable command) {
doExecute(wrapRunnable(command));
}
protected void doExecute(final Runnable command) {
public void execute(Runnable command) {
command = wrapRunnable(command);
try {
super.execute(command);
} catch (EsRejectedExecutionException ex) {
@ -115,6 +115,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
EsExecutors.rethrowErrors(unwrap(r));
assert assertDefaultContext(r);
}

View File

@ -96,13 +96,13 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
/** innerRunnable can be null if task is finished but not removed from executor yet,
* see {@link TieBreakingPrioritizedRunnable#run} and {@link TieBreakingPrioritizedRunnable#runAndClean}
*/
pending.add(new Pending(unwrap(innerRunnable), t.priority(), t.insertionOrder, executing));
pending.add(new Pending(super.unwrap(innerRunnable), t.priority(), t.insertionOrder, executing));
}
} else if (runnable instanceof PrioritizedFutureTask) {
PrioritizedFutureTask t = (PrioritizedFutureTask) runnable;
Object task = t.task;
if (t.task instanceof Runnable) {
task = unwrap((Runnable) t.task);
task = super.unwrap((Runnable) t.task);
}
pending.add(new Pending(task, t.priority, t.insertionOrder, executing));
}
@ -122,7 +122,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
command = wrapRunnable(command);
doExecute(command);
execute(command);
if (timeout.nanos() >= 0) {
if (command instanceof TieBreakingPrioritizedRunnable) {
((TieBreakingPrioritizedRunnable) command).scheduleTimeout(timer, timeoutCallback, timeout);
@ -149,6 +149,14 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
}
}
@Override
protected Runnable unwrap(Runnable runnable) {
if (runnable instanceof WrappedRunnable) {
return super.unwrap(((WrappedRunnable) runnable).unwrap());
} else {
return super.unwrap(runnable);
}
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
@ -181,7 +189,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
}
}
private final class TieBreakingPrioritizedRunnable extends PrioritizedRunnable {
private final class TieBreakingPrioritizedRunnable extends PrioritizedRunnable implements WrappedRunnable {
private Runnable runnable;
private final long insertionOrder;
@ -246,11 +254,16 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
runnable = null;
timeoutFuture = null;
}
}
}
private final class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
@Override
public Runnable unwrap() {
return runnable;
}
}
private static final class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
final Object task;
final Priority priority;

View File

@ -29,7 +29,7 @@ public abstract class PrioritizedRunnable implements Runnable, Comparable<Priori
private final long creationDate;
private final LongSupplier relativeTimeProvider;
public static PrioritizedRunnable wrap(Runnable runnable, Priority priority) {
public static WrappedRunnable wrap(Runnable runnable, Priority priority) {
return new Wrapped(runnable, priority);
}
@ -69,7 +69,7 @@ public abstract class PrioritizedRunnable implements Runnable, Comparable<Priori
return priority;
}
static class Wrapped extends PrioritizedRunnable {
static class Wrapped extends PrioritizedRunnable implements WrappedRunnable {
private final Runnable runnable;
@ -82,5 +82,11 @@ public abstract class PrioritizedRunnable implements Runnable, Comparable<Priori
public void run() {
runnable.run();
}
@Override
public Runnable unwrap() {
return runnable;
}
}
}

View File

@ -45,7 +45,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
// The amount the queue size is adjusted by for each calcuation
private static final int QUEUE_ADJUSTMENT_AMOUNT = 50;
private final Function<Runnable, Runnable> runnableWrapper;
private final Function<Runnable, WrappedRunnable> runnableWrapper;
private final ResizableBlockingQueue<Runnable> workQueue;
private final int tasksPerFrame;
private final int minQueueSize;
@ -60,7 +60,7 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
QueueResizingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ResizableBlockingQueue<Runnable> workQueue, int minQueueSize, int maxQueueSize,
Function<Runnable, Runnable> runnableWrapper, final int tasksPerFrame,
Function<Runnable, WrappedRunnable> runnableWrapper, final int tasksPerFrame,
TimeValue targetedResponseTime, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
ThreadContext contextHolder) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit,
@ -78,12 +78,18 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
}
@Override
protected void doExecute(final Runnable command) {
// we are submitting a task, it has not yet started running (because super.excute() has not
// been called), but it could be immediately run, or run at a later time. We need the time
// this task entered the queue, which we get by creating a TimedRunnable, which starts the
// clock as soon as it is created.
super.doExecute(this.runnableWrapper.apply(command));
protected Runnable wrapRunnable(Runnable command) {
return super.wrapRunnable(this.runnableWrapper.apply(command));
}
@Override
protected Runnable unwrap(Runnable runnable) {
final Runnable unwrapped = super.unwrap(runnable);
if (unwrapped instanceof WrappedRunnable) {
return ((WrappedRunnable) unwrapped).unwrap();
} else {
return unwrapped;
}
}
/**
@ -146,11 +152,12 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
// total time as a combination of the time in the queue and time spent running the task. We
// only want runnables that did not throw errors though, because they could be fast-failures
// that throw off our timings, so only check when t is null.
assert r instanceof TimedRunnable : "expected only TimedRunnables in queue";
final long taskNanos = ((TimedRunnable) r).getTotalNanos();
assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
final long taskNanos = timedRunnable.getTotalNanos();
final long totalNanos = totalTaskNanos.addAndGet(taskNanos);
final long taskExecutionNanos = ((TimedRunnable) r).getTotalExecutionNanos();
final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos();
assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos;
executionEWMA.addValue(taskExecutionNanos);

View File

@ -18,10 +18,9 @@
*/
package org.elasticsearch.common.util.concurrent;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.io.stream.StreamInput;
@ -32,27 +31,23 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.HttpTransportSettings;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.nio.charset.StandardCharsets;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE;
/**
@ -352,11 +347,8 @@ public final class ThreadContext implements Closeable, Writeable {
* Unwraps a command that was previously wrapped by {@link #preserveContext(Runnable)}.
*/
public Runnable unwrap(Runnable command) {
if (command instanceof ContextPreservingAbstractRunnable) {
return ((ContextPreservingAbstractRunnable) command).unwrap();
}
if (command instanceof ContextPreservingRunnable) {
return ((ContextPreservingRunnable) command).unwrap();
if (command instanceof WrappedRunnable) {
return ((WrappedRunnable) command).unwrap();
}
return command;
}
@ -642,7 +634,7 @@ public final class ThreadContext implements Closeable, Writeable {
/**
* Wraps a Runnable to preserve the thread context.
*/
private class ContextPreservingRunnable implements Runnable {
private class ContextPreservingRunnable implements WrappedRunnable {
private final Runnable in;
private final ThreadContext.StoredContext ctx;
@ -658,36 +650,6 @@ public final class ThreadContext implements Closeable, Writeable {
ctx.restore();
whileRunning = true;
in.run();
if (in instanceof RunnableFuture) {
/*
* The wrapped runnable arose from asynchronous submission of a task to an executor. If an uncaught exception was thrown
* during the execution of this task, we need to inspect this runnable and see if it is an error that should be
* propagated to the uncaught exception handler.
*/
try {
((RunnableFuture) in).get();
} catch (final Exception e) {
/*
* In theory, Future#get can only throw a cancellation exception, an interrupted exception, or an execution
* exception. We want to ignore cancellation exceptions, restore the interrupt status on interrupted exceptions, and
* inspect the cause of an execution. We are going to be extra paranoid here though and completely unwrap the
* exception to ensure that there is not a buried error anywhere. We assume that a general exception has been
* handled by the executed task or the task submitter.
*/
assert e instanceof CancellationException
|| e instanceof InterruptedException
|| e instanceof ExecutionException : e;
final Optional<Error> maybeError = ExceptionsHelper.maybeError(e, logger);
if (maybeError.isPresent()) {
// throw this error where it will propagate to the uncaught exception handler
throw maybeError.get();
}
if (e instanceof InterruptedException) {
// restore the interrupt status
Thread.currentThread().interrupt();
}
}
}
whileRunning = false;
} catch (IllegalStateException ex) {
if (whileRunning || threadLocal.closed.get() == false) {
@ -704,6 +666,7 @@ public final class ThreadContext implements Closeable, Writeable {
return in.toString();
}
@Override
public Runnable unwrap() {
return in;
}
@ -712,7 +675,7 @@ public final class ThreadContext implements Closeable, Writeable {
/**
* Wraps an AbstractRunnable to preserve the thread context.
*/
private class ContextPreservingAbstractRunnable extends AbstractRunnable {
private class ContextPreservingAbstractRunnable extends AbstractRunnable implements WrappedRunnable {
private final AbstractRunnable in;
private final ThreadContext.StoredContext creatorsContext;
@ -773,6 +736,7 @@ public final class ThreadContext implements Closeable, Writeable {
return in.toString();
}
@Override
public AbstractRunnable unwrap() {
return in;
}

View File

@ -23,7 +23,7 @@ package org.elasticsearch.common.util.concurrent;
* A class used to wrap a {@code Runnable} that allows capturing the time of the task since creation
* through execution as well as only execution time.
*/
class TimedRunnable extends AbstractRunnable {
class TimedRunnable extends AbstractRunnable implements WrappedRunnable {
private final Runnable original;
private final long creationTimeNanos;
private long startTimeNanos;
@ -94,4 +94,9 @@ class TimedRunnable extends AbstractRunnable {
return Math.max(finishTimeNanos - startTimeNanos, 1);
}
@Override
public Runnable unwrap() {
return original;
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
public interface WrappedRunnable extends Runnable {
Runnable unwrap();
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.threadpool;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -26,8 +27,10 @@ import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -37,7 +40,7 @@ import java.util.function.Consumer;
public interface Scheduler {
static ScheduledThreadPoolExecutor initScheduler(Settings settings) {
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
final ScheduledThreadPoolExecutor scheduler = new SafeScheduledThreadPoolExecutor(1,
EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
@ -206,4 +209,30 @@ public interface Scheduler {
}
}
}
/**
* This subclass ensures to properly bubble up Throwable instances of type Error.
*/
class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
@SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
public SafeScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, threadFactory, handler);
}
@SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
public SafeScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);
}
@SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
public SafeScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
EsExecutors.rethrowErrors(r);
}
}
}

View File

@ -226,19 +226,13 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
context.close();
}
private Function<Runnable, Runnable> randomBetweenLimitsWrapper(final int minNs, final int maxNs) {
return (runnable) -> {
return new SettableTimedRunnable(randomIntBetween(minNs, maxNs));
};
}
private Function<Runnable, Runnable> fastWrapper() {
private Function<Runnable, WrappedRunnable> fastWrapper() {
return (runnable) -> {
return new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(100));
};
}
private Function<Runnable, Runnable> slowWrapper() {
private Function<Runnable, WrappedRunnable> slowWrapper() {
return (runnable) -> {
return new SettableTimedRunnable(TimeUnit.MINUTES.toNanos(2));
};

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler;
import org.junit.After;
import org.mockito.ArgumentCaptor;
@ -68,7 +69,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
private final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
private final ScheduledThreadPoolExecutor scheduler =
new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(Settings.EMPTY, "scheduler"));
new Scheduler.SafeScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(Settings.EMPTY, "scheduler"));
@After
public void shutdownScheduler() {

View File

@ -12,12 +12,14 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -758,7 +760,7 @@ public class AutodetectProcessManager {
* operations are initially added to a queue and a worker thread from ml autodetect threadpool will process each
* operation at a time.
*/
class AutodetectWorkerExecutorService extends AbstractExecutorService {
static class AutodetectWorkerExecutorService extends AbstractExecutorService {
private final ThreadContext contextHolder;
private final CountDownLatch awaitTermination = new CountDownLatch(1);
@ -766,6 +768,7 @@ public class AutodetectProcessManager {
private volatile boolean running = true;
@SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
AutodetectWorkerExecutorService(ThreadContext contextHolder) {
this.contextHolder = contextHolder;
}
@ -813,6 +816,7 @@ public class AutodetectProcessManager {
} catch (Exception e) {
logger.error("error handling job operation", e);
}
EsExecutors.rethrowErrors(contextHolder.unwrap(runnable));
}
}
} catch (InterruptedException e) {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.filestructurefinder;
import com.ibm.icu.text.CharsetMatch;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.xpack.core.ml.filestructurefinder.FileStructure;
import org.junit.After;
import org.junit.Before;
@ -21,7 +22,6 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.ml.filestructurefinder.FileStructureOverrides.EMPTY_OVERRIDES;
@ -36,7 +36,7 @@ public class FileStructureFinderManagerTests extends FileStructureTestCase {
@Before
public void setup() {
scheduler = new ScheduledThreadPoolExecutor(1);
scheduler = new Scheduler.SafeScheduledThreadPoolExecutor(1);
structureFinderManager = new FileStructureFinderManager(scheduler);
}

View File

@ -8,11 +8,11 @@ package org.elasticsearch.xpack.ml.filestructurefinder;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.grok.Grok;
import org.elasticsearch.threadpool.Scheduler;
import org.junit.After;
import org.junit.Before;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
public class TimeoutCheckerTests extends FileStructureTestCase {
@ -20,7 +20,7 @@ public class TimeoutCheckerTests extends FileStructureTestCase {
@Before
public void createScheduler() {
scheduler = new ScheduledThreadPoolExecutor(1);
scheduler = new Scheduler.SafeScheduledThreadPoolExecutor(1);
}
@After

View File

@ -20,6 +20,7 @@ import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
@ -41,6 +42,7 @@ import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.AutodetectWorkerExecutorService;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
@ -78,6 +80,7 @@ import static org.elasticsearch.mock.orig.Mockito.times;
import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions;
import static org.elasticsearch.mock.orig.Mockito.when;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -650,6 +653,26 @@ public class AutodetectProcessManagerTests extends ESTestCase {
verifyNoMoreInteractions(auditor);
}
public void testAutodetectWorkerExecutorServiceDoesNotSwallowErrors() {
final ThreadPool threadPool = new TestThreadPool("testAutodetectWorkerExecutorServiceDoesNotSwallowErrors");
try {
final AutodetectWorkerExecutorService executor = new AutodetectWorkerExecutorService(threadPool.getThreadContext());
if (randomBoolean()) {
executor.submit(() -> {
throw new Error("future error");
});
} else {
executor.execute(() -> {
throw new Error("future error");
});
}
final Error e = expectThrows(Error.class, () -> executor.start());
assertThat(e.getMessage(), containsString("future error"));
} finally {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
private AutodetectProcessManager createNonSpyManager(String jobId) {
Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);

View File

@ -18,6 +18,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
@ -81,7 +82,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
@Before
public void setUpMocks() {
executor = new ScheduledThreadPoolExecutor(1);
executor = new Scheduler.SafeScheduledThreadPoolExecutor(1);
client = mock(Client.class);
threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);