diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index f34798605d7..91f4e615159 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -131,7 +131,7 @@ public class DiscoveryModule { if (discoverySupplier == null) { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); } - Loggers.getLogger(getClass(), settings).info("using discovery type [{}]", discoveryType); + Loggers.getLogger(getClass(), settings).info("using discovery type [{}] and host providers {}", discoveryType, hostsProviderNames); discovery = Objects.requireNonNull(discoverySupplier.get()); } diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index b17a0cc5418..1a0e964ef77 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -53,7 +53,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.emptySet; @@ -113,6 +112,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) // manual collection or upon cluster forming. .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2) .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.getKey(), "1s") @@ -121,8 +121,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Arrays.asList(TestPlugin.class, - MockTransportService.TestPlugin.class); + return Arrays.asList(TestPlugin.class, MockTransportService.TestPlugin.class); } public void testClusterInfoServiceCollectsInformation() throws Exception { @@ -172,7 +171,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase { } } - public void testClusterInfoServiceInformationClearOnError() throws InterruptedException, ExecutionException { + public void testClusterInfoServiceInformationClearOnError() { internalCluster().startNodes(2, // manually control publishing Settings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build()); diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java b/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java new file mode 100644 index 00000000000..429950bf853 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/discovery/zen/SettingsBasedHostProviderIT.java @@ -0,0 +1,81 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.zen; + +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; + +import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.LIMIT_LOCAL_PORTS_COUNT; +import static org.elasticsearch.transport.TcpTransport.PORT; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) +public class SettingsBasedHostProviderIT extends ESIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal)); + + // super.nodeSettings enables file-based discovery, but here we disable it again so we can test the static list: + if (randomBoolean()) { + builder.putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey()); + } else { + builder.remove(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey()); + } + + // super.nodeSettings sets this to an empty list, which disables any search for other nodes, but here we want this to happen: + builder.remove(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()); + + return builder.build(); + } + + public void testClusterFormsWithSingleSeedHostInSettings() { + final String seedNodeName = internalCluster().startNode(); + final NodesInfoResponse nodesInfoResponse + = client(seedNodeName).admin().cluster().nodesInfo(new NodesInfoRequest("_local")).actionGet(); + final String seedNodeAddress = nodesInfoResponse.getNodes().get(0).getTransport().getAddress().publishAddress().toString(); + logger.info("--> using seed node address {}", seedNodeAddress); + + int extraNodes = randomIntBetween(1, 5); + internalCluster().startNodes(extraNodes, + Settings.builder().putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), seedNodeAddress).build()); + + ensureStableCluster(extraNodes + 1); + } + + public void testClusterFormsByScanningPorts() { + // This test will fail if all 4 ports just less than the one used by the first node are already bound by something else. It's hard + // to know how often this might happen in reality, so let's try it and see. + + final String seedNodeName = internalCluster().startNode(); + final NodesInfoResponse nodesInfoResponse + = client(seedNodeName).admin().cluster().nodesInfo(new NodesInfoRequest("_local")).actionGet(); + final int seedNodePort = nodesInfoResponse.getNodes().get(0).getTransport().getAddress().publishAddress().getPort(); + final int minPort = randomIntBetween(seedNodePort - LIMIT_LOCAL_PORTS_COUNT + 1, seedNodePort - 1); + final String portSpec = minPort + "-" + seedNodePort; + + logger.info("--> using port specification [{}]", portSpec); + internalCluster().startNode(Settings.builder().put(PORT.getKey(), portSpec)); + ensureStableCluster(2); + } +} diff --git a/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java b/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java index 0294f9f67f8..2e28d16c71d 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java +++ b/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java @@ -69,7 +69,10 @@ public class SearchCancellationIT extends ESIntegTestCase { protected Settings nodeSettings(int nodeOrdinal) { boolean lowLevelCancellation = randomBoolean(); logger.info("Using lowLevelCancellation: {}", lowLevelCancellation); - return Settings.builder().put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), lowLevelCancellation).build(); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), lowLevelCancellation) + .build(); } private void indexTestData() { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 52ed2205ab5..52f234c9690 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -206,6 +206,8 @@ import static org.elasticsearch.client.Requests.syncedFlushRequest; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.common.util.CollectionUtils.eagerPartition; +import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.XContentTestUtils.convertToMap; import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder; @@ -1808,7 +1810,9 @@ public abstract class ESIntegTestCase extends ESTestCase { // wait short time for other active shards before actually deleting, default 30s not needed in tests .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS)) // randomly enable low-level search cancellation to make sure it does not alter results - .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean()); + .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean()) + .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes + .putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file"); if (rarely()) { // Sometimes adjust the minimum search thread pool size, causing // QueueResizingEsThreadPoolExecutor to be used instead of a regular @@ -1921,7 +1925,7 @@ public abstract class ESIntegTestCase extends ESTestCase { networkSettings.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType()); } - NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() { + return new NodeConfigurationSource() { @Override public Settings nodeSettings(int nodeOrdinal) { return Settings.builder() @@ -1955,7 +1959,6 @@ public abstract class ESIntegTestCase extends ESTestCase { return Collections.unmodifiableCollection(plugins); } }; - return nodeConfigurationSource; } /** @@ -2029,7 +2032,7 @@ public abstract class ESIntegTestCase extends ESTestCase { public static final class TestSeedPlugin extends Plugin { @Override public List> getSettings() { - return Arrays.asList(INDEX_TEST_SEED_SETTING); + return Collections.singletonList(INDEX_TEST_SEED_SETTING); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index d73520f91b3..bcaa4e8303f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -62,6 +62,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -197,6 +198,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase { // turning on the real memory circuit breaker leads to spurious test failures. As have no full control over heap usage, we // turn it off for these tests. .put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false) + .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes .put(nodeSettings()) // allow test cases to provide their own settings or override these .build(); Collection> plugins = getPlugins(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 08aafaea399..354cb807bb2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -47,6 +47,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lease.Releasables; @@ -103,6 +104,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.net.InetSocketAddress; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; @@ -114,6 +116,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.TreeMap; @@ -128,10 +131,12 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Collections.emptyList; import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY; import static org.apache.lucene.util.LuceneTestCase.rarely; import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING; import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; +import static org.elasticsearch.discovery.zen.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE; import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.awaitBusy; import static org.elasticsearch.test.ESTestCase.getTestTransportType; @@ -486,11 +491,13 @@ public final class InternalTestCluster extends TestCluster { private synchronized NodeAndClient getOrBuildRandomNode() { ensureOpen(); - NodeAndClient randomNodeAndClient = getRandomNodeAndClient(); + final NodeAndClient randomNodeAndClient = getRandomNodeAndClient(); if (randomNodeAndClient != null) { return randomNodeAndClient; } - NodeAndClient buildNode = buildNode(1); + final int ord = nextNodeId.getAndIncrement(); + final Runnable onTransportServiceStarted = () -> {}; // do not create unicast host file for this one node. + final NodeAndClient buildNode = buildNode(ord, random.nextLong(), null, false, 1, onTransportServiceStarted); buildNode.startNode(); publishNode(buildNode); return buildNode; @@ -562,20 +569,11 @@ public final class InternalTestCluster extends TestCluster { * * @param settings the settings to use * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed + * @param onTransportServiceStarted callback to run when transport service is started */ - private NodeAndClient buildNode(Settings settings, int defaultMinMasterNodes) { + private NodeAndClient buildNode(Settings settings, int defaultMinMasterNodes, Runnable onTransportServiceStarted) { int ord = nextNodeId.getAndIncrement(); - return buildNode(ord, random.nextLong(), settings, false, defaultMinMasterNodes); - } - - /** - * builds a new node with default settings - * - * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed - */ - private NodeAndClient buildNode(int defaultMinMasterNodes) { - int ord = nextNodeId.getAndIncrement(); - return buildNode(ord, random.nextLong(), null, false, defaultMinMasterNodes); + return buildNode(ord, random.nextLong(), settings, false, defaultMinMasterNodes, onTransportServiceStarted); } /** @@ -587,15 +585,17 @@ public final class InternalTestCluster extends TestCluster { * @param reuseExisting if a node with the same name is already part of {@link #nodes}, no new node will be built and * the method will return the existing one * @param defaultMinMasterNodes min_master_nodes value to use if min_master_nodes is auto managed + * @param onTransportServiceStarted callback to run when transport service is started */ private NodeAndClient buildNode(int nodeId, long seed, Settings settings, - boolean reuseExisting, int defaultMinMasterNodes) { + boolean reuseExisting, int defaultMinMasterNodes, Runnable onTransportServiceStarted) { assert Thread.holdsLock(this); ensureOpen(); settings = getSettings(nodeId, seed, settings); Collection> plugins = getPlugins(); String name = buildNodeName(nodeId, settings); if (reuseExisting && nodes.containsKey(name)) { + onTransportServiceStarted.run(); // reusing an existing node implies its transport service already started return nodes.get(name); } else { assert reuseExisting == true || nodes.containsKey(name) == false : @@ -631,6 +631,12 @@ public final class InternalTestCluster extends TestCluster { plugins, nodeConfigurationSource.nodeConfigPath(nodeId), forbidPrivateIndexSettings); + node.injector().getInstance(TransportService.class).addLifecycleListener(new LifecycleListener() { + @Override + public void afterStart() { + onTransportServiceStarted.run(); + } + }); try { IOUtils.close(secureSettings); } catch (IOException e) { @@ -907,14 +913,15 @@ public final class InternalTestCluster extends TestCluster { if (!node.isClosed()) { closeNode(); } - recreateNodeOnRestart(callback, clearDataIfNeeded, minMasterNodes); + recreateNodeOnRestart(callback, clearDataIfNeeded, minMasterNodes, () -> rebuildUnicastHostFiles(emptyList())); startNode(); } /** * rebuilds a new node object using the current node settings and starts it */ - void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes) throws Exception { + void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes, + Runnable onTransportServiceStarted) throws Exception { assert callback != null; Settings callbackSettings = callback.onNodeStopped(name); Settings.Builder newSettings = Settings.builder(); @@ -928,7 +935,7 @@ public final class InternalTestCluster extends TestCluster { if (clearDataIfNeeded) { clearDataIfNeeded(callback); } - createNewNode(newSettings.build()); + createNewNode(newSettings.build(), onTransportServiceStarted); // make sure cached client points to new node resetClient(); } @@ -944,7 +951,7 @@ public final class InternalTestCluster extends TestCluster { } } - private void createNewNode(final Settings newSettings) { + private void createNewNode(final Settings newSettings, final Runnable onTransportServiceStarted) { final long newIdSeed = NodeEnvironment.NODE_ID_SEED_SETTING.get(node.settings()) + 1; // use a new seed to make sure we have new node id Settings finalSettings = Settings.builder().put(node.originalSettings()).put(newSettings).put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), newIdSeed).build(); if (DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(finalSettings) == false) { @@ -953,6 +960,12 @@ public final class InternalTestCluster extends TestCluster { } Collection> plugins = node.getClasspathPlugins(); node = new MockNode(finalSettings, plugins); + node.injector().getInstance(TransportService.class).addLifecycleListener(new LifecycleListener() { + @Override + public void afterStart() { + onTransportServiceStarted.run(); + } + }); markNodeDataDirsAsNotEligableForWipe(node); } @@ -1055,11 +1068,13 @@ public final class InternalTestCluster extends TestCluster { final int numberOfMasterNodes = numSharedDedicatedMasterNodes > 0 ? numSharedDedicatedMasterNodes : numSharedDataNodes; final int defaultMinMasterNodes = (numberOfMasterNodes / 2) + 1; final List toStartAndPublish = new ArrayList<>(); // we want to start nodes in one go due to min master nodes + final Runnable onTransportServiceStarted = () -> rebuildUnicastHostFiles(toStartAndPublish); for (int i = 0; i < numSharedDedicatedMasterNodes; i++) { final Settings.Builder settings = Settings.builder(); settings.put(Node.NODE_MASTER_SETTING.getKey(), true); settings.put(Node.NODE_DATA_SETTING.getKey(), false); - NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); + NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes, + onTransportServiceStarted); toStartAndPublish.add(nodeAndClient); } for (int i = numSharedDedicatedMasterNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes; i++) { @@ -1069,14 +1084,16 @@ public final class InternalTestCluster extends TestCluster { settings.put(Node.NODE_MASTER_SETTING.getKey(), false).build(); settings.put(Node.NODE_DATA_SETTING.getKey(), true).build(); } - NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); + NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes, + onTransportServiceStarted); toStartAndPublish.add(nodeAndClient); } for (int i = numSharedDedicatedMasterNodes + numSharedDataNodes; i < numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes; i++) { final Builder settings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false) .put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false); - NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); + NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes, + onTransportServiceStarted); toStartAndPublish.add(nodeAndClient); } @@ -1429,6 +1446,7 @@ public final class InternalTestCluster extends TestCluster { updateMinMasterNodes(currentMasters + newMasters); } List> futures = nodeAndClients.stream().map(node -> executor.submit(node::startNode)).collect(Collectors.toList()); + try { for (Future future : futures) { future.get(); @@ -1449,6 +1467,30 @@ public final class InternalTestCluster extends TestCluster { } } + private final Object discoveryFileMutex = new Object(); + + private void rebuildUnicastHostFiles(Collection newNodes) { + // cannot be a synchronized method since it's called on other threads from within synchronized startAndPublishNodesAndClients() + synchronized (discoveryFileMutex) { + try { + List discoveryFileContents = Stream.concat(nodes.values().stream(), newNodes.stream()) + .map(nac -> nac.node.injector().getInstance(TransportService.class)).filter(Objects::nonNull) + .map(TransportService::getLocalNode).filter(Objects::nonNull).filter(DiscoveryNode::isMasterNode) + .map(n -> n.getAddress().toString()) + .distinct().collect(Collectors.toList()); + Set configPaths = Stream.concat(nodes.values().stream(), newNodes.stream()) + .map(nac -> nac.node.getEnvironment().configFile()).collect(Collectors.toSet()); + logger.debug("configuring discovery with {} at {}", discoveryFileContents, configPaths); + for (final Path configPath : configPaths) { + Files.createDirectories(configPath); + Files.write(configPath.resolve(UNICAST_HOSTS_FILE), discoveryFileContents); + } + } catch (IOException e) { + throw new AssertionError("failed to configure file-based discovery", e); + } + } + } + private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException { stopNodesAndClients(Collections.singleton(nodeAndClient)); } @@ -1607,7 +1649,7 @@ public final class InternalTestCluster extends TestCluster { for (List sameRoleNodes : nodesByRoles.values()) { Collections.shuffle(sameRoleNodes, random); } - List startUpOrder = new ArrayList<>(); + final List startUpOrder = new ArrayList<>(); for (Set roles : rolesOrderedByOriginalStartupOrder) { if (roles == null) { // if some nodes were stopped, we want have a role for that ordinal @@ -1618,11 +1660,11 @@ public final class InternalTestCluster extends TestCluster { } assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0; - // do two rounds to minimize pinging (mock zen pings pings with no delay and can create a lot of logs) for (NodeAndClient nodeAndClient : startUpOrder) { logger.info("resetting node [{}] ", nodeAndClient.name); // we already cleared data folders, before starting nodes up - nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1); + nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1, + () -> rebuildUnicastHostFiles(startUpOrder)); } startAndPublishNodesAndClients(startUpOrder); @@ -1741,9 +1783,9 @@ public final class InternalTestCluster extends TestCluster { } else { defaultMinMasterNodes = -1; } - List nodes = new ArrayList<>(); - for (Settings nodeSettings: settings) { - nodes.add(buildNode(nodeSettings, defaultMinMasterNodes)); + final List nodes = new ArrayList<>(); + for (Settings nodeSettings : settings) { + nodes.add(buildNode(nodeSettings, defaultMinMasterNodes, () -> rebuildUnicastHostFiles(nodes))); } startAndPublishNodesAndClients(nodes); if (autoManageMinMasterNodes) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java deleted file mode 100644 index dc9304637cd..00000000000 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.test.discovery; - -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.zen.UnicastHostsProvider; - -import java.io.Closeable; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -/** - * A {@link UnicastHostsProvider} implementation which returns results based on a static in-memory map. This allows running - * with nodes that only determine their transport address at runtime, which is the default behavior of - * {@link org.elasticsearch.test.InternalTestCluster} - */ -public final class MockUncasedHostProvider implements UnicastHostsProvider, Closeable { - - static final Map> activeNodesPerCluster = new HashMap<>(); - - - private final Supplier localNodeSupplier; - private final ClusterName clusterName; - - public MockUncasedHostProvider(Supplier localNodeSupplier, ClusterName clusterName) { - this.localNodeSupplier = localNodeSupplier; - this.clusterName = clusterName; - synchronized (activeNodesPerCluster) { - getActiveNodesForCurrentCluster().add(this); - } - } - - @Override - public List buildDynamicHosts(HostsResolver hostsResolver) { - final DiscoveryNode localNode = getNode(); - assert localNode != null; - synchronized (activeNodesPerCluster) { - Set activeNodes = getActiveNodesForCurrentCluster(); - return activeNodes.stream() - .map(MockUncasedHostProvider::getNode) - .filter(Objects::nonNull) - .filter(n -> localNode.equals(n) == false) - .map(DiscoveryNode::getAddress) - .collect(Collectors.toList()); - } - } - - @Nullable - private DiscoveryNode getNode() { - return localNodeSupplier.get(); - } - - private Set getActiveNodesForCurrentCluster() { - assert Thread.holdsLock(activeNodesPerCluster); - return activeNodesPerCluster.computeIfAbsent(clusterName, - clusterName -> ConcurrentCollections.newConcurrentSet()); - } - - @Override - public void close() { - synchronized (activeNodesPerCluster) { - boolean found = getActiveNodesForCurrentCluster().remove(this); - assert found; - } - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java index 5387a659aa2..2c8305b4e12 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java @@ -19,13 +19,10 @@ package org.elasticsearch.test.discovery; -import org.apache.lucene.util.SetOnce; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -39,7 +36,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -59,7 +55,6 @@ public class TestZenDiscovery extends ZenDiscovery { /** A plugin which installs mock discovery and configures it to be used. */ public static class TestPlugin extends Plugin implements DiscoveryPlugin { protected final Settings settings; - private final SetOnce unicastHostProvider = new SetOnce<>(); public TestPlugin(Settings settings) { this.settings = settings; } @@ -78,26 +73,6 @@ public class TestZenDiscovery extends ZenDiscovery { clusterApplier, clusterSettings, hostsProvider, allocationService)); } - @Override - public Map> getZenHostsProviders(TransportService transportService, - NetworkService networkService) { - final Supplier supplier; - if (USE_MOCK_PINGS.get(settings)) { - // we have to return something in order for the unicast host provider setting to resolve to something. It will never be used - supplier = () -> hostsResolver -> { - throw new UnsupportedOperationException(); - }; - } else { - supplier = () -> { - unicastHostProvider.set( - new MockUncasedHostProvider(transportService::getLocalNode, ClusterName.CLUSTER_NAME_SETTING.get(settings)) - ); - return unicastHostProvider.get(); - }; - } - return Collections.singletonMap("test-zen", supplier); - } - @Override public List> getSettings() { return Collections.singletonList(USE_MOCK_PINGS); @@ -107,18 +82,9 @@ public class TestZenDiscovery extends ZenDiscovery { public Settings additionalSettings() { return Settings.builder() .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen") - .put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "test-zen") .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) .build(); } - - @Override - public void close() throws IOException { - super.close(); - if (unicastHostProvider.get() != null) { - unicastHostProvider.get().close(); - } - } } private TestZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java index a63dd94b639..ad1bb7be95c 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.license; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsIndices; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; @@ -52,8 +53,11 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -278,6 +282,10 @@ public class LicensingTests extends SecurityIntegTestCase { enableLicensing(mode); ensureGreen(); + final List unicastHostsList = internalCluster().masterClient().admin().cluster().nodesInfo(new NodesInfoRequest()).get() + .getNodes().stream().map(n -> n.getTransport().getAddress().publishAddress().toString()).distinct() + .collect(Collectors.toList()); + Path home = createTempDir(); Path conf = home.resolve("config"); Files.createDirectories(conf); @@ -291,7 +299,8 @@ public class LicensingTests extends SecurityIntegTestCase { .put("path.home", home) .put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false) .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen") - .put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "test-zen") + .putList(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey()) + .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), unicastHostsList) .build(); Collection> mockPlugins = Arrays.asList(LocalStateSecurity.class, TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class);