Merge pull request #12535 from nik9000/fix/9732

Improve toString on EsThreadPoolExecutor
This commit is contained in:
Nik Everett 2015-08-05 14:29:55 -04:00
commit 065275443d
14 changed files with 164 additions and 50 deletions

View File

@ -154,7 +154,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
protected void doStart() { protected void doStart() {
add(localNodeMasterListeners); add(localNodeMasterListeners);
this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build(); this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build();
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(daemonThreadFactory(settings, UPDATE_THREAD_NAME)); this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME));
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes()); this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes());
Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes(); Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
// note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling

View File

@ -27,9 +27,7 @@ import java.util.concurrent.ThreadPoolExecutor;
/** /**
*/ */
public class EsAbortPolicy implements XRejectedExecutionHandler { public class EsAbortPolicy implements XRejectedExecutionHandler {
private final CounterMetric rejected = new CounterMetric(); private final CounterMetric rejected = new CounterMetric();
public static final String SHUTTING_DOWN_KEY = "(shutting down)";
@Override @Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
@ -49,16 +47,7 @@ public class EsAbortPolicy implements XRejectedExecutionHandler {
} }
} }
rejected.inc(); rejected.inc();
StringBuilder sb = new StringBuilder("rejected execution "); throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
if (executor.isShutdown()) {
sb.append(SHUTTING_DOWN_KEY + " ");
} else {
if (executor.getQueue() instanceof SizeBlockingQueue) {
sb.append("(queue capacity ").append(((SizeBlockingQueue) executor.getQueue()).capacity()).append(") ");
}
}
sb.append("on ").append(r.toString());
throw new EsRejectedExecutionException(sb.toString());
} }
@Override @Override

View File

@ -54,30 +54,30 @@ public class EsExecutors {
return settings.getAsInt(PROCESSORS, defaultValue); return settings.getAsInt(PROCESSORS, defaultValue);
} }
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(ThreadFactory threadFactory) { public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory) {
return new PrioritizedEsThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory); return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory);
} }
public static EsThreadPoolExecutor newScaling(int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>(); ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
// we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue // we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue
EsThreadPoolExecutor executor = new EsThreadPoolExecutor(min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy()); EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy());
queue.executor = executor; queue.executor = executor;
return executor; return executor;
} }
public static EsThreadPoolExecutor newCached(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { public static EsThreadPoolExecutor newCached(String name, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
return new EsThreadPoolExecutor(0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new EsAbortPolicy()); return new EsThreadPoolExecutor(name, 0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new EsAbortPolicy());
} }
public static EsThreadPoolExecutor newFixed(int size, int queueCapacity, ThreadFactory threadFactory) { public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory) {
BlockingQueue<Runnable> queue; BlockingQueue<Runnable> queue;
if (queueCapacity < 0) { if (queueCapacity < 0) {
queue = ConcurrentCollections.newBlockingQueue(); queue = ConcurrentCollections.newBlockingQueue();
} else { } else {
queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity); queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
} }
return new EsThreadPoolExecutor(size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy()); return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy());
} }
public static String threadName(Settings settings, String ... names) { public static String threadName(Settings settings, String ... names) {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import java.io.IOException; import java.io.IOException;
@ -28,17 +29,25 @@ import java.io.IOException;
/** /**
*/ */
public class EsRejectedExecutionException extends ElasticsearchException { public class EsRejectedExecutionException extends ElasticsearchException {
private final boolean isExecutorShutdown;
public EsRejectedExecutionException(String message, boolean isExecutorShutdown) {
super(message);
this.isExecutorShutdown = isExecutorShutdown;
}
public EsRejectedExecutionException(String message) { public EsRejectedExecutionException(String message) {
super(message); this(message, false);
} }
public EsRejectedExecutionException() { public EsRejectedExecutionException() {
super((String)null); super((String)null);
this.isExecutorShutdown = false;
} }
public EsRejectedExecutionException(Throwable e) { public EsRejectedExecutionException(Throwable e) {
super(null, e); super(null, e);
this.isExecutorShutdown = false;
} }
@Override @Override
@ -48,5 +57,24 @@ public class EsRejectedExecutionException extends ElasticsearchException {
public EsRejectedExecutionException(StreamInput in) throws IOException{ public EsRejectedExecutionException(StreamInput in) throws IOException{
super(in); super(in);
isExecutorShutdown = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(isExecutorShutdown);
}
/**
* Checks if the thread pool that rejected the execution was terminated
* shortly after the rejection. Its possible that this returns false and the
* thread pool has since been terminated but if this returns false then the
* termination wasn't a factor in this rejection. Conversely if this returns
* true the shutdown was probably a factor in this rejection but might have
* been triggered just after the action rejection.
*/
public boolean isExecutorShutdown() {
return isExecutorShutdown;
} }
} }

View File

@ -33,13 +33,18 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
private volatile ShutdownListener listener; private volatile ShutdownListener listener;
private final Object monitor = new Object(); private final Object monitor = new Object();
/**
* Name used in error reporting.
*/
private final String name;
EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy()); this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy());
} }
EsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler) { EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.name = name;
} }
public void shutdown(ShutdownListener listener) { public void shutdown(ShutdownListener listener) {
@ -93,4 +98,22 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
} }
} }
} }
@Override
public String toString() {
StringBuilder b = new StringBuilder();
b.append(getClass().getSimpleName()).append('[');
b.append(name).append(", ");
if (getQueue() instanceof SizeBlockingQueue) {
@SuppressWarnings("rawtypes")
SizeBlockingQueue queue = (SizeBlockingQueue) getQueue();
b.append("queue capacity = ").append(queue.capacity()).append(", ");
}
/*
* ThreadPoolExecutor has some nice information in its toString but we
* can't get at it easily without just getting the toString.
*/
b.append(super.toString()).append(']');
return b.toString();
}
} }

