From ed7d84ca5f74ad63bf57e9a86e2fd63a202cd7e9 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 29 Jul 2015 12:02:46 -0400 Subject: [PATCH] Core: Improve toString on EsThreadPoolExecutor Improving the toString allows for nicer error reporting. Also cleaned up the way that EsRejectedExecutionException notices that it was rejected from a shutdown thread pool. I left javadocs about how its not 100% correct but good enough for most uses. The improved toString on EsThreadPoolExecutor mean every one of them needs a name. In most cases the name to use is obvious. In tests I use the name of the test method and in real thread pools I use the name of the thread pool. In non-ThreadPool executors I use the thread's name. Closes #9732 --- .../service/InternalClusterService.java | 2 +- .../common/util/concurrent/EsAbortPolicy.java | 13 +-- .../common/util/concurrent/EsExecutors.java | 16 ++-- .../EsRejectedExecutionException.java | 30 ++++++- .../util/concurrent/EsThreadPoolExecutor.java | 38 ++++++++- .../PrioritizedEsThreadPoolExecutor.java | 4 +- .../zen/ping/unicast/UnicastZenPing.java | 2 +- .../indices/recovery/RecoverySettings.java | 6 +- .../elasticsearch/threadpool/ThreadPool.java | 6 +- .../transport/local/LocalTransport.java | 2 +- .../util/concurrent/EsExecutorsTests.java | 83 ++++++++++++++++++- .../concurrent/PrioritizedExecutorsTests.java | 12 +-- .../org/elasticsearch/test/ESTestCase.java | 2 +- .../test/InternalTestCluster.java | 2 +- 14 files changed, 172 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 456d6dc0e65..70e993274ff 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -154,7 +154,7 @@ public class InternalClusterService extends AbstractLifecycleComponent nodeAttributes = discoveryNodeService.buildAttributes(); // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java index 8bb16869c47..2b19fa2096c 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java @@ -27,9 +27,7 @@ import java.util.concurrent.ThreadPoolExecutor; /** */ public class EsAbortPolicy implements XRejectedExecutionHandler { - private final CounterMetric rejected = new CounterMetric(); - public static final String SHUTTING_DOWN_KEY = "(shutting down)"; @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { @@ -49,16 +47,7 @@ public class EsAbortPolicy implements XRejectedExecutionHandler { } } rejected.inc(); - StringBuilder sb = new StringBuilder("rejected execution "); - if (executor.isShutdown()) { - sb.append(SHUTTING_DOWN_KEY + " "); - } else { - if (executor.getQueue() instanceof SizeBlockingQueue) { - sb.append("(queue capacity ").append(((SizeBlockingQueue) executor.getQueue()).capacity()).append(") "); - } - } - sb.append("on ").append(r.toString()); - throw new EsRejectedExecutionException(sb.toString()); + throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown()); } @Override diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 92e8d7d095f..c7cc07c3d45 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -54,30 +54,30 @@ public class EsExecutors { return settings.getAsInt(PROCESSORS, defaultValue); } - public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(ThreadFactory threadFactory) { - return new PrioritizedEsThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory); + public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory) { + return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory); } - public static EsThreadPoolExecutor newScaling(int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { + public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { ExecutorScalingQueue queue = new ExecutorScalingQueue<>(); // we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue - EsThreadPoolExecutor executor = new EsThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy()); + EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy()); queue.executor = executor; return executor; } - public static EsThreadPoolExecutor newCached(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { - return new EsThreadPoolExecutor(0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue(), threadFactory, new EsAbortPolicy()); + public static EsThreadPoolExecutor newCached(String name, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { + return new EsThreadPoolExecutor(name, 0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue(), threadFactory, new EsAbortPolicy()); } - public static EsThreadPoolExecutor newFixed(int size, int queueCapacity, ThreadFactory threadFactory) { + public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory) { BlockingQueue queue; if (queueCapacity < 0) { queue = ConcurrentCollections.newBlockingQueue(); } else { queue = new SizeBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), queueCapacity); } - return new EsThreadPoolExecutor(size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy()); + return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy()); } public static String threadName(Settings settings, String ... names) { diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java index 2aec22c04ec..d75b3ffa8c2 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java @@ -21,6 +21,7 @@ package org.elasticsearch.common.util.concurrent; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -28,17 +29,25 @@ import java.io.IOException; /** */ public class EsRejectedExecutionException extends ElasticsearchException { + private final boolean isExecutorShutdown; + + public EsRejectedExecutionException(String message, boolean isExecutorShutdown) { + super(message); + this.isExecutorShutdown = isExecutorShutdown; + } public EsRejectedExecutionException(String message) { - super(message); + this(message, false); } public EsRejectedExecutionException() { super((String)null); + this.isExecutorShutdown = false; } public EsRejectedExecutionException(Throwable e) { super(null, e); + this.isExecutorShutdown = false; } @Override @@ -48,5 +57,24 @@ public class EsRejectedExecutionException extends ElasticsearchException { public EsRejectedExecutionException(StreamInput in) throws IOException{ super(in); + isExecutorShutdown = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(isExecutorShutdown); + } + + /** + * Checks if the thread pool that rejected the execution was terminated + * shortly after the rejection. Its possible that this returns false and the + * thread pool has since been terminated but if this returns false then the + * termination wasn't a factor in this rejection. Conversely if this returns + * true the shutdown was probably a factor in this rejection but might have + * been triggered just after the action rejection. + */ + public boolean isExecutorShutdown() { + return isExecutorShutdown; } } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 9cfb6875993..1e6743e9004 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -33,13 +33,18 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { private volatile ShutdownListener listener; private final Object monitor = new Object(); + /** + * Name used in error reporting. + */ + private final String name; - EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy()); + EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { + this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy()); } - EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler) { + EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + this.name = name; } public void shutdown(ShutdownListener listener) { @@ -93,4 +98,31 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { } } } + + @Override + public String toString() { + /* + * ThreadPoolExecutor has some nice information in its toString but we + * can't recreate it without nastier hacks than this. + */ + String tpeToString = super.toString(); + int startOfInfoInTpeToString = tpeToString.indexOf('['); + String tpeInfo; + if (startOfInfoInTpeToString >= 0) { + tpeInfo = tpeToString.substring(startOfInfoInTpeToString + 1); + } else { + assert false: "Unsupported ThreadPoolExecutor toString"; + tpeInfo = tpeToString; + } + StringBuilder b = new StringBuilder(); + b.append(getClass().getSimpleName()).append('['); + b.append(name).append(", "); + if (getQueue() instanceof SizeBlockingQueue) { + @SuppressWarnings("rawtypes") + SizeBlockingQueue queue = (SizeBlockingQueue) getQueue(); + b.append("queue capacity = ").append(queue.capacity()).append(", "); + } + b.append("state = ").append(tpeInfo); + return b.toString(); + } } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java index 65998b57cc7..38c0cb23234 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java @@ -41,8 +41,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { private AtomicLong insertionOrder = new AtomicLong(); private Queue current = ConcurrentCollections.newQueue(); - PrioritizedEsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue(), threadFactory); + PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { + super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue(), threadFactory); } public Pending[] getPending() { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index f9cf98f86fa..b16b616515c 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -136,7 +136,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest.class, ThreadPool.Names.SAME, new UnicastPingRequestHandler()); ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); - unicastConnectExecutor = EsExecutors.newScaling(0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory); + unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory); } @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 72d7c77e881..c3d167082ff 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -125,9 +125,11 @@ public class RecoverySettings extends AbstractComponent implements Closeable { this.concurrentStreams = settings.getAsInt("indices.recovery.concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 3)); - this.concurrentStreamPool = EsExecutors.newScaling(0, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); + this.concurrentStreamPool = EsExecutors.newScaling("recovery_stream", 0, concurrentStreams, 60, TimeUnit.SECONDS, + EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); this.concurrentSmallFileStreams = settings.getAsInt("indices.recovery.concurrent_small_file_streams", settings.getAsInt("index.shard.recovery.concurrent_small_file_streams", 2)); - this.concurrentSmallFileStreamPool = EsExecutors.newScaling(0, concurrentSmallFileStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[small_file_recovery_stream]")); + this.concurrentSmallFileStreamPool = EsExecutors.newScaling("small_file_recovery_stream", 0, concurrentSmallFileStreams, 60, + TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[small_file_recovery_stream]")); this.maxBytesPerSec = settings.getAsBytesSize("indices.recovery.max_bytes_per_sec", settings.getAsBytesSize("indices.recovery.max_size_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB))); if (maxBytesPerSec.bytes() <= 0) { diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index f6e359baafb..7c01367c016 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -336,7 +336,7 @@ public class ThreadPool extends AbstractComponent { } else { logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); } - Executor executor = EsExecutors.newCached(keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); + Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null)); } else if ("fixed".equals(type)) { int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); @@ -371,7 +371,7 @@ public class ThreadPool extends AbstractComponent { int size = settings.getAsInt("size", defaultSize); SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize))); logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize); - Executor executor = EsExecutors.newFixed(size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory); + Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory); return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize)); } else if ("scaling".equals(type)) { TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); @@ -415,7 +415,7 @@ public class ThreadPool extends AbstractComponent { } else { logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); } - Executor executor = EsExecutors.newScaling(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); + Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null)); } throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]"); diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index cc7a8fd81a8..2cd4168b28d 100644 --- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -80,7 +80,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1); logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, LOCAL_TRANSPORT_THREAD_NAME_PREFIX); - this.workers = EsExecutors.newFixed(workerCount, queueSize, threadFactory); + this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory); } @Override diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 7c0891fac6e..7c9355e4191 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.test.ESTestCase; import org.junit.Test; @@ -27,6 +28,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; @@ -40,7 +42,7 @@ public class EsExecutorsTests extends ESTestCase { } public void testFixedForcedExecution() throws Exception { - EsThreadPoolExecutor executor = EsExecutors.newFixed(1, 1, EsExecutors.daemonThreadFactory("test")); + EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), 1, 1, EsExecutors.daemonThreadFactory("test")); final CountDownLatch wait = new CountDownLatch(1); final CountDownLatch exec1Wait = new CountDownLatch(1); @@ -102,7 +104,7 @@ public class EsExecutorsTests extends ESTestCase { } public void testFixedRejected() throws Exception { - EsThreadPoolExecutor executor = EsExecutors.newFixed(1, 1, EsExecutors.daemonThreadFactory("test")); + EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), 1, 1, EsExecutors.daemonThreadFactory("test")); final CountDownLatch wait = new CountDownLatch(1); final CountDownLatch exec1Wait = new CountDownLatch(1); @@ -160,7 +162,7 @@ public class EsExecutorsTests extends ESTestCase { final int max = between(min + 1, 6); final ThreadBarrier barrier = new ThreadBarrier(max + 1); - ThreadPoolExecutor pool = EsExecutors.newScaling(min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test")); + ThreadPoolExecutor pool = EsExecutors.newScaling(getTestName(), min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test")); assertThat("Min property", pool.getCorePoolSize(), equalTo(min)); assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max)); @@ -196,7 +198,7 @@ public class EsExecutorsTests extends ESTestCase { final int max = between(min + 1, 6); final ThreadBarrier barrier = new ThreadBarrier(max + 1); - final ThreadPoolExecutor pool = EsExecutors.newScaling(min, max, between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test")); + final ThreadPoolExecutor pool = EsExecutors.newScaling(getTestName(), min, max, between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test")); assertThat("Min property", pool.getCorePoolSize(), equalTo(min)); assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max)); @@ -233,4 +235,77 @@ public class EsExecutorsTests extends ESTestCase { }); terminate(pool); } + + public void testRejectionMessageAndShuttingDownFlag() throws InterruptedException { + int pool = between(1, 10); + int queue = between(0, 100); + int actions = queue + pool; + final CountDownLatch latch = new CountDownLatch(1); + EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), pool, queue, EsExecutors.daemonThreadFactory("dummy")); + try { + for (int i = 0; i < actions; i++) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + } + try { + executor.execute(new Runnable() { + @Override + public void run() { + // Doesn't matter is going to be rejected + } + + @Override + public String toString() { + return "dummy runnable"; + } + }); + fail("Didn't get a rejection when we expected one."); + } catch (EsRejectedExecutionException e) { + assertFalse("Thread pool registering as terminated when it isn't", e.isExecutorShutdown()); + String message = ExceptionsHelper.detailedMessage(e); + assertThat(message, containsString("of dummy runnable")); + assertThat(message, containsString("on EsThreadPoolExecutor[testRejectionMessage")); + assertThat(message, containsString("queue capacity = " + queue)); + assertThat(message, containsString("state = Running")); + assertThat(message, containsString("active threads = " + pool)); + assertThat(message, containsString("queued tasks = " + queue)); + assertThat(message, containsString("completed tasks = 0")); + } + } finally { + latch.countDown(); + terminate(executor); + } + try { + executor.execute(new Runnable() { + @Override + public void run() { + // Doesn't matter is going to be rejected + } + + @Override + public String toString() { + return "dummy runnable"; + } + }); + fail("Didn't get a rejection when we expected one."); + } catch (EsRejectedExecutionException e) { + assertTrue("Thread pool not registering as terminated when it is", e.isExecutorShutdown()); + String message = ExceptionsHelper.detailedMessage(e); + assertThat(message, containsString("of dummy runnable")); + assertThat(message, containsString("on EsThreadPoolExecutor[" + getTestName())); + assertThat(message, containsString("queue capacity = " + queue)); + assertThat(message, containsString("state = Terminated")); + assertThat(message, containsString("active threads = 0")); + assertThat(message, containsString("queued tasks = 0")); + assertThat(message, containsString("completed tasks = " + actions)); + } + } } diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java index 0620f2f91be..ef1c0a9d7ec 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java @@ -61,7 +61,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testSubmitPrioritizedExecutorWithRunnables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -91,7 +91,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testExecutePrioritizedExecutorWithRunnables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -121,7 +121,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testSubmitPrioritizedExecutorWithCallables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -151,7 +151,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testSubmitPrioritizedExecutorWithMixed() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -182,7 +182,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testTimeout() throws Exception { ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(EsExecutors.daemonThreadFactory(getTestName())); - PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); final CountDownLatch invoked = new CountDownLatch(1); final CountDownLatch block = new CountDownLatch(1); executor.execute(new Runnable() { @@ -246,7 +246,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { ThreadPool threadPool = new ThreadPool("test"); final ScheduledThreadPoolExecutor timer = (ScheduledThreadPoolExecutor) threadPool.scheduler(); final AtomicBoolean timeoutCalled = new AtomicBoolean(); - PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); final CountDownLatch invoked = new CountDownLatch(1); executor.execute(new Runnable() { @Override diff --git a/core/src/test/java/org/elasticsearch/test/ESTestCase.java b/core/src/test/java/org/elasticsearch/test/ESTestCase.java index 462a98b03e4..1694ecf2e4f 100644 --- a/core/src/test/java/org/elasticsearch/test/ESTestCase.java +++ b/core/src/test/java/org/elasticsearch/test/ESTestCase.java @@ -537,7 +537,7 @@ public abstract class ESTestCase extends LuceneTestCase { @Override public void uncaughtException(Thread t, Throwable e) { if (e instanceof EsRejectedExecutionException) { - if (e.getMessage() != null && e.getMessage().contains(EsAbortPolicy.SHUTTING_DOWN_KEY)) { + if (e.getMessage() != null && ((EsRejectedExecutionException) e).isExecutorShutdown()) { return; // ignore the EsRejectedExecutionException when a node shuts down } } else if (e instanceof OutOfMemoryError) { diff --git a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java index ee3071cce66..19fe03ce988 100644 --- a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -314,7 +314,7 @@ public final class InternalTestCluster extends TestCluster { // always reduce this - it can make tests really slow builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50))); defaultSettings = builder.build(); - executor = EsExecutors.newCached(0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName)); + executor = EsExecutors.newCached("test runner", 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName)); } public static String nodeMode() {