Use file-based discovery not MockUncasedHostsProvider (#33554)

Today we use a special unicast hosts provider, the `MockUncasedHostsProvider`,
in many integration tests, to deal with the dynamic nature of the allocation of
ports to nodes. However #33241 allows us to use file-based discovery to achieve
the same goal, so the special test-only `MockUncasedHostsProvider` is no longer
required.

This change removes `MockUncasedHostProvider` and replaces it with file-based
discovery in tests based on `EsIntegTestCase`.
This commit is contained in:
David Turner 2018-09-13 07:37:15 +02:00 committed by GitHub
parent 94f6d4560d
commit 5a3fd8e4e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 178 additions and 164 deletions

View File

@ -131,7 +131,7 @@ public class DiscoveryModule {
if (discoverySupplier == null) { if (discoverySupplier == null) {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
} }
Loggers.getLogger(getClass(), settings).info("using discovery type [{}]", discoveryType); Loggers.getLogger(getClass(), settings).info("using discovery type [{}] and host providers {}", discoveryType, hostsProviderNames);
discovery = Objects.requireNonNull(discoverySupplier.get()); discovery = Objects.requireNonNull(discoverySupplier.get());
} }

View File

@ -53,7 +53,6 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
@ -113,6 +112,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
@Override @Override
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder() return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
// manual collection or upon cluster forming. // manual collection or upon cluster forming.
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2) .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2)
.put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.getKey(), "1s") .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.getKey(), "1s")
@ -121,8 +121,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
@Override @Override
protected Collection<Class<? extends Plugin>> nodePlugins() { protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(TestPlugin.class, return Arrays.asList(TestPlugin.class, MockTransportService.TestPlugin.class);
MockTransportService.TestPlugin.class);
} }
public void testClusterInfoServiceCollectsInformation() throws Exception { public void testClusterInfoServiceCollectsInformation() throws Exception {
@ -172,7 +171,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
} }
} }
public void testClusterInfoServiceInformationClearOnError() throws InterruptedException, ExecutionException { public void testClusterInfoServiceInformationClearOnError() {
internalCluster().startNodes(2, internalCluster().startNodes(2,
// manually control publishing // manually control publishing
Settings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build()); Settings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build());

View File

@ -0,0 +1,81 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.LIMIT_LOCAL_PORTS_COUNT;
import static org.elasticsearch.transport.TcpTransport.PORT;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class SettingsBasedHostProviderIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal));
// super.nodeSettings enables file-based discovery, but here we disable it again so we can test the static list:
if (randomBoolean()) {
builder.putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey());
} else {
builder.remove(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey());
}
// super.nodeSettings sets this to an empty list, which disables any search for other nodes, but here we want this to happen:
builder.remove(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey());
return builder.build();
}
public void testClusterFormsWithSingleSeedHostInSettings() {
final String seedNodeName = internalCluster().startNode();
final NodesInfoResponse nodesInfoResponse
= client(seedNodeName).admin().cluster().nodesInfo(new NodesInfoRequest("_local")).actionGet();
final String seedNodeAddress = nodesInfoResponse.getNodes().get(0).getTransport().getAddress().publishAddress().toString();
logger.info("--> using seed node address {}", seedNodeAddress);
int extraNodes = randomIntBetween(1, 5);
internalCluster().startNodes(extraNodes,
Settings.builder().putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), seedNodeAddress).build());
ensureStableCluster(extraNodes + 1);
}
public void testClusterFormsByScanningPorts() {
// This test will fail if all 4 ports just less than the one used by the first node are already bound by something else. It's hard
// to know how often this might happen in reality, so let's try it and see.
final String seedNodeName = internalCluster().startNode();
final NodesInfoResponse nodesInfoResponse
= client(seedNodeName).admin().cluster().nodesInfo(new NodesInfoRequest("_local")).actionGet();
final int seedNodePort = nodesInfoResponse.getNodes().get(0).getTransport().getAddress().publishAddress().getPort();
final int minPort = randomIntBetween(seedNodePort - LIMIT_LOCAL_PORTS_COUNT + 1, seedNodePort - 1);
final String portSpec = minPort + "-" + seedNodePort;
logger.info("--> using port specification [{}]", portSpec);
internalCluster().startNode(Settings.builder().put(PORT.getKey(), portSpec));
ensureStableCluster(2);
}
}

