diff --git a/server/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java b/server/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java index f9d6090f3f6..e1ca9d7d2f9 100644 --- a/server/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java +++ b/server/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java @@ -61,6 +61,7 @@ import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * Internal startup code. @@ -185,8 +186,15 @@ final class Bootstrap { IOUtils.close(node, spawner); LoggerContext context = (LoggerContext) LogManager.getContext(false); Configurator.shutdown(context); + if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) { + throw new IllegalStateException("Node didn't stop within 10 seconds. " + + "Any outstanding requests or tasks might get killed."); + } } catch (IOException ex) { throw new ElasticsearchException("failed to stop node", ex); + } catch (InterruptedException e) { + LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown."); + Thread.currentThread().interrupt(); } } }); @@ -269,6 +277,12 @@ final class Bootstrap { static void stop() throws IOException { try { IOUtils.close(INSTANCE.node, INSTANCE.spawner); + if (INSTANCE.node != null && INSTANCE.node.awaitClose(10, TimeUnit.SECONDS) == false) { + throw new IllegalStateException("Node didn't stop within 10 seconds. Any outstanding requests or tasks might get killed."); + } + } catch (InterruptedException e) { + LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown."); + Thread.currentThread().interrupt(); } finally { INSTANCE.keepAliveLatch.countDown(); } diff --git a/server/src/main/java/org/elasticsearch/common/component/AbstractLifecycleComponent.java b/server/src/main/java/org/elasticsearch/common/component/AbstractLifecycleComponent.java index 772d2d89cf5..a7f72c63091 100644 --- a/server/src/main/java/org/elasticsearch/common/component/AbstractLifecycleComponent.java +++ b/server/src/main/java/org/elasticsearch/common/component/AbstractLifecycleComponent.java @@ -19,15 +19,12 @@ package org.elasticsearch.common.component; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import java.io.IOException; +import java.io.UncheckedIOException; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; public abstract class AbstractLifecycleComponent implements LifecycleComponent { - private static final Logger logger = LogManager.getLogger(AbstractLifecycleComponent.class); protected final Lifecycle lifecycle = new Lifecycle(); @@ -52,16 +49,18 @@ public abstract class AbstractLifecycleComponent implements LifecycleComponent { @Override public void start() { - if (!lifecycle.canMoveToStarted()) { - return; - } - for (LifecycleListener listener : listeners) { - listener.beforeStart(); - } - doStart(); - lifecycle.moveToStarted(); - for (LifecycleListener listener : listeners) { - listener.afterStart(); + synchronized (lifecycle) { + if (!lifecycle.canMoveToStarted()) { + return; + } + for (LifecycleListener listener : listeners) { + listener.beforeStart(); + } + doStart(); + lifecycle.moveToStarted(); + for (LifecycleListener listener : listeners) { + listener.afterStart(); + } } } @@ -69,16 +68,18 @@ public abstract class AbstractLifecycleComponent implements LifecycleComponent { @Override public void stop() { - if (!lifecycle.canMoveToStopped()) { - return; - } - for (LifecycleListener listener : listeners) { - listener.beforeStop(); - } - lifecycle.moveToStopped(); - doStop(); - for (LifecycleListener listener : listeners) { - listener.afterStop(); + synchronized (lifecycle) { + if (!lifecycle.canMoveToStopped()) { + return; + } + for (LifecycleListener listener : listeners) { + listener.beforeStop(); + } + lifecycle.moveToStopped(); + doStop(); + for (LifecycleListener listener : listeners) { + listener.afterStop(); + } } } @@ -86,25 +87,26 @@ public abstract class AbstractLifecycleComponent implements LifecycleComponent { @Override public void close() { - if (lifecycle.started()) { - stop(); - } - if (!lifecycle.canMoveToClosed()) { - return; - } - for (LifecycleListener listener : listeners) { - listener.beforeClose(); - } - lifecycle.moveToClosed(); - try { - doClose(); - } catch (IOException e) { - // TODO: we need to separate out closing (ie shutting down) services, vs releasing runtime transient - // structures. Shutting down services should use IOUtils.close - logger.warn("failed to close " + getClass().getName(), e); - } - for (LifecycleListener listener : listeners) { - listener.afterClose(); + synchronized (lifecycle) { + if (lifecycle.started()) { + stop(); + } + if (!lifecycle.canMoveToClosed()) { + return; + } + for (LifecycleListener listener : listeners) { + listener.beforeClose(); + } + lifecycle.moveToClosed(); + try { + doClose(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + for (LifecycleListener listener : listeners) { + listener.afterClose(); + } + } } } diff --git a/server/src/main/java/org/elasticsearch/common/component/Lifecycle.java b/server/src/main/java/org/elasticsearch/common/component/Lifecycle.java index e71c9b03899..82042ab2b7d 100644 --- a/server/src/main/java/org/elasticsearch/common/component/Lifecycle.java +++ b/server/src/main/java/org/elasticsearch/common/component/Lifecycle.java @@ -39,15 +39,22 @@ package org.elasticsearch.common.component; * } * *
+ * NOTE: The Lifecycle class is thread-safe. It is also possible to prevent concurrent state transitions + * by locking on the Lifecycle object itself. This is typically useful when chaining multiple transitions. + *
* Note, closed is only allowed to be called when stopped, so make sure to stop the component first. - * Here is how the logic can be applied: + * Here is how the logic can be applied. A lock of the {@code lifecycleState} object is taken so that + * another thread cannot move the state from {@code STOPPED} to {@code STARTED} before it has moved to + * {@code CLOSED}. *
* public void close() { - * if (lifecycleState.started()) { - * stop(); - * } - * if (!lifecycleState.moveToClosed()) { - * return; + * synchronized (lifecycleState) { + * if (lifecycleState.started()) { + * stop(); + * } + * if (!lifecycleState.moveToClosed()) { + * return; + * } * } * // perform close logic here * } @@ -116,7 +123,7 @@ public class Lifecycle { } - public boolean moveToStarted() throws IllegalStateException { + public synchronized boolean moveToStarted() throws IllegalStateException { State localState = this.state; if (localState == State.INITIALIZED || localState == State.STOPPED) { state = State.STARTED; @@ -145,7 +152,7 @@ public class Lifecycle { throw new IllegalStateException("Can't move to stopped with unknown state"); } - public boolean moveToStopped() throws IllegalStateException { + public synchronized boolean moveToStopped() throws IllegalStateException { State localState = state; if (localState == State.STARTED) { state = State.STOPPED; @@ -171,7 +178,7 @@ public class Lifecycle { return true; } - public boolean moveToClosed() throws IllegalStateException { + public synchronized boolean moveToClosed() throws IllegalStateException { State localState = state; if (localState == State.CLOSED) { return false; diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java index 01e23843777..80a9a30032e 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java @@ -131,7 +131,18 @@ public final class ThreadContext implements Closeable, Writeable { public StoredContext stashContext() { final ThreadContextStruct context = threadLocal.get(); threadLocal.set(null); - return () -> threadLocal.set(context); + return () -> { + // If the node and thus the threadLocal get closed while this task + // is still executing, we don't want this runnable to fail with an + // uncaught exception + try { + threadLocal.set(context); + } catch (IllegalStateException e) { + if (isClosed() == false) { + throw e; + } + } + }; } /** diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 82570e32947..e07a83e2031 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader.CacheHelper; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.CollectionUtil; @@ -201,6 +202,7 @@ public class IndicesService extends AbstractLifecycleComponent private final Collection>> engineFactoryProviders; private final Map > indexStoreFactories; final AbstractRefCounted indicesRefCount; // pkg-private for testing + private final CountDownLatch closeLatch = new CountDownLatch(1); @Override protected void doStart() { @@ -274,6 +276,8 @@ public class IndicesService extends AbstractLifecycleComponent indicesQueryCache); } catch (IOException e) { throw new UncheckedIOException(e); + } finally { + closeLatch.countDown(); } } }; @@ -312,6 +316,18 @@ public class IndicesService extends AbstractLifecycleComponent indicesRefCount.decRef(); } + /** + * Wait for this {@link IndicesService} to be effectively closed. When this returns {@code true}, all shards and shard stores + * are closed and all shard {@link CacheHelper#addClosedListener(org.apache.lucene.index.IndexReader.ClosedListener) closed + * listeners} have run. However some {@link IndexEventListener#onStoreClosed(ShardId) shard closed listeners} might not have + * run. + * @returns true if all shards closed within the given timeout, false otherwise + * @throws InterruptedException if the current thread got interrupted while waiting for shards to close + */ + public boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException { + return closeLatch.await(timeout, timeUnit); + } + /** * Returns the node stats indices stats. The {@code includePrevious} flag controls * if old shards stats will be aggregated as well (only for relevant stats, such as diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 8484be006ec..42f80dbd87c 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -781,11 +781,13 @@ public class Node implements Closeable { // In this case the process will be terminated even if the first call to close() has not finished yet. @Override public synchronized void close() throws IOException { - if (lifecycle.started()) { - stop(); - } - if (!lifecycle.moveToClosed()) { - return; + synchronized (lifecycle) { + if (lifecycle.started()) { + stop(); + } + if (!lifecycle.moveToClosed()) { + return; + } } logger.info("closing ..."); @@ -833,21 +835,12 @@ public class Node implements Closeable { toClose.add(injector.getInstance(ScriptService.class)); toClose.add(() -> stopWatch.stop().start("thread_pool")); - // TODO this should really use ThreadPool.terminate() toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown()); - toClose.add(() -> { - try { - injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // ignore - } - }); - - toClose.add(() -> stopWatch.stop().start("thread_pool_force_shutdown")); - toClose.add(() -> injector.getInstance(ThreadPool.class).shutdownNow()); + // Don't call shutdownNow here, it might break ongoing operations on Lucene indices. + // See https://issues.apache.org/jira/browse/LUCENE-7248. We call shutdownNow in + // awaitClose if the node doesn't finish closing within the specified time. toClose.add(() -> stopWatch.stop()); - toClose.add(injector.getInstance(NodeEnvironment.class)); toClose.add(injector.getInstance(PageCacheRecycler.class)); @@ -858,6 +851,30 @@ public class Node implements Closeable { logger.info("closed"); } + /** + * Wait for this node to be effectively closed. + */ + // synchronized to prevent running concurrently with close() + public synchronized boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException { + if (lifecycle.closed() == false) { + // We don't want to shutdown the threadpool or interrupt threads on a node that is not + // closed yet. + throw new IllegalStateException("Call close() first"); + } + + + ThreadPool threadPool = injector.getInstance(ThreadPool.class); + final boolean terminated = ThreadPool.terminate(threadPool, timeout, timeUnit); + if (terminated) { + // All threads terminated successfully. Because search, recovery and all other operations + // that run on shards run in the threadpool, indices should be effectively closed by now. + if (nodeService.awaitClose(0, TimeUnit.MILLISECONDS) == false) { + throw new IllegalStateException("Some shards are still open after the threadpool terminated. " + + "Something is leaking index readers or store references."); + } + } + return terminated; + } /** * Returns {@code true} if the node is closed. diff --git a/server/src/main/java/org/elasticsearch/node/NodeService.java b/server/src/main/java/org/elasticsearch/node/NodeService.java index 0567641b8a5..3f71a21966c 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeService.java +++ b/server/src/main/java/org/elasticsearch/node/NodeService.java @@ -43,6 +43,7 @@ import org.elasticsearch.transport.TransportService; import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.TimeUnit; public class NodeService implements Closeable { private final Settings settings; @@ -135,4 +136,12 @@ public class NodeService implements Closeable { IOUtils.close(indicesService); } + /** + * Wait for the node to be effectively closed. + * @see IndicesService#awaitClose(long, TimeUnit) + */ + public boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException { + return indicesService.awaitClose(timeout, timeUnit); + } + } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index eb61af8d2a3..42d61301635 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -360,6 +360,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements final AtomicReference boundSocket = new AtomicReference<>(); closeLock.writeLock().lock(); try { + // No need for locking here since Lifecycle objects can't move from STARTED to INITIALIZED if (lifecycle.initialized() == false && lifecycle.started() == false) { throw new IllegalStateException("transport has been stopped"); } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java b/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java index 571ced1c118..fc7ebe4b964 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java @@ -136,8 +136,10 @@ final class TransportKeepAlive implements Closeable { @Override public void close() { - lifecycle.moveToStopped(); - lifecycle.moveToClosed(); + synchronized (lifecycle) { + lifecycle.moveToStopped(); + lifecycle.moveToClosed(); + } } private class ScheduledPing extends AbstractLifecycleRunnable { diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java index 15b45330530..e22253be7fc 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.transport.nio.MockNioTransportPlugin; import java.nio.file.Path; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import java.util.Collections; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; @@ -95,8 +96,10 @@ public class IndicesServiceCloseTests extends ESTestCase { Node node = startNode(); IndicesService indicesService = node.injector().getInstance(IndicesService.class); assertEquals(1, indicesService.indicesRefCount.refCount()); + assertFalse(indicesService.awaitClose(0, TimeUnit.MILLISECONDS)); node.close(); assertEquals(0, indicesService.indicesRefCount.refCount()); + assertTrue(indicesService.awaitClose(0, TimeUnit.MILLISECONDS)); } public void testCloseNonEmptyIndicesService() throws Exception { @@ -108,9 +111,11 @@ public class IndicesServiceCloseTests extends ESTestCase { .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))); assertEquals(2, indicesService.indicesRefCount.refCount()); + assertFalse(indicesService.awaitClose(0, TimeUnit.MILLISECONDS)); node.close(); assertEquals(0, indicesService.indicesRefCount.refCount()); + assertTrue(indicesService.awaitClose(0, TimeUnit.MILLISECONDS)); } public void testCloseWithIncedRefStore() throws Exception { @@ -126,12 +131,15 @@ public class IndicesServiceCloseTests extends ESTestCase { IndexService indexService = indicesService.iterator().next(); IndexShard shard = indexService.getShard(0); shard.store().incRef(); + assertFalse(indicesService.awaitClose(0, TimeUnit.MILLISECONDS)); node.close(); assertEquals(1, indicesService.indicesRefCount.refCount()); + assertFalse(indicesService.awaitClose(0, TimeUnit.MILLISECONDS)); shard.store().decRef(); assertEquals(0, indicesService.indicesRefCount.refCount()); + assertTrue(indicesService.awaitClose(0, TimeUnit.MILLISECONDS)); } public void testCloseWhileOngoingRequest() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/node/NodeTests.java b/server/src/test/java/org/elasticsearch/node/NodeTests.java index 288817d5c77..6f0419421b8 100644 --- a/server/src/test/java/org/elasticsearch/node/NodeTests.java +++ b/server/src/test/java/org/elasticsearch/node/NodeTests.java @@ -26,18 +26,30 @@ import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.env.Environment; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine.Searcher; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.MockHttpTransport; +import org.elasticsearch.threadpool.ThreadPool; +import org.hamcrest.Matchers; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + @LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") public class NodeTests extends ESTestCase { @@ -136,5 +148,87 @@ public class NodeTests extends ESTestCase { .put(Node.NODE_DATA_SETTING.getKey(), true); } + public void testCloseOnOutstandingTask() throws Exception { + Node node = new MockNode(baseSettings().build(), basePlugins()); + node.start(); + ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); + AtomicBoolean shouldRun = new AtomicBoolean(true); + threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { + while (shouldRun.get()); + }); + node.close(); + shouldRun.set(false); + assertTrue(node.awaitClose(1, TimeUnit.DAYS)); + } + public void testAwaitCloseTimeoutsOnNonInterruptibleTask() throws Exception { + Node node = new MockNode(baseSettings().build(), basePlugins()); + node.start(); + ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); + AtomicBoolean shouldRun = new AtomicBoolean(true); + threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { + while (shouldRun.get()); + }); + node.close(); + assertFalse(node.awaitClose(0, TimeUnit.MILLISECONDS)); + shouldRun.set(false); + } + + public void testCloseOnInterruptibleTask() throws Exception { + Node node = new MockNode(baseSettings().build(), basePlugins()); + node.start(); + ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); + CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch finishLatch = new CountDownLatch(1); + final AtomicBoolean interrupted = new AtomicBoolean(false); + threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { + try { + latch.await(); + } catch (InterruptedException e) { + interrupted.set(true); + Thread.currentThread().interrupt(); + } finally { + finishLatch.countDown(); + } + }); + node.close(); + // close should not interrput ongoing tasks + assertFalse(interrupted.get()); + // but awaitClose should + node.awaitClose(0, TimeUnit.SECONDS); + finishLatch.await(); + assertTrue(interrupted.get()); + } + + public void testCloseOnLeakedIndexReaderReference() throws Exception { + Node node = new MockNode(baseSettings().build(), basePlugins()); + node.start(); + IndicesService indicesService = node.injector().getInstance(IndicesService.class); + assertAcked(node.client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))); + IndexService indexService = indicesService.iterator().next(); + IndexShard shard = indexService.getShard(0); + Searcher searcher = shard.acquireSearcher("test"); + node.close(); + + IllegalStateException e = expectThrows(IllegalStateException.class, () -> node.awaitClose(1, TimeUnit.DAYS)); + searcher.close(); + assertThat(e.getMessage(), Matchers.containsString("Something is leaking index readers or store references")); + } + + public void testCloseOnLeakedStoreReference() throws Exception { + Node node = new MockNode(baseSettings().build(), basePlugins()); + node.start(); + IndicesService indicesService = node.injector().getInstance(IndicesService.class); + assertAcked(node.client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))); + IndexService indexService = indicesService.iterator().next(); + IndexShard shard = indexService.getShard(0); + shard.store().incRef(); + node.close(); + + IllegalStateException e = expectThrows(IllegalStateException.class, () -> node.awaitClose(1, TimeUnit.DAYS)); + shard.store().decRef(); + assertThat(e.getMessage(), Matchers.containsString("Something is leaking index readers or store references")); + } } diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 641f1a1c19b..90957c2779e 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -652,16 +652,17 @@ public class SearchServiceTests extends ESSingleNodeTestCase { searchRequest.allowPartialSearchResults(randomBoolean()); ShardSearchTransportRequest request = new ShardSearchTransportRequest(OriginalIndices.NONE, searchRequest, shardId, indexService.numberOfShards(), AliasFilter.EMPTY, 1f, nowInMillis, clusterAlias, Strings.EMPTY_ARRAY); - DefaultSearchContext searchContext = service.createSearchContext(request, new TimeValue(System.currentTimeMillis())); - SearchShardTarget searchShardTarget = searchContext.shardTarget(); - QueryShardContext queryShardContext = searchContext.getQueryShardContext(); - String expectedIndexName = clusterAlias == null ? index : clusterAlias + ":" + index; - assertEquals(expectedIndexName, queryShardContext.getFullyQualifiedIndex().getName()); - assertEquals(expectedIndexName, searchShardTarget.getFullyQualifiedIndexName()); - assertEquals(clusterAlias, searchShardTarget.getClusterAlias()); - assertEquals(shardId, searchShardTarget.getShardId()); - assertSame(searchShardTarget, searchContext.dfsResult().getSearchShardTarget()); - assertSame(searchShardTarget, searchContext.queryResult().getSearchShardTarget()); - assertSame(searchShardTarget, searchContext.fetchResult().getSearchShardTarget()); + try (DefaultSearchContext searchContext = service.createSearchContext(request, new TimeValue(System.currentTimeMillis()))) { + SearchShardTarget searchShardTarget = searchContext.shardTarget(); + QueryShardContext queryShardContext = searchContext.getQueryShardContext(); + String expectedIndexName = clusterAlias == null ? index : clusterAlias + ":" + index; + assertEquals(expectedIndexName, queryShardContext.getFullyQualifiedIndex().getName()); + assertEquals(expectedIndexName, searchShardTarget.getFullyQualifiedIndexName()); + assertEquals(clusterAlias, searchShardTarget.getClusterAlias()); + assertEquals(shardId, searchShardTarget.getShardId()); + assertSame(searchShardTarget, searchContext.dfsResult().getSearchShardTarget()); + assertSame(searchShardTarget, searchContext.queryResult().getSearchShardTarget()); + assertSame(searchShardTarget, searchContext.fetchResult().getSearchShardTarget()); + } } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index 4d8f9fed51b..621f303c983 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -60,6 +60,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING; @@ -99,10 +100,13 @@ public abstract class ESSingleNodeTestCase extends ESTestCase { ).get(); } - private static void stopNode() throws IOException { + private static void stopNode() throws IOException, InterruptedException { Node node = NODE; NODE = null; IOUtils.close(node); + if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) { + throw new AssertionError("Node couldn't close within 10 seconds."); + } } @Override @@ -144,7 +148,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase { } @AfterClass - public static void tearDownClass() throws IOException { + public static void tearDownClass() throws Exception { stopNode(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 4d531d57cef..c41a0fdcbef 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1049,6 +1049,13 @@ public final class InternalTestCluster extends TestCluster { closed.set(true); markNodeDataDirsAsPendingForWipe(node); node.close(); + try { + if (node.awaitClose(10, TimeUnit.SECONDS) == false) { + throw new IOException("Node didn't close within 10 seconds."); + } + } catch (InterruptedException e) { + throw new AssertionError("Interruption while waiting for the node to close", e); + } } }