View File

@ -41,8 +41,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private AtomicLong insertionOrder = new AtomicLong(); private AtomicLong insertionOrder = new AtomicLong();
private Queue<Runnable> current = ConcurrentCollections.newQueue(); private Queue<Runnable> current = ConcurrentCollections.newQueue();
PrioritizedEsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(), threadFactory); super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(), threadFactory);
} }
public Pending[] getPending() { public Pending[] getPending() {

View File

@ -136,7 +136,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest.class, ThreadPool.Names.SAME, new UnicastPingRequestHandler()); transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest.class, ThreadPool.Names.SAME, new UnicastPingRequestHandler());
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
unicastConnectExecutor = EsExecutors.newScaling(0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory); unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory);
} }
@Override @Override

View File

@ -125,9 +125,11 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
this.concurrentStreams = settings.getAsInt("indices.recovery.concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 3)); this.concurrentStreams = settings.getAsInt("indices.recovery.concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 3));
this.concurrentStreamPool = EsExecutors.newScaling(0, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); this.concurrentStreamPool = EsExecutors.newScaling("recovery_stream", 0, concurrentStreams, 60, TimeUnit.SECONDS,
EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
this.concurrentSmallFileStreams = settings.getAsInt("indices.recovery.concurrent_small_file_streams", settings.getAsInt("index.shard.recovery.concurrent_small_file_streams", 2)); this.concurrentSmallFileStreams = settings.getAsInt("indices.recovery.concurrent_small_file_streams", settings.getAsInt("index.shard.recovery.concurrent_small_file_streams", 2));
this.concurrentSmallFileStreamPool = EsExecutors.newScaling(0, concurrentSmallFileStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[small_file_recovery_stream]")); this.concurrentSmallFileStreamPool = EsExecutors.newScaling("small_file_recovery_stream", 0, concurrentSmallFileStreams, 60,
TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[small_file_recovery_stream]"));
this.maxBytesPerSec = settings.getAsBytesSize("indices.recovery.max_bytes_per_sec", settings.getAsBytesSize("indices.recovery.max_size_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB))); this.maxBytesPerSec = settings.getAsBytesSize("indices.recovery.max_bytes_per_sec", settings.getAsBytesSize("indices.recovery.max_size_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)));
if (maxBytesPerSec.bytes() <= 0) { if (maxBytesPerSec.bytes() <= 0) {

View File

@ -336,7 +336,7 @@ public class ThreadPool extends AbstractComponent {
} else { } else {
logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive);
} }
Executor executor = EsExecutors.newCached(keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null)); return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null));
} else if ("fixed".equals(type)) { } else if ("fixed".equals(type)) {
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
@ -371,7 +371,7 @@ public class ThreadPool extends AbstractComponent {
int size = settings.getAsInt("size", defaultSize); int size = settings.getAsInt("size", defaultSize);
SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize))); SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize)));
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize); logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize);
Executor executor = EsExecutors.newFixed(size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory); Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory);
return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize)); return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize));
} else if ("scaling".equals(type)) { } else if ("scaling".equals(type)) {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
@ -415,7 +415,7 @@ public class ThreadPool extends AbstractComponent {
} else { } else {
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
} }
Executor executor = EsExecutors.newScaling(min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null)); return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null));
} }
throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]"); throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]");

