From d46fdb1638b9ac8cf4a0f2b88797e825a0bde6d1 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 29 Jul 2015 10:05:12 -0400 Subject: [PATCH 1/3] [Tests] Cleanup EsExecutorsTests * names prefixed with test don't need @Test * Javadoc describing what it tests --- .../common/util/concurrent/EsExecutorsTests.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index eabac31974c..7c0891fac6e 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -31,6 +31,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; /** + * Tests for EsExecutors and its components like EsAbortPolicy. */ public class EsExecutorsTests extends ESTestCase { @@ -38,7 +39,6 @@ public class EsExecutorsTests extends ESTestCase { return TimeUnit.values()[between(0, TimeUnit.values().length - 1)]; } - @Test public void testFixedForcedExecution() throws Exception { EsThreadPoolExecutor executor = EsExecutors.newFixed(1, 1, EsExecutors.daemonThreadFactory("test")); final CountDownLatch wait = new CountDownLatch(1); @@ -101,7 +101,6 @@ public class EsExecutorsTests extends ESTestCase { executor.shutdownNow(); } - @Test public void testFixedRejected() throws Exception { EsThreadPoolExecutor executor = EsExecutors.newFixed(1, 1, EsExecutors.daemonThreadFactory("test")); final CountDownLatch wait = new CountDownLatch(1); @@ -156,7 +155,6 @@ public class EsExecutorsTests extends ESTestCase { terminate(executor); } - @Test public void testScaleUp() throws Exception { final int min = between(1, 3); final int max = between(min + 1, 6); @@ -193,7 +191,6 @@ public class EsExecutorsTests extends ESTestCase { terminate(pool); } - @Test public void testScaleDown() throws Exception { final int min = between(1, 3); final int max = between(min + 1, 6); From ed7d84ca5f74ad63bf57e9a86e2fd63a202cd7e9 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 29 Jul 2015 12:02:46 -0400 Subject: [PATCH 2/3] Core: Improve toString on EsThreadPoolExecutor Improving the toString allows for nicer error reporting. Also cleaned up the way that EsRejectedExecutionException notices that it was rejected from a shutdown thread pool. I left javadocs about how its not 100% correct but good enough for most uses. The improved toString on EsThreadPoolExecutor mean every one of them needs a name. In most cases the name to use is obvious. In tests I use the name of the test method and in real thread pools I use the name of the thread pool. In non-ThreadPool executors I use the thread's name. Closes #9732 --- .../service/InternalClusterService.java | 2 +- .../common/util/concurrent/EsAbortPolicy.java | 13 +-- .../common/util/concurrent/EsExecutors.java | 16 ++-- .../EsRejectedExecutionException.java | 30 ++++++- .../util/concurrent/EsThreadPoolExecutor.java | 38 ++++++++- .../PrioritizedEsThreadPoolExecutor.java | 4 +- .../zen/ping/unicast/UnicastZenPing.java | 2 +- .../indices/recovery/RecoverySettings.java | 6 +- .../elasticsearch/threadpool/ThreadPool.java | 6 +- .../transport/local/LocalTransport.java | 2 +- .../util/concurrent/EsExecutorsTests.java | 83 ++++++++++++++++++- .../concurrent/PrioritizedExecutorsTests.java | 12 +-- .../org/elasticsearch/test/ESTestCase.java | 2 +- .../test/InternalTestCluster.java | 2 +- 14 files changed, 172 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 456d6dc0e65..70e993274ff 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -154,7 +154,7 @@ public class InternalClusterService extends AbstractLifecycleComponent nodeAttributes = discoveryNodeService.buildAttributes(); // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java index 8bb16869c47..2b19fa2096c 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java @@ -27,9 +27,7 @@ import java.util.concurrent.ThreadPoolExecutor; /** */ public class EsAbortPolicy implements XRejectedExecutionHandler { - private final CounterMetric rejected = new CounterMetric(); - public static final String SHUTTING_DOWN_KEY = "(shutting down)"; @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { @@ -49,16 +47,7 @@ public class EsAbortPolicy implements XRejectedExecutionHandler { } } rejected.inc(); - StringBuilder sb = new StringBuilder("rejected execution "); - if (executor.isShutdown()) { - sb.append(SHUTTING_DOWN_KEY + " "); - } else { - if (executor.getQueue() instanceof SizeBlockingQueue) { - sb.append("(queue capacity ").append(((SizeBlockingQueue) executor.getQueue()).capacity()).append(") "); - } - } - sb.append("on ").append(r.toString()); - throw new EsRejectedExecutionException(sb.toString()); + throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown()); } @Override diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 92e8d7d095f..c7cc07c3d45 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -54,30 +54,30 @@ public class EsExecutors { return settings.getAsInt(PROCESSORS, defaultValue); } - public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(ThreadFactory threadFactory) { - return new PrioritizedEsThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory); + public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory) { + return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory); } - public static EsThreadPoolExecutor newScaling(int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { + public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { ExecutorScalingQueue queue = new ExecutorScalingQueue<>(); // we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue - EsThreadPoolExecutor executor = new EsThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy()); + EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy()); queue.executor = executor; return executor; } - public static EsThreadPoolExecutor newCached(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { - return new EsThreadPoolExecutor(0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue(), threadFactory, new EsAbortPolicy()); + public static EsThreadPoolExecutor newCached(String name, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { + return new EsThreadPoolExecutor(name, 0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue(), threadFactory, new EsAbortPolicy()); } - public static EsThreadPoolExecutor newFixed(int size, int queueCapacity, ThreadFactory threadFactory) { + public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory) { BlockingQueue queue; if (queueCapacity < 0) { queue = ConcurrentCollections.newBlockingQueue(); } else { queue = new SizeBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), queueCapacity); } - return new EsThreadPoolExecutor(size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy()); + return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy()); } public static String threadName(Settings settings, String ... names) { diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java index 2aec22c04ec..d75b3ffa8c2 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java @@ -21,6 +21,7 @@ package org.elasticsearch.common.util.concurrent; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -28,17 +29,25 @@ import java.io.IOException; /** */ public class EsRejectedExecutionException extends ElasticsearchException { + private final boolean isExecutorShutdown; + + public EsRejectedExecutionException(String message, boolean isExecutorShutdown) { + super(message); + this.isExecutorShutdown = isExecutorShutdown; + } public EsRejectedExecutionException(String message) { - super(message); + this(message, false); } public EsRejectedExecutionException() { super((String)null); + this.isExecutorShutdown = false; } public EsRejectedExecutionException(Throwable e) { super(null, e); + this.isExecutorShutdown = false; } @Override @@ -48,5 +57,24 @@ public class EsRejectedExecutionException extends ElasticsearchException { public EsRejectedExecutionException(StreamInput in) throws IOException{ super(in); + isExecutorShutdown = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(isExecutorShutdown); + } + + /** + * Checks if the thread pool that rejected the execution was terminated + * shortly after the rejection. Its possible that this returns false and the + * thread pool has since been terminated but if this returns false then the + * termination wasn't a factor in this rejection. Conversely if this returns + * true the shutdown was probably a factor in this rejection but might have + * been triggered just after the action rejection. + */ + public boolean isExecutorShutdown() { + return isExecutorShutdown; } } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 9cfb6875993..1e6743e9004 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -33,13 +33,18 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { private volatile ShutdownListener listener; private final Object monitor = new Object(); + /** + * Name used in error reporting. + */ + private final String name; - EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy()); + EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { + this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy()); } - EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler) { + EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + this.name = name; } public void shutdown(ShutdownListener listener) { @@ -93,4 +98,31 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { } } } + + @Override + public String toString() { + /* + * ThreadPoolExecutor has some nice information in its toString but we + * can't recreate it without nastier hacks than this. + */ + String tpeToString = super.toString(); + int startOfInfoInTpeToString = tpeToString.indexOf('['); + String tpeInfo; + if (startOfInfoInTpeToString >= 0) { + tpeInfo = tpeToString.substring(startOfInfoInTpeToString + 1); + } else { + assert false: "Unsupported ThreadPoolExecutor toString"; + tpeInfo = tpeToString; + } + StringBuilder b = new StringBuilder(); + b.append(getClass().getSimpleName()).append('['); + b.append(name).append(", "); + if (getQueue() instanceof SizeBlockingQueue) { + @SuppressWarnings("rawtypes") + SizeBlockingQueue queue = (SizeBlockingQueue) getQueue(); + b.append("queue capacity = ").append(queue.capacity()).append(", "); + } + b.append("state = ").append(tpeInfo); + return b.toString(); + } } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java index 65998b57cc7..38c0cb23234 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java @@ -41,8 +41,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { private AtomicLong insertionOrder = new AtomicLong(); private Queue current = ConcurrentCollections.newQueue(); - PrioritizedEsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue(), threadFactory); + PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { + super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue(), threadFactory); } public Pending[] getPending() { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index f9cf98f86fa..b16b616515c 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -136,7 +136,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest.class, ThreadPool.Names.SAME, new UnicastPingRequestHandler()); ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); - unicastConnectExecutor = EsExecutors.newScaling(0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory); + unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory); } @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 72d7c77e881..c3d167082ff 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -125,9 +125,11 @@ public class RecoverySettings extends AbstractComponent implements Closeable { this.concurrentStreams = settings.getAsInt("indices.recovery.concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 3)); - this.concurrentStreamPool = EsExecutors.newScaling(0, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); + this.concurrentStreamPool = EsExecutors.newScaling("recovery_stream", 0, concurrentStreams, 60, TimeUnit.SECONDS, + EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); this.concurrentSmallFileStreams = settings.getAsInt("indices.recovery.concurrent_small_file_streams", settings.getAsInt("index.shard.recovery.concurrent_small_file_streams", 2)); - this.concurrentSmallFileStreamPool = EsExecutors.newScaling(0, concurrentSmallFileStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[small_file_recovery_stream]")); + this.concurrentSmallFileStreamPool = EsExecutors.newScaling("small_file_recovery_stream", 0, concurrentSmallFileStreams, 60, + TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[small_file_recovery_stream]")); this.maxBytesPerSec = settings.getAsBytesSize("indices.recovery.max_bytes_per_sec", settings.getAsBytesSize("indices.recovery.max_size_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB))); if (maxBytesPerSec.bytes() <= 0) { diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index f6e359baafb..7c01367c016 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -336,7 +336,7 @@ public class ThreadPool extends AbstractComponent { } else { logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); } - Executor executor = EsExecutors.newCached(keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); + Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null)); } else if ("fixed".equals(type)) { int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); @@ -371,7 +371,7 @@ public class ThreadPool extends AbstractComponent { int size = settings.getAsInt("size", defaultSize); SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize))); logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize); - Executor executor = EsExecutors.newFixed(size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory); + Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory); return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize)); } else if ("scaling".equals(type)) { TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); @@ -415,7 +415,7 @@ public class ThreadPool extends AbstractComponent { } else { logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); } - Executor executor = EsExecutors.newScaling(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); + Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null)); } throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]"); diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index cc7a8fd81a8..2cd4168b28d 100644 --- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -80,7 +80,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1); logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, LOCAL_TRANSPORT_THREAD_NAME_PREFIX); - this.workers = EsExecutors.newFixed(workerCount, queueSize, threadFactory); + this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory); } @Override diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 7c0891fac6e..7c9355e4191 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.test.ESTestCase; import org.junit.Test; @@ -27,6 +28,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; @@ -40,7 +42,7 @@ public class EsExecutorsTests extends ESTestCase { } public void testFixedForcedExecution() throws Exception { - EsThreadPoolExecutor executor = EsExecutors.newFixed(1, 1, EsExecutors.daemonThreadFactory("test")); + EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), 1, 1, EsExecutors.daemonThreadFactory("test")); final CountDownLatch wait = new CountDownLatch(1); final CountDownLatch exec1Wait = new CountDownLatch(1); @@ -102,7 +104,7 @@ public class EsExecutorsTests extends ESTestCase { } public void testFixedRejected() throws Exception { - EsThreadPoolExecutor executor = EsExecutors.newFixed(1, 1, EsExecutors.daemonThreadFactory("test")); + EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), 1, 1, EsExecutors.daemonThreadFactory("test")); final CountDownLatch wait = new CountDownLatch(1); final CountDownLatch exec1Wait = new CountDownLatch(1); @@ -160,7 +162,7 @@ public class EsExecutorsTests extends ESTestCase { final int max = between(min + 1, 6); final ThreadBarrier barrier = new ThreadBarrier(max + 1); - ThreadPoolExecutor pool = EsExecutors.newScaling(min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test")); + ThreadPoolExecutor pool = EsExecutors.newScaling(getTestName(), min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test")); assertThat("Min property", pool.getCorePoolSize(), equalTo(min)); assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max)); @@ -196,7 +198,7 @@ public class EsExecutorsTests extends ESTestCase { final int max = between(min + 1, 6); final ThreadBarrier barrier = new ThreadBarrier(max + 1); - final ThreadPoolExecutor pool = EsExecutors.newScaling(min, max, between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test")); + final ThreadPoolExecutor pool = EsExecutors.newScaling(getTestName(), min, max, between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test")); assertThat("Min property", pool.getCorePoolSize(), equalTo(min)); assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max)); @@ -233,4 +235,77 @@ public class EsExecutorsTests extends ESTestCase { }); terminate(pool); } + + public void testRejectionMessageAndShuttingDownFlag() throws InterruptedException { + int pool = between(1, 10); + int queue = between(0, 100); + int actions = queue + pool; + final CountDownLatch latch = new CountDownLatch(1); + EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), pool, queue, EsExecutors.daemonThreadFactory("dummy")); + try { + for (int i = 0; i < actions; i++) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + } + try { + executor.execute(new Runnable() { + @Override + public void run() { + // Doesn't matter is going to be rejected + } + + @Override + public String toString() { + return "dummy runnable"; + } + }); + fail("Didn't get a rejection when we expected one."); + } catch (EsRejectedExecutionException e) { + assertFalse("Thread pool registering as terminated when it isn't", e.isExecutorShutdown()); + String message = ExceptionsHelper.detailedMessage(e); + assertThat(message, containsString("of dummy runnable")); + assertThat(message, containsString("on EsThreadPoolExecutor[testRejectionMessage")); + assertThat(message, containsString("queue capacity = " + queue)); + assertThat(message, containsString("state = Running")); + assertThat(message, containsString("active threads = " + pool)); + assertThat(message, containsString("queued tasks = " + queue)); + assertThat(message, containsString("completed tasks = 0")); + } + } finally { + latch.countDown(); + terminate(executor); + } + try { + executor.execute(new Runnable() { + @Override + public void run() { + // Doesn't matter is going to be rejected + } + + @Override + public String toString() { + return "dummy runnable"; + } + }); + fail("Didn't get a rejection when we expected one."); + } catch (EsRejectedExecutionException e) { + assertTrue("Thread pool not registering as terminated when it is", e.isExecutorShutdown()); + String message = ExceptionsHelper.detailedMessage(e); + assertThat(message, containsString("of dummy runnable")); + assertThat(message, containsString("on EsThreadPoolExecutor[" + getTestName())); + assertThat(message, containsString("queue capacity = " + queue)); + assertThat(message, containsString("state = Terminated")); + assertThat(message, containsString("active threads = 0")); + assertThat(message, containsString("queued tasks = 0")); + assertThat(message, containsString("completed tasks = " + actions)); + } + } } diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java index 0620f2f91be..ef1c0a9d7ec 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java @@ -61,7 +61,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testSubmitPrioritizedExecutorWithRunnables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -91,7 +91,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testExecutePrioritizedExecutorWithRunnables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -121,7 +121,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testSubmitPrioritizedExecutorWithCallables() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -151,7 +151,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testSubmitPrioritizedExecutorWithMixed() throws Exception { - ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); List results = new ArrayList<>(8); CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch finishedLatch = new CountDownLatch(8); @@ -182,7 +182,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { @Test public void testTimeout() throws Exception { ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(EsExecutors.daemonThreadFactory(getTestName())); - PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); final CountDownLatch invoked = new CountDownLatch(1); final CountDownLatch block = new CountDownLatch(1); executor.execute(new Runnable() { @@ -246,7 +246,7 @@ public class PrioritizedExecutorsTests extends ESTestCase { ThreadPool threadPool = new ThreadPool("test"); final ScheduledThreadPoolExecutor timer = (ScheduledThreadPoolExecutor) threadPool.scheduler(); final AtomicBoolean timeoutCalled = new AtomicBoolean(); - PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); + PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName())); final CountDownLatch invoked = new CountDownLatch(1); executor.execute(new Runnable() { @Override diff --git a/core/src/test/java/org/elasticsearch/test/ESTestCase.java b/core/src/test/java/org/elasticsearch/test/ESTestCase.java index 462a98b03e4..1694ecf2e4f 100644 --- a/core/src/test/java/org/elasticsearch/test/ESTestCase.java +++ b/core/src/test/java/org/elasticsearch/test/ESTestCase.java @@ -537,7 +537,7 @@ public abstract class ESTestCase extends LuceneTestCase { @Override public void uncaughtException(Thread t, Throwable e) { if (e instanceof EsRejectedExecutionException) { - if (e.getMessage() != null && e.getMessage().contains(EsAbortPolicy.SHUTTING_DOWN_KEY)) { + if (e.getMessage() != null && ((EsRejectedExecutionException) e).isExecutorShutdown()) { return; // ignore the EsRejectedExecutionException when a node shuts down } } else if (e instanceof OutOfMemoryError) { diff --git a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java index ee3071cce66..19fe03ce988 100644 --- a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -314,7 +314,7 @@ public final class InternalTestCluster extends TestCluster { // always reduce this - it can make tests really slow builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50))); defaultSettings = builder.build(); - executor = EsExecutors.newCached(0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName)); + executor = EsExecutors.newCached("test runner", 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName)); } public static String nodeMode() { From 804f14c68eced5b6a827753dcb9b1bf5cb1f44cf Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 30 Jul 2015 13:52:58 -0400 Subject: [PATCH 3/3] Core: Remove nasty hack in toString This makes the output of EsThreadPoolExecutor#toString less pretty but we no longer have funky hacky that rely on the specific format of the toString produced by ThreadPoolExecutor which isn't part of its API and could change with any JVM version and break the output. --- .../util/concurrent/EsThreadPoolExecutor.java | 19 +++++-------------- .../util/concurrent/EsExecutorsTests.java | 4 ++-- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 1e6743e9004..4c02aab1fe8 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -101,19 +101,6 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { @Override public String toString() { - /* - * ThreadPoolExecutor has some nice information in its toString but we - * can't recreate it without nastier hacks than this. - */ - String tpeToString = super.toString(); - int startOfInfoInTpeToString = tpeToString.indexOf('['); - String tpeInfo; - if (startOfInfoInTpeToString >= 0) { - tpeInfo = tpeToString.substring(startOfInfoInTpeToString + 1); - } else { - assert false: "Unsupported ThreadPoolExecutor toString"; - tpeInfo = tpeToString; - } StringBuilder b = new StringBuilder(); b.append(getClass().getSimpleName()).append('['); b.append(name).append(", "); @@ -122,7 +109,11 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { SizeBlockingQueue queue = (SizeBlockingQueue) getQueue(); b.append("queue capacity = ").append(queue.capacity()).append(", "); } - b.append("state = ").append(tpeInfo); + /* + * ThreadPoolExecutor has some nice information in its toString but we + * can't get at it easily without just getting the toString. + */ + b.append(super.toString()).append(']'); return b.toString(); } } diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 7c9355e4191..c7406aa9511 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -274,7 +274,7 @@ public class EsExecutorsTests extends ESTestCase { assertThat(message, containsString("of dummy runnable")); assertThat(message, containsString("on EsThreadPoolExecutor[testRejectionMessage")); assertThat(message, containsString("queue capacity = " + queue)); - assertThat(message, containsString("state = Running")); + assertThat(message, containsString("[Running")); assertThat(message, containsString("active threads = " + pool)); assertThat(message, containsString("queued tasks = " + queue)); assertThat(message, containsString("completed tasks = 0")); @@ -302,7 +302,7 @@ public class EsExecutorsTests extends ESTestCase { assertThat(message, containsString("of dummy runnable")); assertThat(message, containsString("on EsThreadPoolExecutor[" + getTestName())); assertThat(message, containsString("queue capacity = " + queue)); - assertThat(message, containsString("state = Terminated")); + assertThat(message, containsString("[Terminated")); assertThat(message, containsString("active threads = 0")); assertThat(message, containsString("queued tasks = 0")); assertThat(message, containsString("completed tasks = " + actions));