diff --git a/test/framework/src/main/java/org/elasticsearch/test/CompositeTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/CompositeTestCluster.java deleted file mode 100644 index c6470123090..00000000000 --- a/test/framework/src/main/java/org/elasticsearch/test/CompositeTestCluster.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.test; - -import com.carrotsearch.randomizedtesting.generators.RandomPicks; -import org.apache.lucene.util.IOUtils; -import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; -import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.FilterClient; -import org.elasticsearch.common.breaker.CircuitBreaker; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Random; -import java.util.stream.Collectors; - -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -/** - * A test cluster implementation that holds a fixed set of external nodes as well as a InternalTestCluster - * which is used to run mixed version clusters in tests like backwards compatibility tests. - * Note: this is an experimental API - */ -public class CompositeTestCluster extends TestCluster { - private final InternalTestCluster cluster; - private final ExternalNode[] externalNodes; - private final ExternalClient client = new ExternalClient(); - private static final String NODE_PREFIX = "external_"; - - public CompositeTestCluster(InternalTestCluster cluster, int numExternalNodes, ExternalNode externalNode) throws IOException { - super(cluster.seed()); - this.cluster = cluster; - this.externalNodes = new ExternalNode[numExternalNodes]; - for (int i = 0; i < externalNodes.length; i++) { - externalNodes[i] = externalNode; - } - } - - @Override - public synchronized void afterTest() throws IOException { - cluster.afterTest(); - } - - @Override - public synchronized void beforeTest(Random random, double transportClientRatio) throws IOException, InterruptedException { - super.beforeTest(random, transportClientRatio); - cluster.beforeTest(random, transportClientRatio); - Settings defaultSettings = cluster.getDefaultSettings(); - final Client client = cluster.size() > 0 ? cluster.client() : cluster.coordOnlyNodeClient(); - for (int i = 0; i < externalNodes.length; i++) { - if (!externalNodes[i].running()) { - externalNodes[i] = externalNodes[i].start(client, defaultSettings, NODE_PREFIX + i, cluster.getClusterName(), i); - } - externalNodes[i].reset(random.nextLong()); - } - if (size() > 0) { - client().admin().cluster().prepareHealth().setWaitForNodes(">=" + Integer.toString(this.size())).get(); - } - } - - private Collection runningNodes() { - return Arrays - .stream(externalNodes) - .filter(input -> input.running()) - .collect(Collectors.toCollection(ArrayList::new)); - } - - /** - * Upgrades one external running node to a node from the version running the tests. Commonly this is used - * to move from a node with version N-1 to a node running version N. This works seamless since they will - * share the same data directory. This method will return true iff a node got upgraded otherwise if no - * external node is running it returns false - */ - public synchronized boolean upgradeOneNode() throws InterruptedException, IOException { - return upgradeOneNode(Settings.EMPTY); - } - - /** - * Upgrades all external running nodes to a node from the version running the tests. - * All nodes are shut down before the first upgrade happens. - * @return true iff at least one node as upgraded. - */ - public synchronized boolean upgradeAllNodes() throws InterruptedException, IOException { - return upgradeAllNodes(Settings.EMPTY); - } - - - /** - * Upgrades all external running nodes to a node from the version running the tests. - * All nodes are shut down before the first upgrade happens. - * @return true iff at least one node as upgraded. - * @param nodeSettings settings for the upgrade nodes - */ - public synchronized boolean upgradeAllNodes(Settings nodeSettings) throws InterruptedException, IOException { - boolean upgradedOneNode = false; - while(upgradeOneNode(nodeSettings)) { - upgradedOneNode = true; - } - return upgradedOneNode; - } - - /** - * Upgrades one external running node to a node from the version running the tests. Commonly this is used - * to move from a node with version N-1 to a node running version N. This works seamless since they will - * share the same data directory. This method will return true iff a node got upgraded otherwise if no - * external node is running it returns false - */ - public synchronized boolean upgradeOneNode(Settings nodeSettings) throws InterruptedException, IOException { - Collection runningNodes = runningNodes(); - if (!runningNodes.isEmpty()) { - final Client existingClient = cluster.client(); - ExternalNode externalNode = RandomPicks.randomFrom(random, runningNodes); - externalNode.stop(); - String s = cluster.startNode(nodeSettings); - ExternalNode.waitForNode(existingClient, s); - assertNoTimeout(existingClient.admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(size())).get()); - return true; - } - return false; - } - - - /** - * Returns the a simple pattern that matches all "new" nodes in the cluster. - */ - public String newNodePattern() { - return cluster.nodePrefix() + "*"; - } - - /** - * Returns the a simple pattern that matches all "old" / "backwardss" nodes in the cluster. - */ - public String backwardsNodePattern() { - return NODE_PREFIX + "*"; - } - - /** - * Allows allocation of shards of the given indices on all nodes in the cluster. - */ - public void allowOnAllNodes(String... index) { - Settings build = Settings.builder().put("index.routing.allocation.exclude._name", "").build(); - client().admin().indices().prepareUpdateSettings(index).setSettings(build).execute().actionGet(); - } - - /** - * Allows allocation of shards of the given indices only on "new" nodes in the cluster. - * Note: if a shard is allocated on an "old" node and can't be allocated on a "new" node it will only be removed it can - * be allocated on some other "new" node. - */ - public void allowOnlyNewNodes(String... index) { - Settings build = Settings.builder().put("index.routing.allocation.exclude._name", backwardsNodePattern()).build(); - client().admin().indices().prepareUpdateSettings(index).setSettings(build).execute().actionGet(); - } - - /** - * Starts a current version data node - */ - public void startNewNode() { - cluster.startNode(); - } - - - @Override - public synchronized Client client() { - return client; - } - - @Override - public synchronized int size() { - return runningNodes().size() + cluster.size(); - } - - @Override - public int numDataNodes() { - return runningNodes().size() + cluster.numDataNodes(); - } - - @Override - public int numDataAndMasterNodes() { - return runningNodes().size() + cluster.numDataAndMasterNodes(); - } - - @Override - public InetSocketAddress[] httpAddresses() { - return cluster.httpAddresses(); - } - - @Override - public void close() throws IOException { - try { - IOUtils.close(externalNodes); - } finally { - IOUtils.close(cluster); - } - } - - @Override - public void ensureEstimatedStats() { - if (size() > 0) { - NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats() - .clear().setBreaker(true).execute().actionGet(); - for (NodeStats stats : nodeStats.getNodes()) { - assertThat("Fielddata breaker not reset to 0 on node: " + stats.getNode(), - stats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(), equalTo(0L)); - } - // CompositeTestCluster does not check the request breaker, - // because checking it requires a network request, which in - // turn increments the breaker, making it non-0 - } - } - - @Override - public String getClusterName() { - return cluster.getClusterName(); - } - - @Override - public synchronized Iterable getClients() { - return Collections.singleton(client()); - } - - /** - * Delegates to {@link org.elasticsearch.test.InternalTestCluster#fullRestart()} - */ - public void fullRestartInternalCluster() throws Exception { - cluster.fullRestart(); - } - - /** - * Returns the number of current version data nodes in the cluster - */ - public int numNewDataNodes() { - return cluster.numDataNodes(); - } - - /** - * Returns the number of former version data nodes in the cluster - */ - public int numBackwardsDataNodes() { - return runningNodes().size(); - } - - public TransportAddress externalTransportAddress() { - return RandomPicks.randomFrom(random, externalNodes).getTransportAddress(); - } - - public InternalTestCluster internalCluster() { - return cluster; - } - - private synchronized Client internalClient() { - Collection externalNodes = runningNodes(); - return random.nextBoolean() && !externalNodes.isEmpty() ? RandomPicks.randomFrom(random, externalNodes).getClient() : cluster.client(); - } - - private final class ExternalClient extends FilterClient { - - public ExternalClient() { - super(internalClient()); - } - - @Override - public void close() { - // never close this client - } - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 2c87684b3d8..5d38e53a077 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -1918,8 +1918,6 @@ public abstract class ESIntegTestCase extends ESTestCase { public Path randomRepoPath() { if (currentCluster instanceof InternalTestCluster) { return randomRepoPath(((InternalTestCluster) currentCluster).getDefaultSettings()); - } else if (currentCluster instanceof CompositeTestCluster) { - return randomRepoPath(((CompositeTestCluster) currentCluster).internalCluster().getDefaultSettings()); } throw new UnsupportedOperationException("unsupported cluster type"); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ExternalNode.java b/test/framework/src/main/java/org/elasticsearch/test/ExternalNode.java deleted file mode 100644 index 2e8001bf0f4..00000000000 --- a/test/framework/src/main/java/org/elasticsearch/test/ExternalNode.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.test; - -import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.Constants; -import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.io.PathUtils; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.network.NetworkModule; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.discovery.DiscoveryModule; -import org.elasticsearch.transport.MockTransportClient; - -import java.io.Closeable; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -/** - * Simple helper class to start external nodes to be used within a test cluster - */ -final class ExternalNode implements Closeable { - - public static final Settings REQUIRED_SETTINGS = Settings.builder() - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen") - .put(NetworkModule.TRANSPORT_TYPE_KEY, Randomness.get().nextBoolean() ? "netty3" : "netty4").build(); // we need network mode for this - - private final Path path; - private final Random random; - private final NodeConfigurationSource nodeConfigurationSource; - private Process process; - private NodeInfo nodeInfo; - private final String clusterName; - private TransportClient client; - - private final Logger logger = Loggers.getLogger(getClass()); - private Settings externalNodeSettings; - - - ExternalNode(Path path, long seed, NodeConfigurationSource nodeConfigurationSource) { - this(path, null, seed, nodeConfigurationSource); - } - - ExternalNode(Path path, String clusterName, long seed, NodeConfigurationSource nodeConfigurationSource) { - if (!Files.isDirectory(path)) { - throw new IllegalArgumentException("path must be a directory"); - } - this.path = path; - this.clusterName = clusterName; - this.random = new Random(seed); - this.nodeConfigurationSource = nodeConfigurationSource; - } - - synchronized ExternalNode start(Client localNode, Settings defaultSettings, String nodeName, String clusterName, int nodeOrdinal) throws IOException, InterruptedException { - ExternalNode externalNode = new ExternalNode(path, clusterName, random.nextLong(), nodeConfigurationSource); - Settings settings = Settings.builder().put(defaultSettings).put(nodeConfigurationSource.nodeSettings(nodeOrdinal)).build(); - externalNode.startInternal(localNode, settings, nodeName, clusterName); - return externalNode; - } - - @SuppressForbidden(reason = "needs java.io.File api to start a process") - synchronized void startInternal(Client client, Settings settings, String nodeName, String clusterName) throws IOException, InterruptedException { - if (process != null) { - throw new IllegalStateException("Already started"); - } - List params = new ArrayList<>(); - - if (!Constants.WINDOWS) { - params.add("bin/elasticsearch"); - } else { - params.add("bin/elasticsearch.bat"); - } - params.add("-Ecluster.name=" + clusterName); - params.add("-Enode.name=" + nodeName); - Settings.Builder externaNodeSettingsBuilder = Settings.builder(); - for (Map.Entry entry : settings.getAsMap().entrySet()) { - switch (entry.getKey()) { - case "cluster.name": - case "node.name": - case "path.home": - case NetworkModule.TRANSPORT_TYPE_KEY: - case "discovery.type": - case "config.ignore_system_properties": - continue; - default: - externaNodeSettingsBuilder.put(entry.getKey(), entry.getValue()); - - } - } - this.externalNodeSettings = externaNodeSettingsBuilder.put(REQUIRED_SETTINGS).build(); - for (Map.Entry entry : externalNodeSettings.getAsMap().entrySet()) { - params.add("-E" + entry.getKey() + "=" + entry.getValue()); - } - - params.add("-Epath.home=" + PathUtils.get(".").toAbsolutePath()); - params.add("-Epath.conf=" + path + "/config"); - - ProcessBuilder builder = new ProcessBuilder(params); - builder.directory(path.toFile()); - builder.inheritIO(); - boolean success = false; - try { - logger.info("starting external node [{}] with: {}", nodeName, builder.command()); - process = builder.start(); - this.nodeInfo = null; - if (waitForNode(client, nodeName)) { - nodeInfo = nodeInfo(client, nodeName); - assert nodeInfo != null; - logger.info("external node {} found, version [{}], build {}", nodeInfo.getNode(), nodeInfo.getVersion(), nodeInfo.getBuild()); - } else { - throw new IllegalStateException("Node [" + nodeName + "] didn't join the cluster"); - } - success = true; - } finally { - if (!success) { - stop(); - } - } - } - - static boolean waitForNode(final Client client, final String name) throws InterruptedException { - return ESTestCase.awaitBusy(() -> { - final NodesInfoResponse nodeInfos = client.admin().cluster().prepareNodesInfo().get(); - for (NodeInfo info : nodeInfos.getNodes()) { - if (name.equals(info.getNode().getName())) { - return true; - } - } - return false; - }, 30, TimeUnit.SECONDS); - } - - static NodeInfo nodeInfo(final Client client, final String nodeName) { - final NodesInfoResponse nodeInfos = client.admin().cluster().prepareNodesInfo().get(); - for (NodeInfo info : nodeInfos.getNodes()) { - if (nodeName.equals(info.getNode().getName())) { - return info; - } - } - return null; - } - - synchronized TransportAddress getTransportAddress() { - if (nodeInfo == null) { - throw new IllegalStateException("Node has not started yet"); - } - return nodeInfo.getTransport().getAddress().publishAddress(); - } - - synchronized Client getClient() { - if (nodeInfo == null) { - throw new IllegalStateException("Node has not started yet"); - } - if (client == null) { - TransportAddress addr = nodeInfo.getTransport().getAddress().publishAddress(); - // verify that the end node setting will have network enabled. - - Settings clientSettings = Settings.builder().put(externalNodeSettings) - .put("client.transport.nodes_sampler_interval", "1s") - .put("node.name", "transport_client_" + nodeInfo.getNode().getName()) - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName).put("client.transport.sniff", false).build(); - TransportClient client = new MockTransportClient(clientSettings); - client.addTransportAddress(addr); - this.client = client; - } - return client; - } - - synchronized void reset(long seed) { - this.random.setSeed(seed); - } - - synchronized void stop() throws InterruptedException { - if (running()) { - try { - if (this.client != null) { - client.close(); - } - } finally { - process.destroy(); - process.waitFor(); - process = null; - nodeInfo = null; - - } - } - } - - - synchronized boolean running() { - return process != null; - } - - @Override - public void close() { - try { - stop(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - synchronized String getName() { - if (nodeInfo == null) { - throw new IllegalStateException("Node has not started yet"); - } - return nodeInfo.getNode().getName(); - } -}