From a236b803925e5d562091124a26d0ee009e757c33 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 25 Sep 2014 09:43:47 +0200 Subject: [PATCH] [CORE] Add ThreadPool.terminate to streamline shutdown Shutting down threadpools and executor services is done in very similar fashion across the codebase. This commit streamlines the process by adding a terminate method to ThreadPool. --- .../client/transport/TransportClient.java | 10 +---- .../zen/ping/unicast/UnicastZenPing.java | 6 +-- .../indices/recovery/RecoverySettings.java | 9 +--- .../node/internal/InternalNode.java | 1 + .../elasticsearch/threadpool/ThreadPool.java | 43 ++++++++++++++++++- .../transport/local/LocalTransport.java | 8 +--- .../TransportClientNodesServiceTests.java | 12 +++--- .../util/concurrent/EsExecutorsTests.java | 6 +-- .../zen/ping/unicast/UnicastZenPingTests.java | 4 +- .../test/ElasticsearchTestCase.java | 11 +---- .../UpdateThreadPoolSettingsTests.java | 16 +++---- 11 files changed, 70 insertions(+), 56 deletions(-) diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 457717e4643..99f59be6b5c 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -279,16 +279,8 @@ public class TransportClient extends AbstractClient { for (Class plugin : pluginsService.services()) { injector.getInstance(plugin).close(); } - - injector.getInstance(ThreadPool.class).shutdown(); try { - injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // ignore - Thread.currentThread().interrupt(); - } - try { - injector.getInstance(ThreadPool.class).shutdownNow(); + ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS); } catch (Exception e) { // ignore } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index fbad509b648..0d5d3ab2726 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -243,10 +243,8 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen public void close() { closed = true; - if (executor != null) { - executor.shutdownNow(); - executor = null; - } + ThreadPool.terminate(executor, 0, TimeUnit.SECONDS); + executor = null; } } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 88890a8cdb4..6d59e9733b6 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.node.settings.NodeSettingsService; +import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -95,13 +96,7 @@ public class RecoverySettings extends AbstractComponent { } public void close() { - concurrentStreamPool.shutdown(); - try { - concurrentStreamPool.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // that's fine... - } - concurrentStreamPool.shutdownNow(); + ThreadPool.terminate(concurrentStreamPool, 1, TimeUnit.SECONDS); } public ByteSizeValue fileChunkSize() { diff --git a/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/src/main/java/org/elasticsearch/node/internal/InternalNode.java index aad42e86d25..6d0140cf1e6 100644 --- a/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -368,6 +368,7 @@ public final class InternalNode implements Node { injector.getInstance(ScriptService.class).close(); stopWatch.stop().start("thread_pool"); + // TODO this should really use ThreadPool.terminate() injector.getInstance(ThreadPool.class).shutdown(); try { injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS); diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index c584cc25ef7..c85674add1f 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -267,7 +267,8 @@ public class ThreadPool extends AbstractComponent { } } while (!retiredExecutors.isEmpty()) { - result &= ((ThreadPoolExecutor) retiredExecutors.remove().executor()).awaitTermination(timeout, unit); + ThreadPoolExecutor executor = (ThreadPoolExecutor) retiredExecutors.remove().executor(); + result &= executor.awaitTermination(timeout, unit); } estimatedTimeThread.join(unit.toMillis(timeout)); return result; @@ -704,4 +705,44 @@ public class ThreadPool extends AbstractComponent { updateSettings(settings); } } + + /** + * Returns true if the given service was terminated successfully. If the termination timed out, + * the service is null this method will return false. + */ + public static boolean terminate(ExecutorService service, long timeout, TimeUnit timeUnit) { + if (service != null) { + service.shutdown(); + try { + if (service.awaitTermination(timeout, timeUnit)) { + return true; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + service.shutdownNow(); + } + return false; + } + + /** + * Returns true if the given pool was terminated successfully. If the termination timed out, + * the service is null this method will return false. + */ + public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit) { + if (pool != null) { + pool.shutdown(); + try { + if (pool.awaitTermination(timeout, timeUnit)) { + return true; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + // last resort + pool.shutdownNow(); + } + return false; + } + } diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 74b0d278bbe..1fc7641c499 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -117,13 +117,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem @Override protected void doClose() throws ElasticsearchException { - workers.shutdown(); - try { - workers.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - workers.shutdownNow(); + ThreadPool.terminate(workers, 10, TimeUnit.SECONDS); } @Override diff --git a/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index 359890c3f8f..6f6d848d216 100644 --- a/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -70,14 +70,14 @@ public class TransportClientNodesServiceTests extends ElasticsearchTestCase { } public void close() { - threadPool.shutdown(); - try { - threadPool.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().isInterrupted(); - } + transportService.stop(); transportClientNodesService.close(); + try { + terminate(threadPool); + } catch (InterruptedException e) { + throw new AssertionError(e); + } } } diff --git a/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index e7a018bb783..360e89c3769 100644 --- a/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -153,7 +153,7 @@ public class EsExecutorsTests extends ElasticsearchTestCase { assertThat(executed2.get(), equalTo(true)); assertThat(executed3.get(), equalTo(false)); - executor.shutdownNow(); + terminate(executor); } @Test @@ -189,7 +189,7 @@ public class EsExecutorsTests extends ElasticsearchTestCase { assertThat("wrong pool size", pool.getPoolSize(), equalTo(max)); assertThat("wrong active size", pool.getActiveCount(), equalTo(max)); barrier.await(); - pool.shutdown(); + terminate(pool); } @Test @@ -232,6 +232,6 @@ public class EsExecutorsTests extends ElasticsearchTestCase { assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max)); } }); - pool.shutdown(); + terminate(pool); } } diff --git a/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java b/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java index cb80b52cdbd..7dd17c96572 100644 --- a/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java +++ b/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java @@ -47,7 +47,7 @@ import static org.hamcrest.Matchers.equalTo; public class UnicastZenPingTests extends ElasticsearchTestCase { @Test - public void testSimplePings() { + public void testSimplePings() throws InterruptedException { Settings settings = ImmutableSettings.EMPTY; int startPort = 11000 + randomIntBetween(0, 1000); int endPort = startPort + 10; @@ -132,7 +132,7 @@ public class UnicastZenPingTests extends ElasticsearchTestCase { zenPingB.close(); transportServiceA.close(); transportServiceB.close(); - threadPool.shutdown(); + terminate(threadPool); } } } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java b/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java index cafee980b17..9ee26991474 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java @@ -530,20 +530,13 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest { boolean terminated = true; for (ExecutorService service : services) { if (service != null) { - service.shutdown(); - service.shutdownNow(); - terminated &= service.awaitTermination(10, TimeUnit.SECONDS); + terminated &= ThreadPool.terminate(service, 10, TimeUnit.SECONDS); } } return terminated; } public static boolean terminate(ThreadPool service) throws InterruptedException { - if (service != null) { - service.shutdown(); - service.shutdownNow(); - return service.awaitTermination(10, TimeUnit.SECONDS); - } - return true; + return ThreadPool.terminate(service, 10, TimeUnit.SECONDS); } } diff --git a/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index 76a1c4c2ca0..92ca587a5a6 100644 --- a/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -49,7 +49,7 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase { } @Test - public void testCachedExecutorType() { + public void testCachedExecutorType() throws InterruptedException { ThreadPool threadPool = new ThreadPool( ImmutableSettings.settingsBuilder() .put("threadpool.search.type", "cached") @@ -101,12 +101,11 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase { // Make sure executor didn't change assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); - - threadPool.shutdown(); + terminate(threadPool); } @Test - public void testFixedExecutorType() { + public void testFixedExecutorType() throws InterruptedException { ThreadPool threadPool = new ThreadPool(settingsBuilder() .put("threadpool.search.type", "fixed") .put("name","testCachedExecutorType").build(), null); @@ -161,12 +160,12 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase { .put("threadpool.search.queue", "500") .build()); - threadPool.shutdown(); + terminate(threadPool); } @Test - public void testScalingExecutorType() { + public void testScalingExecutorType() throws InterruptedException { ThreadPool threadPool = new ThreadPool(settingsBuilder() .put("threadpool.search.type", "scaling") .put("threadpool.search.size", 10) @@ -197,7 +196,7 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase { assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); - threadPool.shutdown(); + terminate(threadPool); } @Test(timeout = 10000) @@ -224,8 +223,9 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase { assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true)); assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true)); assertThat(((ThreadPoolExecutor) oldExecutor).isTerminated(), equalTo(false)); - terminate(threadPool); + threadPool.shutdownNow(); // interrupt the thread latch.await(); + terminate(threadPool); } }