Ensure all resources are closed on Node#close()

We are leaking all kinds of resources if something during Node#close() throws
an exception. This commit cuts over to a list of closeables to release
resources, which also closes the remaining services if one or more services
fail to close.

Closes #13685
This commit is contained in:
Simon Willnauer 2016-01-29 16:18:21 +01:00
parent 838edab168
commit 87737d3e02
10 changed files with 105 additions and 86 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.bootstrap;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
import org.apache.lucene.util.StringHelper; import org.apache.lucene.util.StringHelper;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.PidFile; import org.elasticsearch.common.PidFile;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
@ -40,6 +41,7 @@ import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalSettingsPreparer; import org.elasticsearch.node.internal.InternalSettingsPreparer;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Locale; import java.util.Locale;
@ -114,7 +116,11 @@ final class Bootstrap {
public boolean handle(int code) { public boolean handle(int code) {
if (CTRL_CLOSE_EVENT == code) { if (CTRL_CLOSE_EVENT == code) {
logger.info("running graceful exit on windows"); logger.info("running graceful exit on windows");
try {
Bootstrap.stop(); Bootstrap.stop();
} catch (IOException e) {
throw new ElasticsearchException("failed to stop node", e);
}
return true; return true;
} }
return false; return false;
@ -154,7 +160,11 @@ final class Bootstrap {
@Override @Override
public void run() { public void run() {
if (node != null) { if (node != null) {
try {
node.close(); node.close();
} catch (IOException ex) {
throw new ElasticsearchException("failed to stop node", ex);
}
} }
} }
}); });
@ -221,9 +231,9 @@ final class Bootstrap {
keepAliveThread.start(); keepAliveThread.start();
} }
static void stop() { static void stop() throws IOException {
try { try {
Releasables.close(INSTANCE.node); INSTANCE.node.close();
} finally { } finally {
INSTANCE.keepAliveLatch.countDown(); INSTANCE.keepAliveLatch.countDown();
} }

View File

@ -19,6 +19,8 @@
package org.elasticsearch.bootstrap; package org.elasticsearch.bootstrap;
import java.io.IOException;
/** /**
* This class starts elasticsearch. * This class starts elasticsearch.
*/ */
@ -48,7 +50,7 @@ public final class Elasticsearch {
* *
* NOTE: If this method is renamed and/or moved, make sure to update service.bat! * NOTE: If this method is renamed and/or moved, make sure to update service.bat!
*/ */
static void close(String[] args) { static void close(String[] args) throws IOException {
Bootstrap.stop(); Bootstrap.stop();
} }
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cache.recycler;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.recycler.AbstractRecyclerC; import org.elasticsearch.common.recycler.AbstractRecyclerC;
import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -38,7 +39,7 @@ import static org.elasticsearch.common.recycler.Recyclers.dequeFactory;
import static org.elasticsearch.common.recycler.Recyclers.none; import static org.elasticsearch.common.recycler.Recyclers.none;
/** A recycler of fixed-size pages. */ /** A recycler of fixed-size pages. */
public class PageCacheRecycler extends AbstractComponent { public class PageCacheRecycler extends AbstractComponent implements Releasable {
public static final String TYPE = "recycler.page.type"; public static final String TYPE = "recycler.page.type";
public static final String LIMIT_HEAP = "recycler.page.limit.heap"; public static final String LIMIT_HEAP = "recycler.page.limit.heap";
@ -49,6 +50,7 @@ public class PageCacheRecycler extends AbstractComponent {
private final Recycler<long[]> longPage; private final Recycler<long[]> longPage;
private final Recycler<Object[]> objectPage; private final Recycler<Object[]> objectPage;
@Override
public void close() { public void close() {
bytePage.close(); bytePage.close();
intPage.close(); intPage.close();

View File

@ -21,10 +21,12 @@ package org.elasticsearch.common.lease;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import java.io.Closeable;
/** /**
* Specialization of {@link AutoCloseable} that may only throw an {@link ElasticsearchException}. * Specialization of {@link AutoCloseable} that may only throw an {@link ElasticsearchException}.
*/ */
public interface Releasable extends AutoCloseable { public interface Releasable extends Closeable {
@Override @Override
void close(); void close();

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.cache.RemovalListener;
import org.elasticsearch.common.cache.RemovalNotification; import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
@ -52,7 +53,7 @@ import java.util.function.ToLongBiFunction;
/** /**
*/ */
public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener<IndicesFieldDataCache.Key, Accountable> { public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener<IndicesFieldDataCache.Key, Accountable>, Releasable{
public static final Setting<TimeValue> INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting("indices.fielddata.cache.cleanup_interval", TimeValue.timeValueMinutes(1), false, Setting.Scope.CLUSTER); public static final Setting<TimeValue> INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting("indices.fielddata.cache.cleanup_interval", TimeValue.timeValueMinutes(1), false, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> INDICES_FIELDDATA_CACHE_SIZE_KEY = Setting.byteSizeSetting("indices.fielddata.cache.size", new ByteSizeValue(-1), false, Setting.Scope.CLUSTER); public static final Setting<ByteSizeValue> INDICES_FIELDDATA_CACHE_SIZE_KEY = Setting.byteSizeSetting("indices.fielddata.cache.size", new ByteSizeValue(-1), false, Setting.Scope.CLUSTER);
@ -84,6 +85,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
new FieldDataCacheCleaner(this.cache, this.logger, this.threadPool, this.cleanInterval)); new FieldDataCacheCleaner(this.cache, this.logger, this.threadPool, this.cleanInterval));
} }
@Override
public void close() { public void close() {
cache.invalidateAll(); cache.invalidateAll();
this.closed = true; this.closed = true;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.node; package org.elasticsearch.node;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Build; import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
@ -100,6 +101,7 @@ import org.elasticsearch.watcher.ResourceWatcherModule;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
import java.io.BufferedWriter; import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.Inet6Address; import java.net.Inet6Address;
import java.net.InetAddress; import java.net.InetAddress;
@ -108,9 +110,11 @@ import java.nio.charset.Charset;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
@ -120,7 +124,7 @@ import static org.elasticsearch.common.settings.Settings.settingsBuilder;
* A node represent a node within a cluster (<tt>cluster.name</tt>). The {@link #client()} can be used * A node represent a node within a cluster (<tt>cluster.name</tt>). The {@link #client()} can be used
* in order to use a {@link Client} to perform actions/operations against the cluster. * in order to use a {@link Client} to perform actions/operations against the cluster.
*/ */
public class Node implements Releasable { public class Node implements Closeable {
public static final Setting<Boolean> WRITE_PORTS_FIELD_SETTING = Setting.boolSetting("node.portsfile", false, false, Setting.Scope.CLUSTER); public static final Setting<Boolean> WRITE_PORTS_FIELD_SETTING = Setting.boolSetting("node.portsfile", false, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> NODE_CLIENT_SETTING = Setting.boolSetting("node.client", false, false, Setting.Scope.CLUSTER); public static final Setting<Boolean> NODE_CLIENT_SETTING = Setting.boolSetting("node.client", false, false, Setting.Scope.CLUSTER);
@ -351,7 +355,7 @@ public class Node implements Releasable {
// If not, the hook that is added in Bootstrap#setup() will be useless: close() might not be executed, in case another (for example api) call // If not, the hook that is added in Bootstrap#setup() will be useless: close() might not be executed, in case another (for example api) call
// to close() has already set some lifecycles to stopped. In this case the process will be terminated even if the first call to close() has not finished yet. // to close() has already set some lifecycles to stopped. In this case the process will be terminated even if the first call to close() has not finished yet.
@Override @Override
public synchronized void close() { public synchronized void close() throws IOException {
if (lifecycle.started()) { if (lifecycle.started()) {
stop(); stop();
} }
@ -361,88 +365,80 @@ public class Node implements Releasable {
ESLogger logger = Loggers.getLogger(Node.class, settings.get("name")); ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));
logger.info("closing ..."); logger.info("closing ...");
List<Closeable> toClose = new ArrayList<>();
StopWatch stopWatch = new StopWatch("node_close"); StopWatch stopWatch = new StopWatch("node_close");
stopWatch.start("tribe"); toClose.add(() -> stopWatch.start("tribe"));
injector.getInstance(TribeService.class).close(); toClose.add(injector.getInstance(TribeService.class));
stopWatch.stop().start("node_service"); toClose.add(() -> stopWatch.stop().start("node_service"));
try { toClose.add(injector.getInstance(NodeService.class));
injector.getInstance(NodeService.class).close(); toClose.add(() ->stopWatch.stop().start("http"));
} catch (IOException e) {
logger.warn("NodeService close failed", e);
}
stopWatch.stop().start("http");
if (settings.getAsBoolean("http.enabled", true)) { if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).close(); toClose.add(injector.getInstance(HttpServer.class));
} }
stopWatch.stop().start("snapshot_service"); toClose.add(() ->stopWatch.stop().start("snapshot_service"));
injector.getInstance(SnapshotsService.class).close(); toClose.add(injector.getInstance(SnapshotsService.class));
injector.getInstance(SnapshotShardsService.class).close(); toClose.add(injector.getInstance(SnapshotShardsService.class));
stopWatch.stop().start("client"); toClose.add(() ->stopWatch.stop().start("client"));
Releasables.close(injector.getInstance(Client.class)); Releasables.close(injector.getInstance(Client.class));
stopWatch.stop().start("indices_cluster"); toClose.add(() ->stopWatch.stop().start("indices_cluster"));
injector.getInstance(IndicesClusterStateService.class).close(); toClose.add(injector.getInstance(IndicesClusterStateService.class));
stopWatch.stop().start("indices"); toClose.add(() ->stopWatch.stop().start("indices"));
injector.getInstance(IndicesTTLService.class).close(); toClose.add(injector.getInstance(IndicesTTLService.class));
injector.getInstance(IndicesService.class).close(); toClose.add(injector.getInstance(IndicesService.class));
// close filter/fielddata caches after indices // close filter/fielddata caches after indices
injector.getInstance(IndicesQueryCache.class).close(); toClose.add(injector.getInstance(IndicesQueryCache.class));
injector.getInstance(IndicesFieldDataCache.class).close(); toClose.add(injector.getInstance(IndicesFieldDataCache.class));
injector.getInstance(IndicesStore.class).close(); toClose.add(injector.getInstance(IndicesStore.class));
stopWatch.stop().start("routing"); toClose.add(() ->stopWatch.stop().start("routing"));
injector.getInstance(RoutingService.class).close(); toClose.add(injector.getInstance(RoutingService.class));
stopWatch.stop().start("cluster"); toClose.add(() ->stopWatch.stop().start("cluster"));
injector.getInstance(ClusterService.class).close(); toClose.add(injector.getInstance(ClusterService.class));
stopWatch.stop().start("discovery"); toClose.add(() ->stopWatch.stop().start("discovery"));
injector.getInstance(DiscoveryService.class).close(); toClose.add(injector.getInstance(DiscoveryService.class));
stopWatch.stop().start("monitor"); toClose.add(() ->stopWatch.stop().start("monitor"));
injector.getInstance(MonitorService.class).close(); toClose.add(injector.getInstance(MonitorService.class));
stopWatch.stop().start("gateway"); toClose.add(() ->stopWatch.stop().start("gateway"));
injector.getInstance(GatewayService.class).close(); toClose.add(injector.getInstance(GatewayService.class));
stopWatch.stop().start("search"); toClose.add(() ->stopWatch.stop().start("search"));
injector.getInstance(SearchService.class).close(); toClose.add(injector.getInstance(SearchService.class));
stopWatch.stop().start("rest"); toClose.add(() ->stopWatch.stop().start("rest"));
injector.getInstance(RestController.class).close(); toClose.add(injector.getInstance(RestController.class));
stopWatch.stop().start("transport"); toClose.add(() ->stopWatch.stop().start("transport"));
injector.getInstance(TransportService.class).close(); toClose.add(injector.getInstance(TransportService.class));
stopWatch.stop().start("percolator_service"); toClose.add(() ->stopWatch.stop().start("percolator_service"));
injector.getInstance(PercolatorService.class).close(); toClose.add(injector.getInstance(PercolatorService.class));
for (Class<? extends LifecycleComponent> plugin : pluginsService.nodeServices()) { for (Class<? extends LifecycleComponent> plugin : pluginsService.nodeServices()) {
stopWatch.stop().start("plugin(" + plugin.getName() + ")"); toClose.add(() ->stopWatch.stop().start("plugin(" + plugin.getName() + ")"));
injector.getInstance(plugin).close(); toClose.add(injector.getInstance(plugin));
} }
stopWatch.stop().start("script"); toClose.add(() ->stopWatch.stop().start("script"));
try { toClose.add(injector.getInstance(ScriptService.class));
injector.getInstance(ScriptService.class).close();
} catch(IOException e) {
logger.warn("ScriptService close failed", e);
}
stopWatch.stop().start("thread_pool"); toClose.add(() ->stopWatch.stop().start("thread_pool"));
// TODO this should really use ThreadPool.terminate() // TODO this should really use ThreadPool.terminate()
injector.getInstance(ThreadPool.class).shutdown(); toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown());
toClose.add(() -> {
try { try {
injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS); injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// ignore // ignore
} }
stopWatch.stop().start("thread_pool_force_shutdown"); });
try {
injector.getInstance(ThreadPool.class).shutdownNow(); toClose.add(() ->stopWatch.stop().start("thread_pool_force_shutdown"));
} catch (Exception e) { toClose.add(() -> injector.getInstance(ThreadPool.class).shutdownNow());
// ignore toClose.add(() -> stopWatch.stop());
}
stopWatch.stop();
toClose.add(injector.getInstance(NodeEnvironment.class));
toClose.add(injector.getInstance(PageCacheRecycler.class));
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint()); logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint());
} }
IOUtils.close(toClose);
injector.getInstance(NodeEnvironment.class).close();
injector.getInstance(PageCacheRecycler.class).close();
logger.info("closed"); logger.info("closed");
} }

View File

@ -43,6 +43,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
@ -85,7 +86,7 @@ import java.util.stream.StreamSupport;
import static org.apache.lucene.search.BooleanClause.Occur.FILTER; import static org.apache.lucene.search.BooleanClause.Occur.FILTER;
import static org.apache.lucene.search.BooleanClause.Occur.MUST; import static org.apache.lucene.search.BooleanClause.Occur.MUST;
public class PercolatorService extends AbstractComponent { public class PercolatorService extends AbstractComponent implements Releasable {
public final static float NO_SCORE = Float.NEGATIVE_INFINITY; public final static float NO_SCORE = Float.NEGATIVE_INFINITY;
public final static String TYPE_NAME = ".percolator"; public final static String TYPE_NAME = ".percolator";
@ -304,6 +305,7 @@ public class PercolatorService extends AbstractComponent {
} }
} }
@Override
public void close() { public void close() {
cache.close(); cache.close();
} }

View File

@ -32,6 +32,8 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -48,7 +50,7 @@ public class TransportClientIT extends ESIntegTestCase {
} }
public void testNodeVersionIsUpdated() { public void testNodeVersionIsUpdated() throws IOException {
TransportClient client = (TransportClient) internalCluster().client(); TransportClient client = (TransportClient) internalCluster().client();
TransportClientNodesService nodeService = client.nodeService(); TransportClientNodesService nodeService = client.nodeService();
Node node = new Node(Settings.builder() Node node = new Node(Settings.builder()

View File

@ -51,6 +51,7 @@ import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -68,7 +69,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
private static Node NODE = null; private static Node NODE = null;
private void reset() { private void reset() throws IOException {
assert NODE != null; assert NODE != null;
stopNode(); stopNode();
startNode(); startNode();
@ -83,13 +84,13 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
assertFalse(clusterHealthResponse.isTimedOut()); assertFalse(clusterHealthResponse.isTimedOut());
} }
private static void stopNode() { private static void stopNode() throws IOException {
Node node = NODE; Node node = NODE;
NODE = null; NODE = null;
Releasables.close(node); node.close();
} }
private void cleanup(boolean resetNode) { private void cleanup(boolean resetNode) throws IOException {
assertAcked(client().admin().indices().prepareDelete("*").get()); assertAcked(client().admin().indices().prepareDelete("*").get());
if (resetNode) { if (resetNode) {
reset(); reset();
@ -126,7 +127,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
} }
@AfterClass @AfterClass
public static void tearDownClass() { public static void tearDownClass() throws IOException {
stopNode(); stopNode();
} }

View File

@ -815,7 +815,7 @@ public final class InternalTestCluster extends TestCluster {
} }
} }
void closeNode() { void closeNode() throws IOException {
registerDataPath(); registerDataPath();
node.close(); node.close();
} }