View File

@ -69,7 +69,10 @@ public class SearchCancellationIT extends ESIntegTestCase {
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
boolean lowLevelCancellation = randomBoolean(); boolean lowLevelCancellation = randomBoolean();
logger.info("Using lowLevelCancellation: {}", lowLevelCancellation); logger.info("Using lowLevelCancellation: {}", lowLevelCancellation);
return Settings.builder().put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), lowLevelCancellation).build(); return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), lowLevelCancellation)
.build();
} }
private void indexTestData() { private void indexTestData() {

View File

@ -206,6 +206,8 @@ import static org.elasticsearch.client.Requests.syncedFlushRequest;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.util.CollectionUtils.eagerPartition; import static org.elasticsearch.common.util.CollectionUtils.eagerPartition;
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.XContentTestUtils.convertToMap; import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder; import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
@ -1808,7 +1810,9 @@ public abstract class ESIntegTestCase extends ESTestCase {
// wait short time for other active shards before actually deleting, default 30s not needed in tests // wait short time for other active shards before actually deleting, default 30s not needed in tests
.put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS)) .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS))
// randomly enable low-level search cancellation to make sure it does not alter results // randomly enable low-level search cancellation to make sure it does not alter results
.put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean()); .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean())
.putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes
.putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file");
if (rarely()) { if (rarely()) {
// Sometimes adjust the minimum search thread pool size, causing // Sometimes adjust the minimum search thread pool size, causing
// QueueResizingEsThreadPoolExecutor to be used instead of a regular // QueueResizingEsThreadPoolExecutor to be used instead of a regular
@ -1921,7 +1925,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
networkSettings.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType()); networkSettings.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType());
} }
NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() { return new NodeConfigurationSource() {
@Override @Override
public Settings nodeSettings(int nodeOrdinal) { public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder() return Settings.builder()
@ -1955,7 +1959,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
return Collections.unmodifiableCollection(plugins); return Collections.unmodifiableCollection(plugins);
} }
}; };
return nodeConfigurationSource;
} }
/** /**
@ -2029,7 +2032,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
public static final class TestSeedPlugin extends Plugin { public static final class TestSeedPlugin extends Plugin {
@Override @Override
public List<Setting<?>> getSettings() { public List<Setting<?>> getSettings() {
return Arrays.asList(INDEX_TEST_SEED_SETTING); return Collections.singletonList(INDEX_TEST_SEED_SETTING);
} }
} }

View File

@ -62,6 +62,7 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -197,6 +198,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
// turning on the real memory circuit breaker leads to spurious test failures. As have no full control over heap usage, we // turning on the real memory circuit breaker leads to spurious test failures. As have no full control over heap usage, we
// turn it off for these tests. // turn it off for these tests.
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false)
.putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes
.put(nodeSettings()) // allow test cases to provide their own settings or override these .put(nodeSettings()) // allow test cases to provide their own settings or override these
.build(); .build();
Collection<Class<? extends Plugin>> plugins = getPlugins(); Collection<Class<? extends Plugin>> plugins = getPlugins();

View File

@ -47,6 +47,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
@ -103,6 +104,7 @@ import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -114,6 +116,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.Objects;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
@ -128,10 +131,12 @@ import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static java.util.Collections.emptyList;
import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY; import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY;
import static org.apache.lucene.util.LuceneTestCase.rarely; import static org.apache.lucene.util.LuceneTestCase.rarely;
import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING; import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING;
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.discovery.zen.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE;
import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.elasticsearch.test.ESTestCase.awaitBusy; import static org.elasticsearch.test.ESTestCase.awaitBusy;
import static org.elasticsearch.test.ESTestCase.getTestTransportType; import static org.elasticsearch.test.ESTestCase.getTestTransportType;
@ -486,11 +491,13 @@ public final class InternalTestCluster extends TestCluster {
private synchronized NodeAndClient getOrBuildRandomNode() { private synchronized NodeAndClient getOrBuildRandomNode() {
ensureOpen(); ensureOpen();
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(); final NodeAndClient randomNodeAndClient = getRandomNodeAndClient();
if (randomNodeAndClient != null) { if (randomNodeAndClient != null) {
return randomNodeAndClient; return randomNodeAndClient;
} }
NodeAndClient buildNode = buildNode(1); final int ord = nextNodeId.getAndIncrement();
final Runnable onTransportServiceStarted = () -> {}; // do not create unicast host file for this one node.
final NodeAndClient buildNode = buildNode(ord, random.nextLong(), null, false, 1, onTransportServiceStarted);
buildNode.startNode(); buildNode.startNode();
publishNode(buildNode); publishNode(buildNode);
return buildNode; return buildNode;
@ -562,20 +569,11 @@ public final class InternalTestCluster extends TestCluster {
* *
* @param settings the settings to use * @param settings the settings to use
* @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed
* @param onTransportServiceStarted callback to run when transport service is started
*/ */
private NodeAndClient buildNode(Settings settings, int defaultMinMasterNodes) { private NodeAndClient buildNode(Settings settings, int defaultMinMasterNodes, Runnable onTransportServiceStarted) {
int ord = nextNodeId.getAndIncrement(); int ord = nextNodeId.getAndIncrement();
return buildNode(ord, random.nextLong(), settings, false, defaultMinMasterNodes); return buildNode(ord, random.nextLong(), settings, false, defaultMinMasterNodes, onTransportServiceStarted);
}
/**
* builds a new node with default settings
*
* @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed
*/
private NodeAndClient buildNode(int defaultMinMasterNodes) {
int ord = nextNodeId.getAndIncrement();
return buildNode(ord, random.nextLong(), null, false, defaultMinMasterNodes);
} }
/** /**
@ -587,15 +585,17 @@ public final class InternalTestCluster extends TestCluster {
* @param reuseExisting if a node with the same name is already part of {@link #nodes}, no new node will be built and * @param reuseExisting if a node with the same name is already part of {@link #nodes}, no new node will be built and
* the method will return the existing one * the method will return the existing one
* @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed
* @param onTransportServiceStarted callback to run when transport service is started
*/ */
private NodeAndClient buildNode(int nodeId, long seed, Settings settings, private NodeAndClient buildNode(int nodeId, long seed, Settings settings,
boolean reuseExisting, int defaultMinMasterNodes) { boolean reuseExisting, int defaultMinMasterNodes, Runnable onTransportServiceStarted) {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
ensureOpen(); ensureOpen();
settings = getSettings(nodeId, seed, settings); settings = getSettings(nodeId, seed, settings);
Collection<Class<? extends Plugin>> plugins = getPlugins(); Collection<Class<? extends Plugin>> plugins = getPlugins();
String name = buildNodeName(nodeId, settings); String name = buildNodeName(nodeId, settings);
if (reuseExisting && nodes.containsKey(name)) { if (reuseExisting && nodes.containsKey(name)) {
onTransportServiceStarted.run(); // reusing an existing node implies its transport service already started
return nodes.get(name); return nodes.get(name);
} else { } else {
assert reuseExisting == true || nodes.containsKey(name) == false : assert reuseExisting == true || nodes.containsKey(name) == false :
@ -631,6 +631,12 @@ public final class InternalTestCluster extends TestCluster {
plugins, plugins,
nodeConfigurationSource.nodeConfigPath(nodeId), nodeConfigurationSource.nodeConfigPath(nodeId),
forbidPrivateIndexSettings); forbidPrivateIndexSettings);
node.injector().getInstance(TransportService.class).addLifecycleListener(new LifecycleListener() {
@Override
public void afterStart() {
onTransportServiceStarted.run();
}
});
try { try {
IOUtils.close(secureSettings); IOUtils.close(secureSettings);
} catch (IOException e) { } catch (IOException e) {
@ -907,14 +913,15 @@ public final class InternalTestCluster extends TestCluster {
if (!node.isClosed()) { if (!node.isClosed()) {
closeNode(); closeNode();
} }
recreateNodeOnRestart(callback, clearDataIfNeeded, minMasterNodes); recreateNodeOnRestart(callback, clearDataIfNeeded, minMasterNodes, () -> rebuildUnicastHostFiles(emptyList()));
startNode(); startNode();
} }
/** /**
* rebuilds a new node object using the current node settings and starts it * rebuilds a new node object using the current node settings and starts it
*/ */
void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes) throws Exception { void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes,
Runnable onTransportServiceStarted) throws Exception {
assert callback != null; assert callback != null;
Settings callbackSettings = callback.onNodeStopped(name); Settings callbackSettings = callback.onNodeStopped(name);
Settings.Builder newSettings = Settings.builder(); Settings.Builder newSettings = Settings.builder();
@ -928,7 +935,7 @@ public final class InternalTestCluster extends TestCluster {
if (clearDataIfNeeded) { if (clearDataIfNeeded) {
clearDataIfNeeded(callback); clearDataIfNeeded(callback);
} }
createNewNode(newSettings.build()); createNewNode(newSettings.build(), onTransportServiceStarted);
// make sure cached client points to new node // make sure cached client points to new node
resetClient(); resetClient();
} }
@ -944,7 +951,7 @@ public final class InternalTestCluster extends TestCluster {
} }
} }
private void createNewNode(final Settings newSettings) { private void createNewNode(final Settings newSettings, final Runnable onTransportServiceStarted) {
final long newIdSeed = NodeEnvironment.NODE_ID_SEED_SETTING.get(node.settings()) + 1; // use a new seed to make sure we have new node id final long newIdSeed = NodeEnvironment.NODE_ID_SEED_SETTING.get(node.settings()) + 1; // use a new seed to make sure we have new node id
Settings finalSettings = Settings.builder().put(node.originalSettings()).put(newSettings).put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), newIdSeed).build(); Settings finalSettings = Settings.builder().put(node.originalSettings()).put(newSettings).put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), newIdSeed).build();
if (DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(finalSettings) == false) { if (DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(finalSettings) == false) {
@ -953,6 +960,12 @@ public final class InternalTestCluster extends TestCluster {
} }
Collection<Class<? extends Plugin>> plugins = node.getClasspathPlugins(); Collection<Class<? extends Plugin>> plugins = node.getClasspathPlugins();
node = new MockNode(finalSettings, plugins); node = new MockNode(finalSettings, plugins);
node.injector().getInstance(TransportService.class).addLifecycleListener(new LifecycleListener() {
@Override
public void afterStart() {
onTransportServiceStarted.run();
}
});
markNodeDataDirsAsNotEligableForWipe(node); markNodeDataDirsAsNotEligableForWipe(node);
} }
@ -1055,11 +1068,13 @@ public final class InternalTestCluster extends TestCluster {
final int numberOfMasterNodes = numSharedDedicatedMasterNodes > 0 ? numSharedDedicatedMasterNodes : numSharedDataNodes; final int numberOfMasterNodes = numSharedDedicatedMasterNodes > 0 ? numSharedDedicatedMasterNodes : numSharedDataNodes;
final int defaultMinMasterNodes = (numberOfMasterNodes / 2) + 1; final int defaultMinMasterNodes = (numberOfMasterNodes / 2) + 1;
final List<NodeAndClient> toStartAndPublish = new ArrayList<>(); // we want to start nodes in one go due to min master nodes final List<NodeAndClient> toStartAndPublish = new ArrayList<>(); // we want to start nodes in one go due to min master nodes
final Runnable onTransportServiceStarted = () -> rebuildUnicastHostFiles(toStartAndPublish);
for (int i = 0; i < numSharedDedicatedMasterNodes; i++) { for (int i = 0; i < numSharedDedicatedMasterNodes; i++) {
final Settings.Builder settings = Settings.builder(); final Settings.Builder settings = Settings.builder();
settings.put(Node.NODE_MASTER_SETTING.getKey(), true); settings.put(Node.NODE_MASTER_SETTING.getKey(), true);
settings.put(Node.NODE_DATA_SETTING.getKey(), false); settings.put(Node.NODE_DATA_SETTING.getKey(), false);
NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes,
onTransportServiceStarted);
toStartAndPublish.add(nodeAndClient); toStartAndPublish.add(nodeAndClient);
} }
for (int i = numSharedDedicatedMasterNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes; i++) { for (int i = numSharedDedicatedMasterNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes; i++) {
@ -1069,14 +1084,16 @@ public final class InternalTestCluster extends TestCluster {
settings.put(Node.NODE_MASTER_SETTING.getKey(), false).build(); settings.put(Node.NODE_MASTER_SETTING.getKey(), false).build();
settings.put(Node.NODE_DATA_SETTING.getKey(), true).build(); settings.put(Node.NODE_DATA_SETTING.getKey(), true).build();
} }
NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes,
onTransportServiceStarted);
toStartAndPublish.add(nodeAndClient); toStartAndPublish.add(nodeAndClient);
} }
for (int i = numSharedDedicatedMasterNodes + numSharedDataNodes; for (int i = numSharedDedicatedMasterNodes + numSharedDataNodes;
i < numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes; i++) { i < numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes; i++) {
final Builder settings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false) final Builder settings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false)
.put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false); .put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false);
NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes,
onTransportServiceStarted);
toStartAndPublish.add(nodeAndClient); toStartAndPublish.add(nodeAndClient);
} }
@ -1429,6 +1446,7 @@ public final class InternalTestCluster extends TestCluster {
updateMinMasterNodes(currentMasters + newMasters); updateMinMasterNodes(currentMasters + newMasters);
} }
List<Future<?>> futures = nodeAndClients.stream().map(node -> executor.submit(node::startNode)).collect(Collectors.toList()); List<Future<?>> futures = nodeAndClients.stream().map(node -> executor.submit(node::startNode)).collect(Collectors.toList());
try { try {
for (Future<?> future : futures) { for (Future<?> future : futures) {
future.get(); future.get();
@ -1449,6 +1467,30 @@ public final class InternalTestCluster extends TestCluster {
} }
} }
private final Object discoveryFileMutex = new Object();
private void rebuildUnicastHostFiles(Collection<NodeAndClient> newNodes) {
// cannot be a synchronized method since it's called on other threads from within synchronized startAndPublishNodesAndClients()
synchronized (discoveryFileMutex) {
try {
List<String> discoveryFileContents = Stream.concat(nodes.values().stream(), newNodes.stream())
.map(nac -> nac.node.injector().getInstance(TransportService.class)).filter(Objects::nonNull)
.map(TransportService::getLocalNode).filter(Objects::nonNull).filter(DiscoveryNode::isMasterNode)
.map(n -> n.getAddress().toString())
.distinct().collect(Collectors.toList());
Set<Path> configPaths = Stream.concat(nodes.values().stream(), newNodes.stream())
.map(nac -> nac.node.getEnvironment().configFile()).collect(Collectors.toSet());
logger.debug("configuring discovery with {} at {}", discoveryFileContents, configPaths);
for (final Path configPath : configPaths) {
Files.createDirectories(configPath);
Files.write(configPath.resolve(UNICAST_HOSTS_FILE), discoveryFileContents);
}
} catch (IOException e) {
throw new AssertionError("failed to configure file-based discovery", e);
}
}
}
private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException { private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException {
stopNodesAndClients(Collections.singleton(nodeAndClient)); stopNodesAndClients(Collections.singleton(nodeAndClient));
} }
@ -1607,7 +1649,7 @@ public final class InternalTestCluster extends TestCluster {
for (List<NodeAndClient> sameRoleNodes : nodesByRoles.values()) { for (List<NodeAndClient> sameRoleNodes : nodesByRoles.values()) {
Collections.shuffle(sameRoleNodes, random); Collections.shuffle(sameRoleNodes, random);
} }
List<NodeAndClient> startUpOrder = new ArrayList<>(); final List<NodeAndClient> startUpOrder = new ArrayList<>();
for (Set roles : rolesOrderedByOriginalStartupOrder) { for (Set roles : rolesOrderedByOriginalStartupOrder) {
if (roles == null) { if (roles == null) {
// if some nodes were stopped, we want have a role for that ordinal // if some nodes were stopped, we want have a role for that ordinal
@ -1618,11 +1660,11 @@ public final class InternalTestCluster extends TestCluster {
} }
assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0; assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0;
// do two rounds to minimize pinging (mock zen pings pings with no delay and can create a lot of logs)
for (NodeAndClient nodeAndClient : startUpOrder) { for (NodeAndClient nodeAndClient : startUpOrder) {
logger.info("resetting node [{}] ", nodeAndClient.name); logger.info("resetting node [{}] ", nodeAndClient.name);
// we already cleared data folders, before starting nodes up // we already cleared data folders, before starting nodes up
nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1); nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1,
() -> rebuildUnicastHostFiles(startUpOrder));
} }
startAndPublishNodesAndClients(startUpOrder); startAndPublishNodesAndClients(startUpOrder);
@ -1741,9 +1783,9 @@ public final class InternalTestCluster extends TestCluster {
} else { } else {
defaultMinMasterNodes = -1; defaultMinMasterNodes = -1;
} }
List<NodeAndClient> nodes = new ArrayList<>(); final List<NodeAndClient> nodes = new ArrayList<>();
for (Settings nodeSettings: settings) { for (Settings nodeSettings : settings) {
nodes.add(buildNode(nodeSettings, defaultMinMasterNodes)); nodes.add(buildNode(nodeSettings, defaultMinMasterNodes, () -> rebuildUnicastHostFiles(nodes)));
} }
startAndPublishNodesAndClients(nodes); startAndPublishNodesAndClients(nodes);
if (autoManageMinMasterNodes) { if (autoManageMinMasterNodes) {

View File

@ -1,91 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.discovery;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import java.io.Closeable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* A {@link UnicastHostsProvider} implementation which returns results based on a static in-memory map. This allows running
* with nodes that only determine their transport address at runtime, which is the default behavior of
* {@link org.elasticsearch.test.InternalTestCluster}
*/
public final class MockUncasedHostProvider implements UnicastHostsProvider, Closeable {
static final Map<ClusterName, Set<MockUncasedHostProvider>> activeNodesPerCluster = new HashMap<>();
private final Supplier<DiscoveryNode> localNodeSupplier;
private final ClusterName clusterName;
public MockUncasedHostProvider(Supplier<DiscoveryNode> localNodeSupplier, ClusterName clusterName) {
this.localNodeSupplier = localNodeSupplier;
this.clusterName = clusterName;
synchronized (activeNodesPerCluster) {
getActiveNodesForCurrentCluster().add(this);
}
}
@Override
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
final DiscoveryNode localNode = getNode();
assert localNode != null;
synchronized (activeNodesPerCluster) {
Set<MockUncasedHostProvider> activeNodes = getActiveNodesForCurrentCluster();
return activeNodes.stream()
.map(MockUncasedHostProvider::getNode)
.filter(Objects::nonNull)
.filter(n -> localNode.equals(n) == false)
.map(DiscoveryNode::getAddress)
.collect(Collectors.toList());
}
}
@Nullable
private DiscoveryNode getNode() {
return localNodeSupplier.get();
}
private Set<MockUncasedHostProvider> getActiveNodesForCurrentCluster() {
assert Thread.holdsLock(activeNodesPerCluster);
return activeNodesPerCluster.computeIfAbsent(clusterName,
clusterName -> ConcurrentCollections.newConcurrentSet());
}
@Override
public void close() {
synchronized (activeNodesPerCluster) {
boolean found = getActiveNodesForCurrentCluster().remove(this);
assert found;
}
}
}

View File

@ -19,13 +19,10 @@
package org.elasticsearch.test.discovery; package org.elasticsearch.test.discovery;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -39,7 +36,6 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -59,7 +55,6 @@ public class TestZenDiscovery extends ZenDiscovery {
/** A plugin which installs mock discovery and configures it to be used. */ /** A plugin which installs mock discovery and configures it to be used. */
public static class TestPlugin extends Plugin implements DiscoveryPlugin { public static class TestPlugin extends Plugin implements DiscoveryPlugin {
protected final Settings settings; protected final Settings settings;
private final SetOnce<MockUncasedHostProvider> unicastHostProvider = new SetOnce<>();
public TestPlugin(Settings settings) { public TestPlugin(Settings settings) {
this.settings = settings; this.settings = settings;
} }
@ -78,26 +73,6 @@ public class TestZenDiscovery extends ZenDiscovery {
clusterApplier, clusterSettings, hostsProvider, allocationService)); clusterApplier, clusterSettings, hostsProvider, allocationService));
} }
@Override
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
NetworkService networkService) {
final Supplier<UnicastHostsProvider> supplier;
if (USE_MOCK_PINGS.get(settings)) {
// we have to return something in order for the unicast host provider setting to resolve to something. It will never be used
supplier = () -> hostsResolver -> {
throw new UnsupportedOperationException();
};
} else {
supplier = () -> {
unicastHostProvider.set(
new MockUncasedHostProvider(transportService::getLocalNode, ClusterName.CLUSTER_NAME_SETTING.get(settings))
);
return unicastHostProvider.get();
};
}
return Collections.singletonMap("test-zen", supplier);
}
@Override @Override
public List<Setting<?>> getSettings() { public List<Setting<?>> getSettings() {
return Collections.singletonList(USE_MOCK_PINGS); return Collections.singletonList(USE_MOCK_PINGS);
@ -107,18 +82,9 @@ public class TestZenDiscovery extends ZenDiscovery {
public Settings additionalSettings() { public Settings additionalSettings() {
return Settings.builder() return Settings.builder()
.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen") .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen")
.put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "test-zen")
.putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey())
.build(); .build();
} }
@Override
public void close() throws IOException {
super.close();
if (unicastHostProvider.get() != null) {
unicastHostProvider.get().close();
}
}
} }
private TestZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService, private TestZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,