View File

@ -80,7 +80,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1); int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1);
logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize); logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize);
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, LOCAL_TRANSPORT_THREAD_NAME_PREFIX); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, LOCAL_TRANSPORT_THREAD_NAME_PREFIX);
this.workers = EsExecutors.newFixed(workerCount, queueSize, threadFactory); this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory);
} }
@Override @Override

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.util.concurrent; package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.Test; import org.junit.Test;
@ -27,10 +28,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThan;
/** /**
* Tests for EsExecutors and its components like EsAbortPolicy.
*/ */
public class EsExecutorsTests extends ESTestCase { public class EsExecutorsTests extends ESTestCase {
@ -38,9 +41,8 @@ public class EsExecutorsTests extends ESTestCase {
return TimeUnit.values()[between(0, TimeUnit.values().length - 1)]; return TimeUnit.values()[between(0, TimeUnit.values().length - 1)];
} }
@Test
public void testFixedForcedExecution() throws Exception { public void testFixedForcedExecution() throws Exception {
EsThreadPoolExecutor executor = EsExecutors.newFixed(1, 1, EsExecutors.daemonThreadFactory("test")); EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), 1, 1, EsExecutors.daemonThreadFactory("test"));
final CountDownLatch wait = new CountDownLatch(1); final CountDownLatch wait = new CountDownLatch(1);
final CountDownLatch exec1Wait = new CountDownLatch(1); final CountDownLatch exec1Wait = new CountDownLatch(1);
@ -101,9 +103,8 @@ public class EsExecutorsTests extends ESTestCase {
executor.shutdownNow(); executor.shutdownNow();
} }
@Test
public void testFixedRejected() throws Exception { public void testFixedRejected() throws Exception {
EsThreadPoolExecutor executor = EsExecutors.newFixed(1, 1, EsExecutors.daemonThreadFactory("test")); EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), 1, 1, EsExecutors.daemonThreadFactory("test"));
final CountDownLatch wait = new CountDownLatch(1); final CountDownLatch wait = new CountDownLatch(1);
final CountDownLatch exec1Wait = new CountDownLatch(1); final CountDownLatch exec1Wait = new CountDownLatch(1);
@ -156,13 +157,12 @@ public class EsExecutorsTests extends ESTestCase {
terminate(executor); terminate(executor);
} }
@Test
public void testScaleUp() throws Exception { public void testScaleUp() throws Exception {
final int min = between(1, 3); final int min = between(1, 3);
final int max = between(min + 1, 6); final int max = between(min + 1, 6);
final ThreadBarrier barrier = new ThreadBarrier(max + 1); final ThreadBarrier barrier = new ThreadBarrier(max + 1);
ThreadPoolExecutor pool = EsExecutors.newScaling(min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test")); ThreadPoolExecutor pool = EsExecutors.newScaling(getTestName(), min, max, between(1, 100), randomTimeUnit(), EsExecutors.daemonThreadFactory("test"));
assertThat("Min property", pool.getCorePoolSize(), equalTo(min)); assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max)); assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
@ -193,13 +193,12 @@ public class EsExecutorsTests extends ESTestCase {
terminate(pool); terminate(pool);
} }
@Test
public void testScaleDown() throws Exception { public void testScaleDown() throws Exception {
final int min = between(1, 3); final int min = between(1, 3);
final int max = between(min + 1, 6); final int max = between(min + 1, 6);
final ThreadBarrier barrier = new ThreadBarrier(max + 1); final ThreadBarrier barrier = new ThreadBarrier(max + 1);
final ThreadPoolExecutor pool = EsExecutors.newScaling(min, max, between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test")); final ThreadPoolExecutor pool = EsExecutors.newScaling(getTestName(), min, max, between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test"));
assertThat("Min property", pool.getCorePoolSize(), equalTo(min)); assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max)); assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
@ -236,4 +235,77 @@ public class EsExecutorsTests extends ESTestCase {
}); });
terminate(pool); terminate(pool);
} }
public void testRejectionMessageAndShuttingDownFlag() throws InterruptedException {
int pool = between(1, 10);
int queue = between(0, 100);
int actions = queue + pool;
final CountDownLatch latch = new CountDownLatch(1);
EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"));
try {
for (int i = 0; i < actions; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
try {
executor.execute(new Runnable() {
@Override
public void run() {
// Doesn't matter is going to be rejected
}
@Override
public String toString() {
return "dummy runnable";
}
});
fail("Didn't get a rejection when we expected one.");
} catch (EsRejectedExecutionException e) {
assertFalse("Thread pool registering as terminated when it isn't", e.isExecutorShutdown());
String message = ExceptionsHelper.detailedMessage(e);
assertThat(message, containsString("of dummy runnable"));
assertThat(message, containsString("on EsThreadPoolExecutor[testRejectionMessage"));
assertThat(message, containsString("queue capacity = " + queue));
assertThat(message, containsString("[Running"));
assertThat(message, containsString("active threads = " + pool));
assertThat(message, containsString("queued tasks = " + queue));
assertThat(message, containsString("completed tasks = 0"));
}
} finally {
latch.countDown();
terminate(executor);
}
try {
executor.execute(new Runnable() {
@Override
public void run() {
// Doesn't matter is going to be rejected
}
@Override
public String toString() {
return "dummy runnable";
}
});
fail("Didn't get a rejection when we expected one.");
} catch (EsRejectedExecutionException e) {
assertTrue("Thread pool not registering as terminated when it is", e.isExecutorShutdown());
String message = ExceptionsHelper.detailedMessage(e);
assertThat(message, containsString("of dummy runnable"));
assertThat(message, containsString("on EsThreadPoolExecutor[" + getTestName()));
assertThat(message, containsString("queue capacity = " + queue));
assertThat(message, containsString("[Terminated"));
assertThat(message, containsString("active threads = 0"));
assertThat(message, containsString("queued tasks = 0"));
assertThat(message, containsString("completed tasks = " + actions));
}
}
} }

