[CORE] Add ThreadPool.terminate to streamline shutdown

Shutting down threadpools and executor services is done in very similar
fashion across the codebase. This commit streamlines the process by
adding a terminate method to ThreadPool.
This commit is contained in:
Simon Willnauer 2014-09-25 09:43:47 +02:00
parent 51bf3e6730
commit a236b80392
11 changed files with 70 additions and 56 deletions

View File

@ -279,16 +279,8 @@ public class TransportClient extends AbstractClient {
for (Class<? extends LifecycleComponent> plugin : pluginsService.services()) { for (Class<? extends LifecycleComponent> plugin : pluginsService.services()) {
injector.getInstance(plugin).close(); injector.getInstance(plugin).close();
} }
injector.getInstance(ThreadPool.class).shutdown();
try { try {
injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS); ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
Thread.currentThread().interrupt();
}
try {
injector.getInstance(ThreadPool.class).shutdownNow();
} catch (Exception e) { } catch (Exception e) {
// ignore // ignore
} }

View File

@ -243,12 +243,10 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
public void close() { public void close() {
closed = true; closed = true;
if (executor != null) { ThreadPool.terminate(executor, 0, TimeUnit.SECONDS);
executor.shutdownNow();
executor = null; executor = null;
} }
} }
}
void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final SendPingsHandler sendPingsHandler) { void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final SendPingsHandler sendPingsHandler) {

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -95,13 +96,7 @@ public class RecoverySettings extends AbstractComponent {
} }
public void close() { public void close() {
concurrentStreamPool.shutdown(); ThreadPool.terminate(concurrentStreamPool, 1, TimeUnit.SECONDS);
try {
concurrentStreamPool.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// that's fine...
}
concurrentStreamPool.shutdownNow();
} }
public ByteSizeValue fileChunkSize() { public ByteSizeValue fileChunkSize() {

View File

@ -368,6 +368,7 @@ public final class InternalNode implements Node {
injector.getInstance(ScriptService.class).close(); injector.getInstance(ScriptService.class).close();
stopWatch.stop().start("thread_pool"); stopWatch.stop().start("thread_pool");
// TODO this should really use ThreadPool.terminate()
injector.getInstance(ThreadPool.class).shutdown(); injector.getInstance(ThreadPool.class).shutdown();
try { try {
injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS); injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS);

View File

@ -267,7 +267,8 @@ public class ThreadPool extends AbstractComponent {
} }
} }
while (!retiredExecutors.isEmpty()) { while (!retiredExecutors.isEmpty()) {
result &= ((ThreadPoolExecutor) retiredExecutors.remove().executor()).awaitTermination(timeout, unit); ThreadPoolExecutor executor = (ThreadPoolExecutor) retiredExecutors.remove().executor();
result &= executor.awaitTermination(timeout, unit);
} }
estimatedTimeThread.join(unit.toMillis(timeout)); estimatedTimeThread.join(unit.toMillis(timeout));
return result; return result;
@ -704,4 +705,44 @@ public class ThreadPool extends AbstractComponent {
updateSettings(settings); updateSettings(settings);
} }
} }
/**
* Returns <code>true</code> if the given service was terminated successfully. If the termination timed out,
* the service is <code>null</code> this method will return <code>false</code>.
*/
public static boolean terminate(ExecutorService service, long timeout, TimeUnit timeUnit) {
if (service != null) {
service.shutdown();
try {
if (service.awaitTermination(timeout, timeUnit)) {
return true;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
service.shutdownNow();
}
return false;
}
/**
* Returns <code>true</code> if the given pool was terminated successfully. If the termination timed out,
* the service is <code>null</code> this method will return <code>false</code>.
*/
public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit) {
if (pool != null) {
pool.shutdown();
try {
if (pool.awaitTermination(timeout, timeUnit)) {
return true;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// last resort
pool.shutdownNow();
}
return false;
}
} }

View File

@ -117,13 +117,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
@Override @Override
protected void doClose() throws ElasticsearchException { protected void doClose() throws ElasticsearchException {
workers.shutdown(); ThreadPool.terminate(workers, 10, TimeUnit.SECONDS);
try {
workers.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
workers.shutdownNow();
} }
@Override @Override

View File

@ -70,14 +70,14 @@ public class TransportClientNodesServiceTests extends ElasticsearchTestCase {
} }
public void close() { public void close() {
threadPool.shutdown();
try {
threadPool.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().isInterrupted();
}
transportService.stop(); transportService.stop();
transportClientNodesService.close(); transportClientNodesService.close();
try {
terminate(threadPool);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
} }
} }

View File