View File

@ -8,6 +8,7 @@ package org.elasticsearch.license;
import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsIndices; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsIndices;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
@ -52,8 +53,11 @@ import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -278,6 +282,10 @@ public class LicensingTests extends SecurityIntegTestCase {
enableLicensing(mode); enableLicensing(mode);
ensureGreen(); ensureGreen();
final List<String> unicastHostsList = internalCluster().masterClient().admin().cluster().nodesInfo(new NodesInfoRequest()).get()
.getNodes().stream().map(n -> n.getTransport().getAddress().publishAddress().toString()).distinct()
.collect(Collectors.toList());
Path home = createTempDir(); Path home = createTempDir();
Path conf = home.resolve("config"); Path conf = home.resolve("config");
Files.createDirectories(conf); Files.createDirectories(conf);
@ -291,7 +299,8 @@ public class LicensingTests extends SecurityIntegTestCase {
.put("path.home", home) .put("path.home", home)
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false) .put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false)
.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen") .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen")
.put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "test-zen") .putList(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey())
.putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), unicastHostsList)
.build(); .build();
Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(LocalStateSecurity.class, TestZenDiscovery.TestPlugin.class, Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(LocalStateSecurity.class, TestZenDiscovery.TestPlugin.class,
MockHttpTransport.TestPlugin.class); MockHttpTransport.TestPlugin.class);