View File

@ -61,7 +61,7 @@ public class PrioritizedExecutorsTests extends ESTestCase {
@Test @Test
public void testSubmitPrioritizedExecutorWithRunnables() throws Exception { public void testSubmitPrioritizedExecutorWithRunnables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()));
List<Integer> results = new ArrayList<>(8); List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8); CountDownLatch finishedLatch = new CountDownLatch(8);
@ -91,7 +91,7 @@ public class PrioritizedExecutorsTests extends ESTestCase {
@Test @Test
public void testExecutePrioritizedExecutorWithRunnables() throws Exception { public void testExecutePrioritizedExecutorWithRunnables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()));
List<Integer> results = new ArrayList<>(8); List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8); CountDownLatch finishedLatch = new CountDownLatch(8);
@ -121,7 +121,7 @@ public class PrioritizedExecutorsTests extends ESTestCase {
@Test @Test
public void testSubmitPrioritizedExecutorWithCallables() throws Exception { public void testSubmitPrioritizedExecutorWithCallables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()));
List<Integer> results = new ArrayList<>(8); List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8); CountDownLatch finishedLatch = new CountDownLatch(8);
@ -151,7 +151,7 @@ public class PrioritizedExecutorsTests extends ESTestCase {
@Test @Test
public void testSubmitPrioritizedExecutorWithMixed() throws Exception { public void testSubmitPrioritizedExecutorWithMixed() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); ExecutorService executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()));
List<Integer> results = new ArrayList<>(8); List<Integer> results = new ArrayList<>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1); CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(8); CountDownLatch finishedLatch = new CountDownLatch(8);
@ -182,7 +182,7 @@ public class PrioritizedExecutorsTests extends ESTestCase {
@Test @Test
public void testTimeout() throws Exception { public void testTimeout() throws Exception {
ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(EsExecutors.daemonThreadFactory(getTestName())); ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(EsExecutors.daemonThreadFactory(getTestName()));
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()));
final CountDownLatch invoked = new CountDownLatch(1); final CountDownLatch invoked = new CountDownLatch(1);
final CountDownLatch block = new CountDownLatch(1); final CountDownLatch block = new CountDownLatch(1);
executor.execute(new Runnable() { executor.execute(new Runnable() {
@ -246,7 +246,7 @@ public class PrioritizedExecutorsTests extends ESTestCase {
ThreadPool threadPool = new ThreadPool("test"); ThreadPool threadPool = new ThreadPool("test");
final ScheduledThreadPoolExecutor timer = (ScheduledThreadPoolExecutor) threadPool.scheduler(); final ScheduledThreadPoolExecutor timer = (ScheduledThreadPoolExecutor) threadPool.scheduler();
final AtomicBoolean timeoutCalled = new AtomicBoolean(); final AtomicBoolean timeoutCalled = new AtomicBoolean();
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(EsExecutors.daemonThreadFactory(getTestName())); PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(getTestName(), EsExecutors.daemonThreadFactory(getTestName()));
final CountDownLatch invoked = new CountDownLatch(1); final CountDownLatch invoked = new CountDownLatch(1);
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override

View File

@ -537,7 +537,7 @@ public abstract class ESTestCase extends LuceneTestCase {
@Override @Override
public void uncaughtException(Thread t, Throwable e) { public void uncaughtException(Thread t, Throwable e) {
if (e instanceof EsRejectedExecutionException) { if (e instanceof EsRejectedExecutionException) {
if (e.getMessage() != null && e.getMessage().contains(EsAbortPolicy.SHUTTING_DOWN_KEY)) { if (e.getMessage() != null && ((EsRejectedExecutionException) e).isExecutorShutdown()) {
return; // ignore the EsRejectedExecutionException when a node shuts down return; // ignore the EsRejectedExecutionException when a node shuts down
} }
} else if (e instanceof OutOfMemoryError) { } else if (e instanceof OutOfMemoryError) {

View File

@ -314,7 +314,7 @@ public final class InternalTestCluster extends TestCluster {
// always reduce this - it can make tests really slow // always reduce this - it can make tests really slow
builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50))); builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50)));
defaultSettings = builder.build(); defaultSettings = builder.build();
executor = EsExecutors.newCached(0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName)); executor = EsExecutors.newCached("test runner", 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName));
} }
public static String nodeMode() { public static String nodeMode() {