@ -153,7 +153,7 @@ public class EsExecutorsTests extends ElasticsearchTestCase {
assertThat(executed2.get(), equalTo(true)); assertThat(executed2.get(), equalTo(true));
assertThat(executed3.get(), equalTo(false)); assertThat(executed3.get(), equalTo(false));
executor.shutdownNow(); terminate(executor);
} }
@Test @Test
@ -189,7 +189,7 @@ public class EsExecutorsTests extends ElasticsearchTestCase {
assertThat("wrong pool size", pool.getPoolSize(), equalTo(max)); assertThat("wrong pool size", pool.getPoolSize(), equalTo(max));
assertThat("wrong active size", pool.getActiveCount(), equalTo(max)); assertThat("wrong active size", pool.getActiveCount(), equalTo(max));
barrier.await(); barrier.await();
pool.shutdown(); terminate(pool);
} }
@Test @Test
@ -232,6 +232,6 @@ public class EsExecutorsTests extends ElasticsearchTestCase {
assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max)); assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max));
} }
}); });
pool.shutdown(); terminate(pool);
} }
} }

View File

@ -47,7 +47,7 @@ import static org.hamcrest.Matchers.equalTo;
public class UnicastZenPingTests extends ElasticsearchTestCase { public class UnicastZenPingTests extends ElasticsearchTestCase {
@Test @Test
public void testSimplePings() { public void testSimplePings() throws InterruptedException {
Settings settings = ImmutableSettings.EMPTY; Settings settings = ImmutableSettings.EMPTY;
int startPort = 11000 + randomIntBetween(0, 1000); int startPort = 11000 + randomIntBetween(0, 1000);
int endPort = startPort + 10; int endPort = startPort + 10;
@ -132,7 +132,7 @@ public class UnicastZenPingTests extends ElasticsearchTestCase {
zenPingB.close(); zenPingB.close();
transportServiceA.close(); transportServiceA.close();
transportServiceB.close(); transportServiceB.close();
threadPool.shutdown(); terminate(threadPool);
} }
} }
} }

View File

@ -530,20 +530,13 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
boolean terminated = true; boolean terminated = true;
for (ExecutorService service : services) { for (ExecutorService service : services) {
if (service != null) { if (service != null) {
service.shutdown(); terminated &= ThreadPool.terminate(service, 10, TimeUnit.SECONDS);
service.shutdownNow();
terminated &= service.awaitTermination(10, TimeUnit.SECONDS);
} }
} }
return terminated; return terminated;
} }
public static boolean terminate(ThreadPool service) throws InterruptedException { public static boolean terminate(ThreadPool service) throws InterruptedException {
if (service != null) { return ThreadPool.terminate(service, 10, TimeUnit.SECONDS);
service.shutdown();
service.shutdownNow();
return service.awaitTermination(10, TimeUnit.SECONDS);
}
return true;
} }
} }

View File

@ -49,7 +49,7 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase {
} }
@Test @Test
public void testCachedExecutorType() { public void testCachedExecutorType() throws InterruptedException {
ThreadPool threadPool = new ThreadPool( ThreadPool threadPool = new ThreadPool(
ImmutableSettings.settingsBuilder() ImmutableSettings.settingsBuilder()
.put("threadpool.search.type", "cached") .put("threadpool.search.type", "cached")
@ -101,12 +101,11 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase {
// Make sure executor didn't change // Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
terminate(threadPool);
threadPool.shutdown();
} }
@Test @Test
public void testFixedExecutorType() { public void testFixedExecutorType() throws InterruptedException {
ThreadPool threadPool = new ThreadPool(settingsBuilder() ThreadPool threadPool = new ThreadPool(settingsBuilder()
.put("threadpool.search.type", "fixed") .put("threadpool.search.type", "fixed")
.put("name","testCachedExecutorType").build(), null); .put("name","testCachedExecutorType").build(), null);
@ -161,12 +160,12 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase {
.put("threadpool.search.queue", "500") .put("threadpool.search.queue", "500")
.build()); .build());
threadPool.shutdown(); terminate(threadPool);
} }
@Test @Test
public void testScalingExecutorType() { public void testScalingExecutorType() throws InterruptedException {
ThreadPool threadPool = new ThreadPool(settingsBuilder() ThreadPool threadPool = new ThreadPool(settingsBuilder()
.put("threadpool.search.type", "scaling") .put("threadpool.search.type", "scaling")
.put("threadpool.search.size", 10) .put("threadpool.search.size", 10)
@ -197,7 +196,7 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase {
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
threadPool.shutdown(); terminate(threadPool);
} }
@Test(timeout = 10000) @Test(timeout = 10000)
@ -224,8 +223,9 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase {
assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true)); assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true));
assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true)); assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true));
assertThat(((ThreadPoolExecutor) oldExecutor).isTerminated(), equalTo(false)); assertThat(((ThreadPoolExecutor) oldExecutor).isTerminated(), equalTo(false));
terminate(threadPool); threadPool.shutdownNow(); // interrupt the thread
latch.await(); latch.await();
terminate(threadPool);
} }
} }