diff --git a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java index cf41ede1287..a00564b6c25 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.discovery.zen.ZenDiscovery; @@ -52,15 +51,8 @@ import static org.hamcrest.Matchers.greaterThan; @ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false) public class NoMasterNodeIT extends ESIntegTestCase { - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build(); - } - public void testNoMasterActions() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put("action.auto_create_index", true) .put("discovery.zen.minimum_master_nodes", 2) .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "200ms") @@ -173,7 +165,6 @@ public class NoMasterNodeIT extends ESIntegTestCase { public void testNoMasterActionsWriteMasterBlock() throws Exception { Settings settings = Settings.builder() - .put("discovery.type", "zen") .put("action.auto_create_index", false) .put("discovery.zen.minimum_master_nodes", 2) .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "200ms") diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index ed13f34b609..31cfa30a49e 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -200,11 +200,8 @@ public class ZenDiscoveryIT extends ESIntegTestCase { } public void testHandleNodeJoin_incompatibleClusterState() throws UnknownHostException { - Settings nodeSettings = Settings.builder() - .put("discovery.type", "zen") // <-- To override the local setting if set externally - .build(); - String masterOnlyNode = internalCluster().startMasterOnlyNode(nodeSettings); - String node1 = internalCluster().startNode(nodeSettings); + String masterOnlyNode = internalCluster().startMasterOnlyNode(); + String node1 = internalCluster().startNode(); ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, masterOnlyNode); ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1); final ClusterState state = clusterService.state(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 0b001de69cf..29b0abc5158 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -162,31 +162,8 @@ public final class InternalTestCluster extends TestCluster { private final Logger logger = Loggers.getLogger(getClass()); - /** - * The number of ports in the range used for this JVM - */ - public static final int PORTS_PER_JVM = 100; - - /** - * The number of ports in the range used for this cluster - */ - public static final int PORTS_PER_CLUSTER = 20; - - private static final int GLOBAL_TRANSPORT_BASE_PORT = 9300; - private static final int GLOBAL_HTTP_BASE_PORT = 19200; - - private static final int JVM_ORDINAL = Integer.parseInt(System.getProperty(SysGlobals.CHILDVM_SYSPROP_JVM_ID, "0")); - - /** - * a per-JVM unique offset to be used for calculating unique port ranges. - */ - public static final int JVM_BASE_PORT_OFFSET = PORTS_PER_JVM * (JVM_ORDINAL + 1); private static final AtomicInteger clusterOrdinal = new AtomicInteger(); - private final int CLUSTER_BASE_PORT_OFFSET = JVM_BASE_PORT_OFFSET + (clusterOrdinal.getAndIncrement() * PORTS_PER_CLUSTER) % PORTS_PER_JVM; - - public final int TRANSPORT_BASE_PORT = GLOBAL_TRANSPORT_BASE_PORT + CLUSTER_BASE_PORT_OFFSET; - public final int HTTP_BASE_PORT = GLOBAL_HTTP_BASE_PORT + CLUSTER_BASE_PORT_OFFSET; public static final int DEFAULT_LOW_NUM_MASTER_NODES = 1; @@ -293,8 +270,12 @@ public final class InternalTestCluster extends TestCluster { this.nodePrefix = nodePrefix; assert nodePrefix != null; + ArrayList> tmpMockPlugins = new ArrayList<>(mockPlugins); + + this.mockPlugins = mockPlugins; + sharedNodesSeeds = new long[numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes]; for (int i = 0; i < sharedNodesSeeds.length; i++) { sharedNodesSeeds[i] = random.nextLong(); @@ -322,8 +303,8 @@ public final class InternalTestCluster extends TestCluster { builder.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), baseDir.resolve("custom")); builder.put(Environment.PATH_HOME_SETTING.getKey(), baseDir); builder.put(Environment.PATH_REPO_SETTING.getKey(), baseDir.resolve("repos")); - builder.put(TcpTransport.PORT.getKey(), TRANSPORT_BASE_PORT + "-" + (TRANSPORT_BASE_PORT + PORTS_PER_CLUSTER)); - builder.put("http.port", HTTP_BASE_PORT + "-" + (HTTP_BASE_PORT + PORTS_PER_CLUSTER)); + builder.put(TcpTransport.PORT.getKey(), 0); + builder.put("http.port", 0); builder.put("http.pipelining", enableHttpPipelining); if (Strings.hasLength(System.getProperty("tests.es.logger.level"))) { builder.put("logger.level", System.getProperty("tests.es.logger.level")); diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java index f873ec4fb93..a93d2b364d5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java @@ -19,6 +19,7 @@ package org.elasticsearch.test.discovery; import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.carrotsearch.randomizedtesting.SysGlobals; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.network.NetworkModule; @@ -27,7 +28,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.mocksocket.MockServerSocket; -import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.NodeConfigurationSource; import org.elasticsearch.transport.TcpTransport; @@ -40,6 +40,19 @@ import java.util.Set; public class ClusterDiscoveryConfiguration extends NodeConfigurationSource { + /** + * The number of ports in the range used for this JVM + */ + private static final int PORTS_PER_JVM = 100; + + private static final int JVM_ORDINAL = Integer.parseInt(System.getProperty(SysGlobals.CHILDVM_SYSPROP_JVM_ID, "0")); + + /** + * a per-JVM unique offset to be used for calculating unique port ranges. + */ + private static final int JVM_BASE_PORT_OFFSET = PORTS_PER_JVM * (JVM_ORDINAL + 1); + + static Settings DEFAULT_NODE_SETTINGS = Settings.EMPTY; private static final String IP_ADDR = "127.0.0.1"; @@ -110,7 +123,7 @@ public class ClusterDiscoveryConfiguration extends NodeConfigurationSource { } private static int calcBasePort() { - return 30000 + InternalTestCluster.JVM_BASE_PORT_OFFSET; + return 30000 + JVM_BASE_PORT_OFFSET; } @Override @@ -138,11 +151,11 @@ public class ClusterDiscoveryConfiguration extends NodeConfigurationSource { int[] unicastHostPorts = new int[numHosts]; final int basePort = calcBasePort(); - final int maxPort = basePort + InternalTestCluster.PORTS_PER_JVM; + final int maxPort = basePort + PORTS_PER_JVM; int tries = 0; for (int i = 0; i < unicastHostPorts.length; i++) { boolean foundPortInRange = false; - while (tries < InternalTestCluster.PORTS_PER_JVM && !foundPortInRange) { + while (tries < PORTS_PER_JVM && !foundPortInRange) { try (ServerSocket serverSocket = new MockServerSocket()) { // Set SO_REUSEADDR as we may bind here and not be able to reuse the address immediately without it. serverSocket.setReuseAddress(NetworkUtils.defaultReuseAddress()); diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java new file mode 100644 index 00000000000..c8e4e2454b3 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockUncasedHostProvider.java @@ -0,0 +1,83 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.discovery; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.discovery.zen.UnicastHostsProvider; + +import java.io.Closeable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * A {@link UnicastHostsProvider} implementation which returns results based on a static in-memory map. This allows running + * with nodes that only determine their transport address at runtime, which is the default behavior of + * {@link org.elasticsearch.test.InternalTestCluster} + */ +public final class MockUncasedHostProvider implements UnicastHostsProvider, Closeable { + + static final Map> activeNodesPerCluster = new HashMap<>(); + + + private final Supplier localNodeSupplier; + private final ClusterName clusterName; + + public MockUncasedHostProvider(Supplier localNodeSupplier, ClusterName clusterName) { + this.localNodeSupplier = localNodeSupplier; + this.clusterName = clusterName; + synchronized (activeNodesPerCluster) { + getActiveNodesForCurrentCluster().add(this); + } + } + + @Override + public List buildDynamicNodes() { + synchronized (activeNodesPerCluster) { + Set activeNodes = getActiveNodesForCurrentCluster(); + return activeNodes.stream() + .map(MockUncasedHostProvider::getNode) + .filter(n -> !localNodeSupplier.get().equals(n)) + .collect(Collectors.toList()); + } + } + + private DiscoveryNode getNode() { + return localNodeSupplier.get(); + } + + private Set getActiveNodesForCurrentCluster() { + assert Thread.holdsLock(activeNodesPerCluster); + return activeNodesPerCluster.computeIfAbsent(clusterName, + clusterName -> ConcurrentCollections.newConcurrentSet()); + } + + @Override + public void close() { + synchronized (activeNodesPerCluster) { + boolean found = getActiveNodesForCurrentCluster().remove(this); + assert found; + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java index 4f2a3c5b1c9..ae62acff9b7 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java @@ -33,7 +33,7 @@ import java.util.Set; import java.util.function.Consumer; /** - * A {@link ZenPing} implementation which returns results based on an static in-memory map. This allows pinging + * A {@link ZenPing} implementation which returns results based on a static in-memory map. This allows pinging * to be immediate and can be used to speed up tests. */ public final class MockZenPing extends AbstractComponent implements ZenPing { diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java index d224d9c519c..11f9e38e665 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java @@ -19,15 +19,13 @@ package org.elasticsearch.test.discovery; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.function.Supplier; - +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -41,6 +39,14 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; + /** * A alternative zen discovery which allows using mocks for things like pings, as well as * giving access to internals. @@ -53,9 +59,11 @@ public class TestZenDiscovery extends ZenDiscovery { /** A plugin which installs mock discovery and configures it to be used. */ public static class TestPlugin extends Plugin implements DiscoveryPlugin { protected final Settings settings; + private final SetOnce unicastHostProvider = new SetOnce<>(); public TestPlugin(Settings settings) { this.settings = settings; } + @Override public Map> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, @@ -63,10 +71,33 @@ public class TestZenDiscovery extends ZenDiscovery { ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService) { return Collections.singletonMap("test-zen", - () -> new TestZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, + () -> new TestZenDiscovery( + // we don't get the latest setting which were updated by the extra settings for the plugin. TODO: fix. + Settings.builder().put(settings).putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()).build(), + threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings, hostsProvider, allocationService)); } + @Override + public Map> getZenHostsProviders(TransportService transportService, + NetworkService networkService) { + final Supplier supplier; + if (USE_MOCK_PINGS.get(settings)) { + // we have to return something in order for the unicast host provider setting to resolve to something. It will never be used + supplier = () -> () -> { + throw new UnsupportedOperationException(); + }; + } else { + supplier = () -> { + unicastHostProvider.set( + new MockUncasedHostProvider(transportService::getLocalNode, ClusterName.CLUSTER_NAME_SETTING.get(settings)) + ); + return unicastHostProvider.get(); + }; + } + return Collections.singletonMap("test-zen", supplier); + } + @Override public List> getSettings() { return Collections.singletonList(USE_MOCK_PINGS); @@ -74,7 +105,19 @@ public class TestZenDiscovery extends ZenDiscovery { @Override public Settings additionalSettings() { - return Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen").build(); + return Settings.builder() + .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen") + .put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "test-zen") + .putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()) + .build(); + } + + @Override + public void close() throws IOException { + super.close(); + if (unicastHostProvider.get() != null) { + unicastHostProvider.get().close(); + } } }