[TEST] Remove CompositeTestCluster and ExternalNode (#21933)
They are not used anymore. Related #21915
This commit is contained in:
parent
842e00c689
commit
fe95aef6a9
|
@ -1,292 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.test;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.FilterClient;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Random;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
/**
|
||||
* A test cluster implementation that holds a fixed set of external nodes as well as a InternalTestCluster
|
||||
* which is used to run mixed version clusters in tests like backwards compatibility tests.
|
||||
* Note: this is an experimental API
|
||||
*/
|
||||
public class CompositeTestCluster extends TestCluster {
|
||||
private final InternalTestCluster cluster;
|
||||
private final ExternalNode[] externalNodes;
|
||||
private final ExternalClient client = new ExternalClient();
|
||||
private static final String NODE_PREFIX = "external_";
|
||||
|
||||
public CompositeTestCluster(InternalTestCluster cluster, int numExternalNodes, ExternalNode externalNode) throws IOException {
|
||||
super(cluster.seed());
|
||||
this.cluster = cluster;
|
||||
this.externalNodes = new ExternalNode[numExternalNodes];
|
||||
for (int i = 0; i < externalNodes.length; i++) {
|
||||
externalNodes[i] = externalNode;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void afterTest() throws IOException {
|
||||
cluster.afterTest();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void beforeTest(Random random, double transportClientRatio) throws IOException, InterruptedException {
|
||||
super.beforeTest(random, transportClientRatio);
|
||||
cluster.beforeTest(random, transportClientRatio);
|
||||
Settings defaultSettings = cluster.getDefaultSettings();
|
||||
final Client client = cluster.size() > 0 ? cluster.client() : cluster.coordOnlyNodeClient();
|
||||
for (int i = 0; i < externalNodes.length; i++) {
|
||||
if (!externalNodes[i].running()) {
|
||||
externalNodes[i] = externalNodes[i].start(client, defaultSettings, NODE_PREFIX + i, cluster.getClusterName(), i);
|
||||
}
|
||||
externalNodes[i].reset(random.nextLong());
|
||||
}
|
||||
if (size() > 0) {
|
||||
client().admin().cluster().prepareHealth().setWaitForNodes(">=" + Integer.toString(this.size())).get();
|
||||
}
|
||||
}
|
||||
|
||||
private Collection<ExternalNode> runningNodes() {
|
||||
return Arrays
|
||||
.stream(externalNodes)
|
||||
.filter(input -> input.running())
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
}
|
||||
|
||||
/**
|
||||
* Upgrades one external running node to a node from the version running the tests. Commonly this is used
|
||||
* to move from a node with version N-1 to a node running version N. This works seamless since they will
|
||||
* share the same data directory. This method will return <tt>true</tt> iff a node got upgraded otherwise if no
|
||||
* external node is running it returns <tt>false</tt>
|
||||
*/
|
||||
public synchronized boolean upgradeOneNode() throws InterruptedException, IOException {
|
||||
return upgradeOneNode(Settings.EMPTY);
|
||||
}
|
||||
|
||||
/**
|
||||
* Upgrades all external running nodes to a node from the version running the tests.
|
||||
* All nodes are shut down before the first upgrade happens.
|
||||
* @return <code>true</code> iff at least one node as upgraded.
|
||||
*/
|
||||
public synchronized boolean upgradeAllNodes() throws InterruptedException, IOException {
|
||||
return upgradeAllNodes(Settings.EMPTY);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Upgrades all external running nodes to a node from the version running the tests.
|
||||
* All nodes are shut down before the first upgrade happens.
|
||||
* @return <code>true</code> iff at least one node as upgraded.
|
||||
* @param nodeSettings settings for the upgrade nodes
|
||||
*/
|
||||
public synchronized boolean upgradeAllNodes(Settings nodeSettings) throws InterruptedException, IOException {
|
||||
boolean upgradedOneNode = false;
|
||||
while(upgradeOneNode(nodeSettings)) {
|
||||
upgradedOneNode = true;
|
||||
}
|
||||
return upgradedOneNode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Upgrades one external running node to a node from the version running the tests. Commonly this is used
|
||||
* to move from a node with version N-1 to a node running version N. This works seamless since they will
|
||||
* share the same data directory. This method will return <tt>true</tt> iff a node got upgraded otherwise if no
|
||||
* external node is running it returns <tt>false</tt>
|
||||
*/
|
||||
public synchronized boolean upgradeOneNode(Settings nodeSettings) throws InterruptedException, IOException {
|
||||
Collection<ExternalNode> runningNodes = runningNodes();
|
||||
if (!runningNodes.isEmpty()) {
|
||||
final Client existingClient = cluster.client();
|
||||
ExternalNode externalNode = RandomPicks.randomFrom(random, runningNodes);
|
||||
externalNode.stop();
|
||||
String s = cluster.startNode(nodeSettings);
|
||||
ExternalNode.waitForNode(existingClient, s);
|
||||
assertNoTimeout(existingClient.admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(size())).get());
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the a simple pattern that matches all "new" nodes in the cluster.
|
||||
*/
|
||||
public String newNodePattern() {
|
||||
return cluster.nodePrefix() + "*";
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the a simple pattern that matches all "old" / "backwardss" nodes in the cluster.
|
||||
*/
|
||||
public String backwardsNodePattern() {
|
||||
return NODE_PREFIX + "*";
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows allocation of shards of the given indices on all nodes in the cluster.
|
||||
*/
|
||||
public void allowOnAllNodes(String... index) {
|
||||
Settings build = Settings.builder().put("index.routing.allocation.exclude._name", "").build();
|
||||
client().admin().indices().prepareUpdateSettings(index).setSettings(build).execute().actionGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows allocation of shards of the given indices only on "new" nodes in the cluster.
|
||||
* Note: if a shard is allocated on an "old" node and can't be allocated on a "new" node it will only be removed it can
|
||||
* be allocated on some other "new" node.
|
||||
*/
|
||||
public void allowOnlyNewNodes(String... index) {
|
||||
Settings build = Settings.builder().put("index.routing.allocation.exclude._name", backwardsNodePattern()).build();
|
||||
client().admin().indices().prepareUpdateSettings(index).setSettings(build).execute().actionGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts a current version data node
|
||||
*/
|
||||
public void startNewNode() {
|
||||
cluster.startNode();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized Client client() {
|
||||
return client;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int size() {
|
||||
return runningNodes().size() + cluster.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int numDataNodes() {
|
||||
return runningNodes().size() + cluster.numDataNodes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int numDataAndMasterNodes() {
|
||||
return runningNodes().size() + cluster.numDataAndMasterNodes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress[] httpAddresses() {
|
||||
return cluster.httpAddresses();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
IOUtils.close(externalNodes);
|
||||
} finally {
|
||||
IOUtils.close(cluster);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void ensureEstimatedStats() {
|
||||
if (size() > 0) {
|
||||
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats()
|
||||
.clear().setBreaker(true).execute().actionGet();
|
||||
for (NodeStats stats : nodeStats.getNodes()) {
|
||||
assertThat("Fielddata breaker not reset to 0 on node: " + stats.getNode(),
|
||||
stats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getEstimated(), equalTo(0L));
|
||||
}
|
||||
// CompositeTestCluster does not check the request breaker,
|
||||
// because checking it requires a network request, which in
|
||||
// turn increments the breaker, making it non-0
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClusterName() {
|
||||
return cluster.getClusterName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Iterable<Client> getClients() {
|
||||
return Collections.singleton(client());
|
||||
}
|
||||
|
||||
/**
|
||||
* Delegates to {@link org.elasticsearch.test.InternalTestCluster#fullRestart()}
|
||||
*/
|
||||
public void fullRestartInternalCluster() throws Exception {
|
||||
cluster.fullRestart();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of current version data nodes in the cluster
|
||||
*/
|
||||
public int numNewDataNodes() {
|
||||
return cluster.numDataNodes();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of former version data nodes in the cluster
|
||||
*/
|
||||
public int numBackwardsDataNodes() {
|
||||
return runningNodes().size();
|
||||
}
|
||||
|
||||
public TransportAddress externalTransportAddress() {
|
||||
return RandomPicks.randomFrom(random, externalNodes).getTransportAddress();
|
||||
}
|
||||
|
||||
public InternalTestCluster internalCluster() {
|
||||
return cluster;
|
||||
}
|
||||
|
||||
private synchronized Client internalClient() {
|
||||
Collection<ExternalNode> externalNodes = runningNodes();
|
||||
return random.nextBoolean() && !externalNodes.isEmpty() ? RandomPicks.randomFrom(random, externalNodes).getClient() : cluster.client();
|
||||
}
|
||||
|
||||
private final class ExternalClient extends FilterClient {
|
||||
|
||||
public ExternalClient() {
|
||||
super(internalClient());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// never close this client
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1918,8 +1918,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
|||
public Path randomRepoPath() {
|
||||
if (currentCluster instanceof InternalTestCluster) {
|
||||
return randomRepoPath(((InternalTestCluster) currentCluster).getDefaultSettings());
|
||||
} else if (currentCluster instanceof CompositeTestCluster) {
|
||||
return randomRepoPath(((CompositeTestCluster) currentCluster).internalCluster().getDefaultSettings());
|
||||
}
|
||||
throw new UnsupportedOperationException("unsupported cluster type");
|
||||
}
|
||||
|
|
|
@ -1,238 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.test;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.common.Randomness;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.io.PathUtils;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.transport.MockTransportClient;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Simple helper class to start external nodes to be used within a test cluster
|
||||
*/
|
||||
final class ExternalNode implements Closeable {
|
||||
|
||||
public static final Settings REQUIRED_SETTINGS = Settings.builder()
|
||||
.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen")
|
||||
.put(NetworkModule.TRANSPORT_TYPE_KEY, Randomness.get().nextBoolean() ? "netty3" : "netty4").build(); // we need network mode for this
|
||||
|
||||
private final Path path;
|
||||
private final Random random;
|
||||
private final NodeConfigurationSource nodeConfigurationSource;
|
||||
private Process process;
|
||||
private NodeInfo nodeInfo;
|
||||
private final String clusterName;
|
||||
private TransportClient client;
|
||||
|
||||
private final Logger logger = Loggers.getLogger(getClass());
|
||||
private Settings externalNodeSettings;
|
||||
|
||||
|
||||
ExternalNode(Path path, long seed, NodeConfigurationSource nodeConfigurationSource) {
|
||||
this(path, null, seed, nodeConfigurationSource);
|
||||
}
|
||||
|
||||
ExternalNode(Path path, String clusterName, long seed, NodeConfigurationSource nodeConfigurationSource) {
|
||||
if (!Files.isDirectory(path)) {
|
||||
throw new IllegalArgumentException("path must be a directory");
|
||||
}
|
||||
this.path = path;
|
||||
this.clusterName = clusterName;
|
||||
this.random = new Random(seed);
|
||||
this.nodeConfigurationSource = nodeConfigurationSource;
|
||||
}
|
||||
|
||||
synchronized ExternalNode start(Client localNode, Settings defaultSettings, String nodeName, String clusterName, int nodeOrdinal) throws IOException, InterruptedException {
|
||||
ExternalNode externalNode = new ExternalNode(path, clusterName, random.nextLong(), nodeConfigurationSource);
|
||||
Settings settings = Settings.builder().put(defaultSettings).put(nodeConfigurationSource.nodeSettings(nodeOrdinal)).build();
|
||||
externalNode.startInternal(localNode, settings, nodeName, clusterName);
|
||||
return externalNode;
|
||||
}
|
||||
|
||||
@SuppressForbidden(reason = "needs java.io.File api to start a process")
|
||||
synchronized void startInternal(Client client, Settings settings, String nodeName, String clusterName) throws IOException, InterruptedException {
|
||||
if (process != null) {
|
||||
throw new IllegalStateException("Already started");
|
||||
}
|
||||
List<String> params = new ArrayList<>();
|
||||
|
||||
if (!Constants.WINDOWS) {
|
||||
params.add("bin/elasticsearch");
|
||||
} else {
|
||||
params.add("bin/elasticsearch.bat");
|
||||
}
|
||||
params.add("-Ecluster.name=" + clusterName);
|
||||
params.add("-Enode.name=" + nodeName);
|
||||
Settings.Builder externaNodeSettingsBuilder = Settings.builder();
|
||||
for (Map.Entry<String, String> entry : settings.getAsMap().entrySet()) {
|
||||
switch (entry.getKey()) {
|
||||
case "cluster.name":
|
||||
case "node.name":
|
||||
case "path.home":
|
||||
case NetworkModule.TRANSPORT_TYPE_KEY:
|
||||
case "discovery.type":
|
||||
case "config.ignore_system_properties":
|
||||
continue;
|
||||
default:
|
||||
externaNodeSettingsBuilder.put(entry.getKey(), entry.getValue());
|
||||
|
||||
}
|
||||
}
|
||||
this.externalNodeSettings = externaNodeSettingsBuilder.put(REQUIRED_SETTINGS).build();
|
||||
for (Map.Entry<String, String> entry : externalNodeSettings.getAsMap().entrySet()) {
|
||||
params.add("-E" + entry.getKey() + "=" + entry.getValue());
|
||||
}
|
||||
|
||||
params.add("-Epath.home=" + PathUtils.get(".").toAbsolutePath());
|
||||
params.add("-Epath.conf=" + path + "/config");
|
||||
|
||||
ProcessBuilder builder = new ProcessBuilder(params);
|
||||
builder.directory(path.toFile());
|
||||
builder.inheritIO();
|
||||
boolean success = false;
|
||||
try {
|
||||
logger.info("starting external node [{}] with: {}", nodeName, builder.command());
|
||||
process = builder.start();
|
||||
this.nodeInfo = null;
|
||||
if (waitForNode(client, nodeName)) {
|
||||
nodeInfo = nodeInfo(client, nodeName);
|
||||
assert nodeInfo != null;
|
||||
logger.info("external node {} found, version [{}], build {}", nodeInfo.getNode(), nodeInfo.getVersion(), nodeInfo.getBuild());
|
||||
} else {
|
||||
throw new IllegalStateException("Node [" + nodeName + "] didn't join the cluster");
|
||||
}
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static boolean waitForNode(final Client client, final String name) throws InterruptedException {
|
||||
return ESTestCase.awaitBusy(() -> {
|
||||
final NodesInfoResponse nodeInfos = client.admin().cluster().prepareNodesInfo().get();
|
||||
for (NodeInfo info : nodeInfos.getNodes()) {
|
||||
if (name.equals(info.getNode().getName())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}, 30, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
static NodeInfo nodeInfo(final Client client, final String nodeName) {
|
||||
final NodesInfoResponse nodeInfos = client.admin().cluster().prepareNodesInfo().get();
|
||||
for (NodeInfo info : nodeInfos.getNodes()) {
|
||||
if (nodeName.equals(info.getNode().getName())) {
|
||||
return info;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
synchronized TransportAddress getTransportAddress() {
|
||||
if (nodeInfo == null) {
|
||||
throw new IllegalStateException("Node has not started yet");
|
||||
}
|
||||
return nodeInfo.getTransport().getAddress().publishAddress();
|
||||
}
|
||||
|
||||
synchronized Client getClient() {
|
||||
if (nodeInfo == null) {
|
||||
throw new IllegalStateException("Node has not started yet");
|
||||
}
|
||||
if (client == null) {
|
||||
TransportAddress addr = nodeInfo.getTransport().getAddress().publishAddress();
|
||||
// verify that the end node setting will have network enabled.
|
||||
|
||||
Settings clientSettings = Settings.builder().put(externalNodeSettings)
|
||||
.put("client.transport.nodes_sampler_interval", "1s")
|
||||
.put("node.name", "transport_client_" + nodeInfo.getNode().getName())
|
||||
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName).put("client.transport.sniff", false).build();
|
||||
TransportClient client = new MockTransportClient(clientSettings);
|
||||
client.addTransportAddress(addr);
|
||||
this.client = client;
|
||||
}
|
||||
return client;
|
||||
}
|
||||
|
||||
synchronized void reset(long seed) {
|
||||
this.random.setSeed(seed);
|
||||
}
|
||||
|
||||
synchronized void stop() throws InterruptedException {
|
||||
if (running()) {
|
||||
try {
|
||||
if (this.client != null) {
|
||||
client.close();
|
||||
}
|
||||
} finally {
|
||||
process.destroy();
|
||||
process.waitFor();
|
||||
process = null;
|
||||
nodeInfo = null;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
synchronized boolean running() {
|
||||
return process != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
try {
|
||||
stop();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized String getName() {
|
||||
if (nodeInfo == null) {
|
||||
throw new IllegalStateException("Node has not started yet");
|
||||
}
|
||||
return nodeInfo.getNode().getName();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue