Merge pull request #16316 from s1monw/isseus/13685_2nd_try
Ensure all resoruces are closed on Node#close()
This commit is contained in:
commit
af0e40ec7d
|
@ -20,7 +20,9 @@
|
|||
package org.elasticsearch.bootstrap;
|
||||
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.StringHelper;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.PidFile;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
|
@ -40,6 +42,7 @@ import org.elasticsearch.node.Node;
|
|||
import org.elasticsearch.node.internal.InternalSettingsPreparer;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Locale;
|
||||
|
@ -114,7 +117,11 @@ final class Bootstrap {
|
|||
public boolean handle(int code) {
|
||||
if (CTRL_CLOSE_EVENT == code) {
|
||||
logger.info("running graceful exit on windows");
|
||||
Bootstrap.stop();
|
||||
try {
|
||||
Bootstrap.stop();
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchException("failed to stop node", e);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -153,8 +160,10 @@ final class Bootstrap {
|
|||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (node != null) {
|
||||
node.close();
|
||||
try {
|
||||
IOUtils.close(node);
|
||||
} catch (IOException ex) {
|
||||
throw new ElasticsearchException("failed to stop node", ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -221,9 +230,9 @@ final class Bootstrap {
|
|||
keepAliveThread.start();
|
||||
}
|
||||
|
||||
static void stop() {
|
||||
static void stop() throws IOException {
|
||||
try {
|
||||
Releasables.close(INSTANCE.node);
|
||||
IOUtils.close(INSTANCE.node);
|
||||
} finally {
|
||||
INSTANCE.keepAliveLatch.countDown();
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.elasticsearch.bootstrap;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* This class starts elasticsearch.
|
||||
*/
|
||||
|
@ -48,7 +50,7 @@ public final class Elasticsearch {
|
|||
*
|
||||
* NOTE: If this method is renamed and/or moved, make sure to update service.bat!
|
||||
*/
|
||||
static void close(String[] args) {
|
||||
static void close(String[] args) throws IOException {
|
||||
Bootstrap.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.cache.recycler;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.recycler.AbstractRecyclerC;
|
||||
import org.elasticsearch.common.recycler.Recycler;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -38,7 +39,7 @@ import static org.elasticsearch.common.recycler.Recyclers.dequeFactory;
|
|||
import static org.elasticsearch.common.recycler.Recyclers.none;
|
||||
|
||||
/** A recycler of fixed-size pages. */
|
||||
public class PageCacheRecycler extends AbstractComponent {
|
||||
public class PageCacheRecycler extends AbstractComponent implements Releasable {
|
||||
|
||||
public static final String TYPE = "recycler.page.type";
|
||||
public static final String LIMIT_HEAP = "recycler.page.limit.heap";
|
||||
|
@ -49,6 +50,7 @@ public class PageCacheRecycler extends AbstractComponent {
|
|||
private final Recycler<long[]> longPage;
|
||||
private final Recycler<Object[]> objectPage;
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
bytePage.close();
|
||||
intPage.close();
|
||||
|
|
|
@ -21,10 +21,12 @@ package org.elasticsearch.common.lease;
|
|||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
||||
/**
|
||||
* Specialization of {@link AutoCloseable} that may only throw an {@link ElasticsearchException}.
|
||||
*/
|
||||
public interface Releasable extends AutoCloseable {
|
||||
public interface Releasable extends Closeable {
|
||||
|
||||
@Override
|
||||
void close();
|
||||
|
|
|
@ -19,38 +19,24 @@
|
|||
|
||||
package org.elasticsearch.common.lease;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
/** Utility methods to work with {@link Releasable}s. */
|
||||
public enum Releasables {
|
||||
;
|
||||
|
||||
private static void rethrow(Throwable t) {
|
||||
if (t instanceof RuntimeException) {
|
||||
throw (RuntimeException) t;
|
||||
}
|
||||
if (t instanceof Error) {
|
||||
throw (Error) t;
|
||||
}
|
||||
throw new RuntimeException(t);
|
||||
}
|
||||
|
||||
private static void close(Iterable<? extends Releasable> releasables, boolean ignoreException) {
|
||||
Throwable th = null;
|
||||
for (Releasable releasable : releasables) {
|
||||
if (releasable != null) {
|
||||
try {
|
||||
releasable.close();
|
||||
} catch (Throwable t) {
|
||||
if (th == null) {
|
||||
th = t;
|
||||
}
|
||||
}
|
||||
try {
|
||||
// this does the right thing with respect to add suppressed and not wrapping errors etc.
|
||||
IOUtils.close(releasables);
|
||||
} catch (Throwable t) {
|
||||
if (ignoreException == false) {
|
||||
IOUtils.reThrowUnchecked(t);
|
||||
}
|
||||
}
|
||||
if (th != null && !ignoreException) {
|
||||
rethrow(th);
|
||||
}
|
||||
}
|
||||
|
||||
/** Release the provided {@link Releasable}s. */
|
||||
|
@ -99,25 +85,11 @@ public enum Releasables {
|
|||
* </pre>
|
||||
*/
|
||||
public static Releasable wrap(final Iterable<Releasable> releasables) {
|
||||
return new Releasable() {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
Releasables.close(releasables);
|
||||
}
|
||||
|
||||
};
|
||||
return () -> close(releasables);
|
||||
}
|
||||
|
||||
/** @see #wrap(Iterable) */
|
||||
public static Releasable wrap(final Releasable... releasables) {
|
||||
return new Releasable() {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
Releasables.close(releasables);
|
||||
}
|
||||
|
||||
};
|
||||
return () -> close(releasables);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.common.cache.RemovalListener;
|
|||
import org.elasticsearch.common.cache.RemovalNotification;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
|
@ -52,7 +53,7 @@ import java.util.function.ToLongBiFunction;
|
|||
|
||||
/**
|
||||
*/
|
||||
public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener<IndicesFieldDataCache.Key, Accountable> {
|
||||
public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener<IndicesFieldDataCache.Key, Accountable>, Releasable{
|
||||
|
||||
public static final Setting<TimeValue> INDICES_FIELDDATA_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting("indices.fielddata.cache.cleanup_interval", TimeValue.timeValueMinutes(1), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<ByteSizeValue> INDICES_FIELDDATA_CACHE_SIZE_KEY = Setting.byteSizeSetting("indices.fielddata.cache.size", new ByteSizeValue(-1), false, Setting.Scope.CLUSTER);
|
||||
|
@ -84,6 +85,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
|||
new FieldDataCacheCleaner(this.cache, this.logger, this.threadPool, this.cleanInterval));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
cache.invalidateAll();
|
||||
this.closed = true;
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.node;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Build;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
|
@ -100,6 +101,7 @@ import org.elasticsearch.watcher.ResourceWatcherModule;
|
|||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.Inet6Address;
|
||||
import java.net.InetAddress;
|
||||
|
@ -108,9 +110,11 @@ import java.nio.charset.Charset;
|
|||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
|
||||
|
@ -120,7 +124,7 @@ import static org.elasticsearch.common.settings.Settings.settingsBuilder;
|
|||
* A node represent a node within a cluster (<tt>cluster.name</tt>). The {@link #client()} can be used
|
||||
* in order to use a {@link Client} to perform actions/operations against the cluster.
|
||||
*/
|
||||
public class Node implements Releasable {
|
||||
public class Node implements Closeable {
|
||||
|
||||
public static final Setting<Boolean> WRITE_PORTS_FIELD_SETTING = Setting.boolSetting("node.portsfile", false, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> NODE_CLIENT_SETTING = Setting.boolSetting("node.client", false, false, Setting.Scope.CLUSTER);
|
||||
|
@ -351,7 +355,7 @@ public class Node implements Releasable {
|
|||
// If not, the hook that is added in Bootstrap#setup() will be useless: close() might not be executed, in case another (for example api) call
|
||||
// to close() has already set some lifecycles to stopped. In this case the process will be terminated even if the first call to close() has not finished yet.
|
||||
@Override
|
||||
public synchronized void close() {
|
||||
public synchronized void close() throws IOException {
|
||||
if (lifecycle.started()) {
|
||||
stop();
|
||||
}
|
||||
|
@ -361,88 +365,80 @@ public class Node implements Releasable {
|
|||
|
||||
ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));
|
||||
logger.info("closing ...");
|
||||
|
||||
List<Closeable> toClose = new ArrayList<>();
|
||||
StopWatch stopWatch = new StopWatch("node_close");
|
||||
stopWatch.start("tribe");
|
||||
injector.getInstance(TribeService.class).close();
|
||||
stopWatch.stop().start("node_service");
|
||||
try {
|
||||
injector.getInstance(NodeService.class).close();
|
||||
} catch (IOException e) {
|
||||
logger.warn("NodeService close failed", e);
|
||||
}
|
||||
stopWatch.stop().start("http");
|
||||
toClose.add(() -> stopWatch.start("tribe"));
|
||||
toClose.add(injector.getInstance(TribeService.class));
|
||||
toClose.add(() -> stopWatch.stop().start("node_service"));
|
||||
toClose.add(injector.getInstance(NodeService.class));
|
||||
toClose.add(() ->stopWatch.stop().start("http"));
|
||||
if (settings.getAsBoolean("http.enabled", true)) {
|
||||
injector.getInstance(HttpServer.class).close();
|
||||
toClose.add(injector.getInstance(HttpServer.class));
|
||||
}
|
||||
stopWatch.stop().start("snapshot_service");
|
||||
injector.getInstance(SnapshotsService.class).close();
|
||||
injector.getInstance(SnapshotShardsService.class).close();
|
||||
stopWatch.stop().start("client");
|
||||
toClose.add(() ->stopWatch.stop().start("snapshot_service"));
|
||||
toClose.add(injector.getInstance(SnapshotsService.class));
|
||||
toClose.add(injector.getInstance(SnapshotShardsService.class));
|
||||
toClose.add(() ->stopWatch.stop().start("client"));
|
||||
Releasables.close(injector.getInstance(Client.class));
|
||||
stopWatch.stop().start("indices_cluster");
|
||||
injector.getInstance(IndicesClusterStateService.class).close();
|
||||
stopWatch.stop().start("indices");
|
||||
injector.getInstance(IndicesTTLService.class).close();
|
||||
injector.getInstance(IndicesService.class).close();
|
||||
toClose.add(() ->stopWatch.stop().start("indices_cluster"));
|
||||
toClose.add(injector.getInstance(IndicesClusterStateService.class));
|
||||
toClose.add(() ->stopWatch.stop().start("indices"));
|
||||
toClose.add(injector.getInstance(IndicesTTLService.class));
|
||||
toClose.add(injector.getInstance(IndicesService.class));
|
||||
// close filter/fielddata caches after indices
|
||||
injector.getInstance(IndicesQueryCache.class).close();
|
||||
injector.getInstance(IndicesFieldDataCache.class).close();
|
||||
injector.getInstance(IndicesStore.class).close();
|
||||
stopWatch.stop().start("routing");
|
||||
injector.getInstance(RoutingService.class).close();
|
||||
stopWatch.stop().start("cluster");
|
||||
injector.getInstance(ClusterService.class).close();
|
||||
stopWatch.stop().start("discovery");
|
||||
injector.getInstance(DiscoveryService.class).close();
|
||||
stopWatch.stop().start("monitor");
|
||||
injector.getInstance(MonitorService.class).close();
|
||||
stopWatch.stop().start("gateway");
|
||||
injector.getInstance(GatewayService.class).close();
|
||||
stopWatch.stop().start("search");
|
||||
injector.getInstance(SearchService.class).close();
|
||||
stopWatch.stop().start("rest");
|
||||
injector.getInstance(RestController.class).close();
|
||||
stopWatch.stop().start("transport");
|
||||
injector.getInstance(TransportService.class).close();
|
||||
stopWatch.stop().start("percolator_service");
|
||||
injector.getInstance(PercolatorService.class).close();
|
||||
toClose.add(injector.getInstance(IndicesQueryCache.class));
|
||||
toClose.add(injector.getInstance(IndicesFieldDataCache.class));
|
||||
toClose.add(injector.getInstance(IndicesStore.class));
|
||||
toClose.add(() ->stopWatch.stop().start("routing"));
|
||||
toClose.add(injector.getInstance(RoutingService.class));
|
||||
toClose.add(() ->stopWatch.stop().start("cluster"));
|
||||
toClose.add(injector.getInstance(ClusterService.class));
|
||||
toClose.add(() ->stopWatch.stop().start("discovery"));
|
||||
toClose.add(injector.getInstance(DiscoveryService.class));
|
||||
toClose.add(() ->stopWatch.stop().start("monitor"));
|
||||
toClose.add(injector.getInstance(MonitorService.class));
|
||||
toClose.add(() ->stopWatch.stop().start("gateway"));
|
||||
toClose.add(injector.getInstance(GatewayService.class));
|
||||
toClose.add(() ->stopWatch.stop().start("search"));
|
||||
toClose.add(injector.getInstance(SearchService.class));
|
||||
toClose.add(() ->stopWatch.stop().start("rest"));
|
||||
toClose.add(injector.getInstance(RestController.class));
|
||||
toClose.add(() ->stopWatch.stop().start("transport"));
|
||||
toClose.add(injector.getInstance(TransportService.class));
|
||||
toClose.add(() ->stopWatch.stop().start("percolator_service"));
|
||||
toClose.add(injector.getInstance(PercolatorService.class));
|
||||
|
||||
for (Class<? extends LifecycleComponent> plugin : pluginsService.nodeServices()) {
|
||||
stopWatch.stop().start("plugin(" + plugin.getName() + ")");
|
||||
injector.getInstance(plugin).close();
|
||||
toClose.add(() ->stopWatch.stop().start("plugin(" + plugin.getName() + ")"));
|
||||
toClose.add(injector.getInstance(plugin));
|
||||
}
|
||||
|
||||
stopWatch.stop().start("script");
|
||||
try {
|
||||
injector.getInstance(ScriptService.class).close();
|
||||
} catch(IOException e) {
|
||||
logger.warn("ScriptService close failed", e);
|
||||
}
|
||||
toClose.add(() ->stopWatch.stop().start("script"));
|
||||
toClose.add(injector.getInstance(ScriptService.class));
|
||||
|
||||
stopWatch.stop().start("thread_pool");
|
||||
toClose.add(() ->stopWatch.stop().start("thread_pool"));
|
||||
// TODO this should really use ThreadPool.terminate()
|
||||
injector.getInstance(ThreadPool.class).shutdown();
|
||||
try {
|
||||
injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
stopWatch.stop().start("thread_pool_force_shutdown");
|
||||
try {
|
||||
injector.getInstance(ThreadPool.class).shutdownNow();
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
stopWatch.stop();
|
||||
toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown());
|
||||
toClose.add(() -> {
|
||||
try {
|
||||
injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
});
|
||||
|
||||
toClose.add(() ->stopWatch.stop().start("thread_pool_force_shutdown"));
|
||||
toClose.add(() -> injector.getInstance(ThreadPool.class).shutdownNow());
|
||||
toClose.add(() -> stopWatch.stop());
|
||||
|
||||
|
||||
toClose.add(injector.getInstance(NodeEnvironment.class));
|
||||
toClose.add(injector.getInstance(PageCacheRecycler.class));
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint());
|
||||
}
|
||||
|
||||
injector.getInstance(NodeEnvironment.class).close();
|
||||
injector.getInstance(PageCacheRecycler.class).close();
|
||||
|
||||
IOUtils.close(toClose);
|
||||
logger.info("closed");
|
||||
}
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
|
@ -85,7 +86,7 @@ import java.util.stream.StreamSupport;
|
|||
import static org.apache.lucene.search.BooleanClause.Occur.FILTER;
|
||||
import static org.apache.lucene.search.BooleanClause.Occur.MUST;
|
||||
|
||||
public class PercolatorService extends AbstractComponent {
|
||||
public class PercolatorService extends AbstractComponent implements Releasable {
|
||||
|
||||
public final static float NO_SCORE = Float.NEGATIVE_INFINITY;
|
||||
public final static String TYPE_NAME = ".percolator";
|
||||
|
@ -304,6 +305,7 @@ public class PercolatorService extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
cache.close();
|
||||
}
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
|||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
|
@ -48,7 +50,7 @@ public class TransportClientIT extends ESIntegTestCase {
|
|||
|
||||
}
|
||||
|
||||
public void testNodeVersionIsUpdated() {
|
||||
public void testNodeVersionIsUpdated() throws IOException {
|
||||
TransportClient client = (TransportClient) internalCluster().client();
|
||||
TransportClientNodesService nodeService = client.nodeService();
|
||||
Node node = new Node(Settings.builder()
|
||||
|
|
|
@ -279,7 +279,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
|
|||
setMinimumMasterNodes(2);
|
||||
|
||||
// make sure it has been processed on all nodes (master node spawns a secondary cluster state update task)
|
||||
for (Client client : internalCluster()) {
|
||||
for (Client client : internalCluster().getClients()) {
|
||||
assertThat(client.admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setLocal(true).get().isTimedOut(),
|
||||
equalTo(false));
|
||||
}
|
||||
|
@ -303,7 +303,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
|
|||
assertTrue(awaitBusy(
|
||||
() -> {
|
||||
boolean success = true;
|
||||
for (Client client : internalCluster()) {
|
||||
for (Client client : internalCluster().getClients()) {
|
||||
boolean clientHasNoMasterBlock = hasNoMasterBlock.test(client);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Checking for NO_MASTER_BLOCK on client: {} NO_MASTER_BLOCK: [{}]", client, clientHasNoMasterBlock);
|
||||
|
|
|
@ -167,7 +167,7 @@ public class UpdateMappingOnClusterIT extends ESIntegTestCase {
|
|||
|
||||
private void compareMappingOnNodes(GetMappingsResponse previousMapping) {
|
||||
// make sure all nodes have same cluster state
|
||||
for (Client client : cluster()) {
|
||||
for (Client client : cluster().getClients()) {
|
||||
GetMappingsResponse currentMapping = client.admin().indices().prepareGetMappings(INDEX).addTypes(TYPE).setLocal(true).get();
|
||||
assertThat(previousMapping.getMappings().get(INDEX).get(TYPE).source(), equalTo(currentMapping.getMappings().get(INDEX).get(TYPE).source()));
|
||||
}
|
||||
|
|
|
@ -92,7 +92,7 @@ public class SimpleValidateQueryIT extends ESIntegTestCase {
|
|||
|
||||
refresh();
|
||||
|
||||
for (Client client : internalCluster()) {
|
||||
for (Client client : internalCluster().getClients()) {
|
||||
ValidateQueryResponse response = client.admin().indices().prepareValidateQuery("test")
|
||||
.setQuery(QueryBuilders.wrapperQuery("foo".getBytes(StandardCharsets.UTF_8)))
|
||||
.setExplain(true)
|
||||
|
@ -104,7 +104,7 @@ public class SimpleValidateQueryIT extends ESIntegTestCase {
|
|||
|
||||
}
|
||||
|
||||
for (Client client : internalCluster()) {
|
||||
for (Client client : internalCluster().getClients()) {
|
||||
ValidateQueryResponse response = client.admin().indices().prepareValidateQuery("test")
|
||||
.setQuery(QueryBuilders.queryStringQuery("foo"))
|
||||
.setExplain(true)
|
||||
|
|
|
@ -19,12 +19,15 @@
|
|||
|
||||
package org.elasticsearch.repositories.azure;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.node.MockNode;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.plugin.repository.azure.AzureRepositoryPlugin;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
|
@ -112,8 +115,13 @@ public class AzureRepositoryF {
|
|||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
node.close();
|
||||
latch.countDown();
|
||||
try {
|
||||
IOUtils.close(node);
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchException(e);
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
node.start();
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.tribe;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -34,6 +35,7 @@ import org.elasticsearch.test.InternalTestCluster;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.either;
|
||||
|
@ -76,10 +78,9 @@ public class TribeUnitTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@AfterClass
|
||||
public static void closeTribes() {
|
||||
tribe1.close();
|
||||
public static void closeTribes() throws IOException {
|
||||
IOUtils.close(tribe1, tribe2);
|
||||
tribe1 = null;
|
||||
tribe2.close();
|
||||
tribe2 = null;
|
||||
}
|
||||
|
||||
|
|
|
@ -241,8 +241,8 @@ public class CompositeTestCluster extends TestCluster {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized Iterator<Client> iterator() {
|
||||
return Collections.singleton(client()).iterator();
|
||||
public synchronized Iterable<Client> getClients() {
|
||||
return Collections.singleton(client());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -129,6 +129,7 @@ import org.junit.AfterClass;
|
|||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.lang.annotation.Annotation;
|
||||
|
@ -675,7 +676,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
public static Iterable<Client> clients() {
|
||||
return cluster();
|
||||
return cluster().getClients();
|
||||
}
|
||||
|
||||
protected int minimumNumberOfShards() {
|
||||
|
@ -1099,7 +1100,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
|||
Map<String, Object> masterStateMap = convertToMap(masterClusterState);
|
||||
int masterClusterStateSize = masterClusterState.toString().length();
|
||||
String masterId = masterClusterState.nodes().masterNodeId();
|
||||
for (Client client : cluster()) {
|
||||
for (Client client : cluster().getClients()) {
|
||||
ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
|
||||
byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
|
||||
// remove local node reference
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.elasticsearch.test;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
|
||||
|
@ -51,6 +52,7 @@ import org.junit.AfterClass;
|
|||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -68,7 +70,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
|
|||
|
||||
private static Node NODE = null;
|
||||
|
||||
private void reset() {
|
||||
private void reset() throws IOException {
|
||||
assert NODE != null;
|
||||
stopNode();
|
||||
startNode();
|
||||
|
@ -83,13 +85,13 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
|
|||
assertFalse(clusterHealthResponse.isTimedOut());
|
||||
}
|
||||
|
||||
private static void stopNode() {
|
||||
private static void stopNode() throws IOException {
|
||||
Node node = NODE;
|
||||
NODE = null;
|
||||
Releasables.close(node);
|
||||
IOUtils.close(node);
|
||||
}
|
||||
|
||||
private void cleanup(boolean resetNode) {
|
||||
private void cleanup(boolean resetNode) throws IOException {
|
||||
assertAcked(client().admin().indices().prepareDelete("*").get());
|
||||
if (resetNode) {
|
||||
reset();
|
||||
|
@ -126,7 +128,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownClass() {
|
||||
public static void tearDownClass() throws IOException {
|
||||
stopNode();
|
||||
}
|
||||
|
||||
|
|
|
@ -167,8 +167,8 @@ public final class ExternalTestCluster extends TestCluster {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Client> iterator() {
|
||||
return Collections.singleton(client).iterator();
|
||||
public Iterable<Client> getClients() {
|
||||
return Collections.singleton(client);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -815,7 +815,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
}
|
||||
}
|
||||
|
||||
void closeNode() {
|
||||
void closeNode() throws IOException {
|
||||
registerDataPath();
|
||||
node.close();
|
||||
}
|
||||
|
@ -1720,27 +1720,29 @@ public final class InternalTestCluster extends TestCluster {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Iterator<Client> iterator() {
|
||||
public synchronized Iterable<Client> getClients() {
|
||||
ensureOpen();
|
||||
final Iterator<NodeAndClient> iterator = nodes.values().iterator();
|
||||
return new Iterator<Client>() {
|
||||
return () -> {
|
||||
ensureOpen();
|
||||
final Iterator<NodeAndClient> iterator = nodes.values().iterator();
|
||||
return new Iterator<Client>() {
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return iterator.hasNext();
|
||||
}
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return iterator.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Client next() {
|
||||
return iterator.next().client(random);
|
||||
}
|
||||
@Override
|
||||
public Client next() {
|
||||
return iterator.next().client(random);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException("");
|
||||
}
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException("");
|
||||
}
|
||||
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -43,7 +43,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
|
|||
* Base test cluster that exposes the basis to run tests against any elasticsearch cluster, whose layout
|
||||
* (e.g. number of nodes) is predefined and cannot be changed during the tests execution
|
||||
*/
|
||||
public abstract class TestCluster implements Iterable<Client>, Closeable {
|
||||
public abstract class TestCluster implements Closeable {
|
||||
|
||||
protected final ESLogger logger = Loggers.getLogger(getClass());
|
||||
private final long seed;
|
||||
|
@ -228,5 +228,10 @@ public abstract class TestCluster implements Iterable<Client>, Closeable {
|
|||
*/
|
||||
public abstract String getClusterName();
|
||||
|
||||
/**
|
||||
* Returns an {@link Iterable} over all clients in this test cluster
|
||||
*/
|
||||
public abstract Iterable<Client> getClients();
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -130,8 +130,8 @@ public class InternalTestClusterTests extends ESTestCase {
|
|||
cluster1.beforeTest(random, random.nextDouble());
|
||||
}
|
||||
assertArrayEquals(cluster0.getNodeNames(), cluster1.getNodeNames());
|
||||
Iterator<Client> iterator1 = cluster1.iterator();
|
||||
for (Client client : cluster0) {
|
||||
Iterator<Client> iterator1 = cluster1.getClients().iterator();
|
||||
for (Client client : cluster0.getClients()) {
|
||||
assertTrue(iterator1.hasNext());
|
||||
Client other = iterator1.next();
|
||||
assertSettings(client.settings(), other.settings(), false);
|
||||
|
|
Loading…
Reference in New Issue