Clean up Node#close. (#39317) (#41301)

`Node#close` is pretty hard to rely on today:
 - it might swallow exceptions
 - it waits for 10 seconds for threads to terminate but doesn't signal anything
   if threads are still not terminated after 10 seconds

This commit makes `IOException`s propagated and splits `Node#close` into
`Node#close` and `Node#awaitClose` so that the decision what to do if a node
takes too long to close can be done on top of `Node#close`.

It also adds synchronization to lifecycle transitions to make them atomic. I
don't think it is a source of problems today, but it makes things easier to
reason about.
This commit is contained in:
Adrien Grand 2019-04-17 16:10:53 +02:00 committed by GitHub
parent 1d2365f5b6
commit 9fd5237fd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 278 additions and 85 deletions

View File

@ -61,6 +61,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Internal startup code.
@ -185,8 +186,15 @@ final class Bootstrap {
IOUtils.close(node, spawner);
LoggerContext context = (LoggerContext) LogManager.getContext(false);
Configurator.shutdown(context);
if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) {
throw new IllegalStateException("Node didn't stop within 10 seconds. " +
"Any outstanding requests or tasks might get killed.");
}
} catch (IOException ex) {
throw new ElasticsearchException("failed to stop node", ex);
} catch (InterruptedException e) {
LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
Thread.currentThread().interrupt();
}
}
});
@ -269,6 +277,12 @@ final class Bootstrap {
static void stop() throws IOException {
try {
IOUtils.close(INSTANCE.node, INSTANCE.spawner);
if (INSTANCE.node != null && INSTANCE.node.awaitClose(10, TimeUnit.SECONDS) == false) {
throw new IllegalStateException("Node didn't stop within 10 seconds. Any outstanding requests or tasks might get killed.");
}
} catch (InterruptedException e) {
LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
Thread.currentThread().interrupt();
} finally {
INSTANCE.keepAliveLatch.countDown();
}

View File

@ -19,15 +19,12 @@
package org.elasticsearch.common.component;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public abstract class AbstractLifecycleComponent implements LifecycleComponent {
private static final Logger logger = LogManager.getLogger(AbstractLifecycleComponent.class);
protected final Lifecycle lifecycle = new Lifecycle();
@ -52,6 +49,7 @@ public abstract class AbstractLifecycleComponent implements LifecycleComponent {
@Override
public void start() {
synchronized (lifecycle) {
if (!lifecycle.canMoveToStarted()) {
return;
}
@ -64,11 +62,13 @@ public abstract class AbstractLifecycleComponent implements LifecycleComponent {
listener.afterStart();
}
}
}
protected abstract void doStart();
@Override
public void stop() {
synchronized (lifecycle) {
if (!lifecycle.canMoveToStopped()) {
return;
}
@ -81,11 +81,13 @@ public abstract class AbstractLifecycleComponent implements LifecycleComponent {
listener.afterStop();
}
}
}
protected abstract void doStop();
@Override
public void close() {
synchronized (lifecycle) {
if (lifecycle.started()) {
stop();
}
@ -99,14 +101,14 @@ public abstract class AbstractLifecycleComponent implements LifecycleComponent {
try {
doClose();
} catch (IOException e) {
// TODO: we need to separate out closing (ie shutting down) services, vs releasing runtime transient
// structures. Shutting down services should use IOUtils.close
logger.warn("failed to close " + getClass().getName(), e);
}
throw new UncheckedIOException(e);
} finally {
for (LifecycleListener listener : listeners) {
listener.afterClose();
}
}
}
}
protected abstract void doClose() throws IOException;
}

View File

@ -39,16 +39,23 @@ package org.elasticsearch.common.component;
* }
* </pre>
* <p>
* NOTE: The Lifecycle class is thread-safe. It is also possible to prevent concurrent state transitions
* by locking on the Lifecycle object itself. This is typically useful when chaining multiple transitions.
* <p>
* Note, closed is only allowed to be called when stopped, so make sure to stop the component first.
* Here is how the logic can be applied:
* Here is how the logic can be applied. A lock of the {@code lifecycleState} object is taken so that
* another thread cannot move the state from {@code STOPPED} to {@code STARTED} before it has moved to
* {@code CLOSED}.
* <pre>
* public void close() {
* synchronized (lifecycleState) {
* if (lifecycleState.started()) {
* stop();
* }
* if (!lifecycleState.moveToClosed()) {
* return;
* }
* }
* // perform close logic here
* }
* </pre>
@ -116,7 +123,7 @@ public class Lifecycle {
}
public boolean moveToStarted() throws IllegalStateException {
public synchronized boolean moveToStarted() throws IllegalStateException {
State localState = this.state;
if (localState == State.INITIALIZED || localState == State.STOPPED) {
state = State.STARTED;
@ -145,7 +152,7 @@ public class Lifecycle {
throw new IllegalStateException("Can't move to stopped with unknown state");
}
public boolean moveToStopped() throws IllegalStateException {
public synchronized boolean moveToStopped() throws IllegalStateException {
State localState = state;
if (localState == State.STARTED) {
state = State.STOPPED;
@ -171,7 +178,7 @@ public class Lifecycle {
return true;
}
public boolean moveToClosed() throws IllegalStateException {
public synchronized boolean moveToClosed() throws IllegalStateException {
State localState = state;
if (localState == State.CLOSED) {
return false;

View File

@ -131,7 +131,18 @@ public final class ThreadContext implements Closeable, Writeable {
public StoredContext stashContext() {
final ThreadContextStruct context = threadLocal.get();
threadLocal.set(null);
return () -> threadLocal.set(context);
return () -> {
// If the node and thus the threadLocal get closed while this task
// is still executing, we don't want this runnable to fail with an
// uncaught exception
try {
threadLocal.set(context);
} catch (IllegalStateException e) {
if (isClosed() == false) {
throw e;
}
}
};
}
/**

View File

@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader.CacheHelper;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.CollectionUtil;
@ -201,6 +202,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
private final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories;
final AbstractRefCounted indicesRefCount; // pkg-private for testing
private final CountDownLatch closeLatch = new CountDownLatch(1);
@Override
protected void doStart() {
@ -274,6 +276,8 @@ public class IndicesService extends AbstractLifecycleComponent
indicesQueryCache);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
closeLatch.countDown();
}
}
};
@ -312,6 +316,18 @@ public class IndicesService extends AbstractLifecycleComponent
indicesRefCount.decRef();
}
/**
* Wait for this {@link IndicesService} to be effectively closed. When this returns {@code true}, all shards and shard stores
* are closed and all shard {@link CacheHelper#addClosedListener(org.apache.lucene.index.IndexReader.ClosedListener) closed
* listeners} have run. However some {@link IndexEventListener#onStoreClosed(ShardId) shard closed listeners} might not have
* run.
* @returns true if all shards closed within the given timeout, false otherwise
* @throws InterruptedException if the current thread got interrupted while waiting for shards to close
*/
public boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
return closeLatch.await(timeout, timeUnit);
}
/**
* Returns the node stats indices stats. The {@code includePrevious} flag controls
* if old shards stats will be aggregated as well (only for relevant stats, such as

View File

@ -781,12 +781,14 @@ public class Node implements Closeable {
// In this case the process will be terminated even if the first call to close() has not finished yet.
@Override
public synchronized void close() throws IOException {
synchronized (lifecycle) {
if (lifecycle.started()) {
stop();
}
if (!lifecycle.moveToClosed()) {
return;
}
}
logger.info("closing ...");
List<Closeable> toClose = new ArrayList<>();
@ -833,21 +835,12 @@ public class Node implements Closeable {
toClose.add(injector.getInstance(ScriptService.class));
toClose.add(() -> stopWatch.stop().start("thread_pool"));
// TODO this should really use ThreadPool.terminate()
toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown());
toClose.add(() -> {
try {
injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
});
toClose.add(() -> stopWatch.stop().start("thread_pool_force_shutdown"));
toClose.add(() -> injector.getInstance(ThreadPool.class).shutdownNow());
// Don't call shutdownNow here, it might break ongoing operations on Lucene indices.
// See https://issues.apache.org/jira/browse/LUCENE-7248. We call shutdownNow in
// awaitClose if the node doesn't finish closing within the specified time.
toClose.add(() -> stopWatch.stop());
toClose.add(injector.getInstance(NodeEnvironment.class));
toClose.add(injector.getInstance(PageCacheRecycler.class));
@ -858,6 +851,30 @@ public class Node implements Closeable {
logger.info("closed");
}
/**
* Wait for this node to be effectively closed.
*/
// synchronized to prevent running concurrently with close()
public synchronized boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
if (lifecycle.closed() == false) {
// We don't want to shutdown the threadpool or interrupt threads on a node that is not
// closed yet.
throw new IllegalStateException("Call close() first");
}
ThreadPool threadPool = injector.getInstance(ThreadPool.class);
final boolean terminated = ThreadPool.terminate(threadPool, timeout, timeUnit);
if (terminated) {
// All threads terminated successfully. Because search, recovery and all other operations
// that run on shards run in the threadpool, indices should be effectively closed by now.
if (nodeService.awaitClose(0, TimeUnit.MILLISECONDS) == false) {
throw new IllegalStateException("Some shards are still open after the threadpool terminated. " +
"Something is leaking index readers or store references.");
}
}
return terminated;
}
/**
* Returns {@code true} if the node is closed.

View File

@ -43,6 +43,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class NodeService implements Closeable {
private final Settings settings;
@ -135,4 +136,12 @@ public class NodeService implements Closeable {
IOUtils.close(indicesService);
}
/**
* Wait for the node to be effectively closed.
* @see IndicesService#awaitClose(long, TimeUnit)
*/
public boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
return indicesService.awaitClose(timeout, timeUnit);
}
}

View File

@ -360,6 +360,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
closeLock.writeLock().lock();
try {
// No need for locking here since Lifecycle objects can't move from STARTED to INITIALIZED
if (lifecycle.initialized() == false && lifecycle.started() == false) {
throw new IllegalStateException("transport has been stopped");
}

View File

@ -136,9 +136,11 @@ final class TransportKeepAlive implements Closeable {
@Override
public void close() {
synchronized (lifecycle) {
lifecycle.moveToStopped();
lifecycle.moveToClosed();
}
}
private class ScheduledPing extends AbstractLifecycleRunnable {

View File

@ -49,6 +49,7 @@ import org.elasticsearch.transport.nio.MockNioTransportPlugin;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.Collections;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
@ -95,8 +96,10 @@ public class IndicesServiceCloseTests extends ESTestCase {
Node node = startNode();
IndicesService indicesService = node.injector().getInstance(IndicesService.class);
assertEquals(1, indicesService.indicesRefCount.refCount());
assertFalse(indicesService.awaitClose(0, TimeUnit.MILLISECONDS));
node.close();
assertEquals(0, indicesService.indicesRefCount.refCount());
assertTrue(indicesService.awaitClose(0, TimeUnit.MILLISECONDS));
}
public void testCloseNonEmptyIndicesService() throws Exception {
@ -108,9 +111,11 @@ public class IndicesServiceCloseTests extends ESTestCase {
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)));
assertEquals(2, indicesService.indicesRefCount.refCount());
assertFalse(indicesService.awaitClose(0, TimeUnit.MILLISECONDS));
node.close();
assertEquals(0, indicesService.indicesRefCount.refCount());
assertTrue(indicesService.awaitClose(0, TimeUnit.MILLISECONDS));
}
public void testCloseWithIncedRefStore() throws Exception {
@ -126,12 +131,15 @@ public class IndicesServiceCloseTests extends ESTestCase {
IndexService indexService = indicesService.iterator().next();
IndexShard shard = indexService.getShard(0);
shard.store().incRef();
assertFalse(indicesService.awaitClose(0, TimeUnit.MILLISECONDS));
node.close();
assertEquals(1, indicesService.indicesRefCount.refCount());
assertFalse(indicesService.awaitClose(0, TimeUnit.MILLISECONDS));
shard.store().decRef();
assertEquals(0, indicesService.indicesRefCount.refCount());
assertTrue(indicesService.awaitClose(0, TimeUnit.MILLISECONDS));
}
public void testCloseWhileOngoingRequest() throws Exception {

View File

@ -26,18 +26,30 @@ import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine.Searcher;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@LuceneTestCase.SuppressFileSystems(value = "ExtrasFS")
public class NodeTests extends ESTestCase {
@ -136,5 +148,87 @@ public class NodeTests extends ESTestCase {
.put(Node.NODE_DATA_SETTING.getKey(), true);
}
public void testCloseOnOutstandingTask() throws Exception {
Node node = new MockNode(baseSettings().build(), basePlugins());
node.start();
ThreadPool threadpool = node.injector().getInstance(ThreadPool.class);
AtomicBoolean shouldRun = new AtomicBoolean(true);
threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> {
while (shouldRun.get());
});
node.close();
shouldRun.set(false);
assertTrue(node.awaitClose(1, TimeUnit.DAYS));
}
public void testAwaitCloseTimeoutsOnNonInterruptibleTask() throws Exception {
Node node = new MockNode(baseSettings().build(), basePlugins());
node.start();
ThreadPool threadpool = node.injector().getInstance(ThreadPool.class);
AtomicBoolean shouldRun = new AtomicBoolean(true);
threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> {
while (shouldRun.get());
});
node.close();
assertFalse(node.awaitClose(0, TimeUnit.MILLISECONDS));
shouldRun.set(false);
}
public void testCloseOnInterruptibleTask() throws Exception {
Node node = new MockNode(baseSettings().build(), basePlugins());
node.start();
ThreadPool threadpool = node.injector().getInstance(ThreadPool.class);
CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch finishLatch = new CountDownLatch(1);
final AtomicBoolean interrupted = new AtomicBoolean(false);
threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> {
try {
latch.await();
} catch (InterruptedException e) {
interrupted.set(true);
Thread.currentThread().interrupt();
} finally {
finishLatch.countDown();
}
});
node.close();
// close should not interrput ongoing tasks
assertFalse(interrupted.get());
// but awaitClose should
node.awaitClose(0, TimeUnit.SECONDS);
finishLatch.await();
assertTrue(interrupted.get());
}
public void testCloseOnLeakedIndexReaderReference() throws Exception {
Node node = new MockNode(baseSettings().build(), basePlugins());
node.start();
IndicesService indicesService = node.injector().getInstance(IndicesService.class);
assertAcked(node.client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)));
IndexService indexService = indicesService.iterator().next();
IndexShard shard = indexService.getShard(0);
Searcher searcher = shard.acquireSearcher("test");
node.close();
IllegalStateException e = expectThrows(IllegalStateException.class, () -> node.awaitClose(1, TimeUnit.DAYS));
searcher.close();
assertThat(e.getMessage(), Matchers.containsString("Something is leaking index readers or store references"));
}
public void testCloseOnLeakedStoreReference() throws Exception {
Node node = new MockNode(baseSettings().build(), basePlugins());
node.start();
IndicesService indicesService = node.injector().getInstance(IndicesService.class);
assertAcked(node.client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)));
IndexService indexService = indicesService.iterator().next();
IndexShard shard = indexService.getShard(0);
shard.store().incRef();
node.close();
IllegalStateException e = expectThrows(IllegalStateException.class, () -> node.awaitClose(1, TimeUnit.DAYS));
shard.store().decRef();
assertThat(e.getMessage(), Matchers.containsString("Something is leaking index readers or store references"));
}
}

View File

@ -652,7 +652,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
searchRequest.allowPartialSearchResults(randomBoolean());
ShardSearchTransportRequest request = new ShardSearchTransportRequest(OriginalIndices.NONE, searchRequest, shardId,
indexService.numberOfShards(), AliasFilter.EMPTY, 1f, nowInMillis, clusterAlias, Strings.EMPTY_ARRAY);
DefaultSearchContext searchContext = service.createSearchContext(request, new TimeValue(System.currentTimeMillis()));
try (DefaultSearchContext searchContext = service.createSearchContext(request, new TimeValue(System.currentTimeMillis()))) {
SearchShardTarget searchShardTarget = searchContext.shardTarget();
QueryShardContext queryShardContext = searchContext.getQueryShardContext();
String expectedIndexName = clusterAlias == null ? index : clusterAlias + ":" + index;
@ -664,4 +664,5 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
assertSame(searchShardTarget, searchContext.queryResult().getSearchShardTarget());
assertSame(searchShardTarget, searchContext.fetchResult().getSearchShardTarget());
}
}
}

View File

@ -60,6 +60,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING;
@ -99,10 +100,13 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
).get();
}
private static void stopNode() throws IOException {
private static void stopNode() throws IOException, InterruptedException {
Node node = NODE;
NODE = null;
IOUtils.close(node);
if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) {
throw new AssertionError("Node couldn't close within 10 seconds.");
}
}
@Override
@ -144,7 +148,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
}
@AfterClass
public static void tearDownClass() throws IOException {
public static void tearDownClass() throws Exception {
stopNode();
}

View File

@ -1049,6 +1049,13 @@ public final class InternalTestCluster extends TestCluster {
closed.set(true);
markNodeDataDirsAsPendingForWipe(node);
node.close();
try {
if (node.awaitClose(10, TimeUnit.SECONDS) == false) {
throw new IOException("Node didn't close within 10 seconds.");
}
} catch (InterruptedException e) {
throw new AssertionError("Interruption while waiting for the node to close", e);
}
}
}