Use port 0 InternalTestCluster nodes (#27859)

We currently have a complicated port assignment scheme to make sure that the nodes span off by the internal test cluster will be assigned fixed port ranges that will also not collide between clusters. The port ranges need to be fixed in advance so that the nodes will be able to find each other via `UnicastZenPing`.

This approach worked well for the last few years but we are now at a point that our testing has grown beyond it and we exceed the 5 reusable ranges per JVM. This means that nodes are not always assigned the first 5 ports in their range which causes cluster formation issues. On top of that, most of the clusters that are span up don't even rely on `UnicastZenPing` but rather `MockZenPings` that uses in memory maps for discovery (with the down side that they are not influenced by network disruption simulations).

This PR changes `InternalTestCluster` to use port 0 as a fixed assignment. This will allow the OS to manage ports and will ensure we don't have collisions. For tests that need to simulate network disruptions (and thus can't use `MockZenPings`), a new `UnicastHostProvider` is introduced that is based on the current state of the test cluster. Since that is only resolved at run time, it is aware of the port assignments of the OS.

Closes #27818
Closes #27762
This commit is contained in:
Boaz Leskes 2017-12-19 08:43:03 +01:00 committed by GitHub
parent aebdb2a646
commit bea9471b2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 159 additions and 51 deletions

View File

@ -30,7 +30,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.discovery.zen.ZenDiscovery;
@ -52,15 +51,8 @@ import static org.hamcrest.Matchers.greaterThan;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false)
public class NoMasterNodeIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "zen").build();
}
public void testNoMasterActions() throws Exception {
Settings settings = Settings.builder()
.put("discovery.type", "zen")
.put("action.auto_create_index", true)
.put("discovery.zen.minimum_master_nodes", 2)
.put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "200ms")
@ -173,7 +165,6 @@ public class NoMasterNodeIT extends ESIntegTestCase {
public void testNoMasterActionsWriteMasterBlock() throws Exception {
Settings settings = Settings.builder()
.put("discovery.type", "zen")
.put("action.auto_create_index", false)
.put("discovery.zen.minimum_master_nodes", 2)
.put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "200ms")

View File

@ -200,11 +200,8 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
}
public void testHandleNodeJoin_incompatibleClusterState() throws UnknownHostException {
Settings nodeSettings = Settings.builder()
.put("discovery.type", "zen") // <-- To override the local setting if set externally
.build();
String masterOnlyNode = internalCluster().startMasterOnlyNode(nodeSettings);
String node1 = internalCluster().startNode(nodeSettings);
String masterOnlyNode = internalCluster().startMasterOnlyNode();
String node1 = internalCluster().startNode();
ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, masterOnlyNode);
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1);
final ClusterState state = clusterService.state();

View File

@ -162,31 +162,8 @@ public final class InternalTestCluster extends TestCluster {
private final Logger logger = Loggers.getLogger(getClass());
/**
* The number of ports in the range used for this JVM
*/
public static final int PORTS_PER_JVM = 100;
/**
* The number of ports in the range used for this cluster
*/
public static final int PORTS_PER_CLUSTER = 20;
private static final int GLOBAL_TRANSPORT_BASE_PORT = 9300;
private static final int GLOBAL_HTTP_BASE_PORT = 19200;
private static final int JVM_ORDINAL = Integer.parseInt(System.getProperty(SysGlobals.CHILDVM_SYSPROP_JVM_ID, "0"));
/**
* a per-JVM unique offset to be used for calculating unique port ranges.
*/
public static final int JVM_BASE_PORT_OFFSET = PORTS_PER_JVM * (JVM_ORDINAL + 1);
private static final AtomicInteger clusterOrdinal = new AtomicInteger();
private final int CLUSTER_BASE_PORT_OFFSET = JVM_BASE_PORT_OFFSET + (clusterOrdinal.getAndIncrement() * PORTS_PER_CLUSTER) % PORTS_PER_JVM;
public final int TRANSPORT_BASE_PORT = GLOBAL_TRANSPORT_BASE_PORT + CLUSTER_BASE_PORT_OFFSET;
public final int HTTP_BASE_PORT = GLOBAL_HTTP_BASE_PORT + CLUSTER_BASE_PORT_OFFSET;
public static final int DEFAULT_LOW_NUM_MASTER_NODES = 1;
@ -293,8 +270,12 @@ public final class InternalTestCluster extends TestCluster {
this.nodePrefix = nodePrefix;
assert nodePrefix != null;
ArrayList<Class<? extends Plugin>> tmpMockPlugins = new ArrayList<>(mockPlugins);
this.mockPlugins = mockPlugins;
sharedNodesSeeds = new long[numSharedDedicatedMasterNodes + numSharedDataNodes + numSharedCoordOnlyNodes];
for (int i = 0; i < sharedNodesSeeds.length; i++) {
sharedNodesSeeds[i] = random.nextLong();
@ -322,8 +303,8 @@ public final class InternalTestCluster extends TestCluster {
builder.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), baseDir.resolve("custom"));
builder.put(Environment.PATH_HOME_SETTING.getKey(), baseDir);
builder.put(Environment.PATH_REPO_SETTING.getKey(), baseDir.resolve("repos"));
builder.put(TcpTransport.PORT.getKey(), TRANSPORT_BASE_PORT + "-" + (TRANSPORT_BASE_PORT + PORTS_PER_CLUSTER));
builder.put("http.port", HTTP_BASE_PORT + "-" + (HTTP_BASE_PORT + PORTS_PER_CLUSTER));
builder.put(TcpTransport.PORT.getKey(), 0);
builder.put("http.port", 0);
builder.put("http.pipelining", enableHttpPipelining);
if (Strings.hasLength(System.getProperty("tests.es.logger.level"))) {
builder.put("logger.level", System.getProperty("tests.es.logger.level"));

View File

@ -19,6 +19,7 @@
package org.elasticsearch.test.discovery;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.SysGlobals;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.network.NetworkModule;
@ -27,7 +28,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.transport.TcpTransport;
@ -40,6 +40,19 @@ import java.util.Set;
public class ClusterDiscoveryConfiguration extends NodeConfigurationSource {
/**
* The number of ports in the range used for this JVM
*/
private static final int PORTS_PER_JVM = 100;
private static final int JVM_ORDINAL = Integer.parseInt(System.getProperty(SysGlobals.CHILDVM_SYSPROP_JVM_ID, "0"));
/**
* a per-JVM unique offset to be used for calculating unique port ranges.
*/
private static final int JVM_BASE_PORT_OFFSET = PORTS_PER_JVM * (JVM_ORDINAL + 1);
static Settings DEFAULT_NODE_SETTINGS = Settings.EMPTY;
private static final String IP_ADDR = "127.0.0.1";
@ -110,7 +123,7 @@ public class ClusterDiscoveryConfiguration extends NodeConfigurationSource {
}
private static int calcBasePort() {
return 30000 + InternalTestCluster.JVM_BASE_PORT_OFFSET;
return 30000 + JVM_BASE_PORT_OFFSET;
}
@Override
@ -138,11 +151,11 @@ public class ClusterDiscoveryConfiguration extends NodeConfigurationSource {
int[] unicastHostPorts = new int[numHosts];
final int basePort = calcBasePort();
final int maxPort = basePort + InternalTestCluster.PORTS_PER_JVM;
final int maxPort = basePort + PORTS_PER_JVM;
int tries = 0;
for (int i = 0; i < unicastHostPorts.length; i++) {
boolean foundPortInRange = false;
while (tries < InternalTestCluster.PORTS_PER_JVM && !foundPortInRange) {
while (tries < PORTS_PER_JVM && !foundPortInRange) {
try (ServerSocket serverSocket = new MockServerSocket()) {
// Set SO_REUSEADDR as we may bind here and not be able to reuse the address immediately without it.
serverSocket.setReuseAddress(NetworkUtils.defaultReuseAddress());

View File

@ -0,0 +1,83 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.discovery;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import java.io.Closeable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* A {@link UnicastHostsProvider} implementation which returns results based on a static in-memory map. This allows running
* with nodes that only determine their transport address at runtime, which is the default behavior of
* {@link org.elasticsearch.test.InternalTestCluster}
*/
public final class MockUncasedHostProvider implements UnicastHostsProvider, Closeable {
static final Map<ClusterName, Set<MockUncasedHostProvider>> activeNodesPerCluster = new HashMap<>();
private final Supplier<DiscoveryNode> localNodeSupplier;
private final ClusterName clusterName;
public MockUncasedHostProvider(Supplier<DiscoveryNode> localNodeSupplier, ClusterName clusterName) {
this.localNodeSupplier = localNodeSupplier;
this.clusterName = clusterName;
synchronized (activeNodesPerCluster) {
getActiveNodesForCurrentCluster().add(this);
}
}
@Override
public List<DiscoveryNode> buildDynamicNodes() {
synchronized (activeNodesPerCluster) {
Set<MockUncasedHostProvider> activeNodes = getActiveNodesForCurrentCluster();
return activeNodes.stream()
.map(MockUncasedHostProvider::getNode)
.filter(n -> !localNodeSupplier.get().equals(n))
.collect(Collectors.toList());
}
}
private DiscoveryNode getNode() {
return localNodeSupplier.get();
}
private Set<MockUncasedHostProvider> getActiveNodesForCurrentCluster() {
assert Thread.holdsLock(activeNodesPerCluster);
return activeNodesPerCluster.computeIfAbsent(clusterName,
clusterName -> ConcurrentCollections.newConcurrentSet());
}
@Override
public void close() {
synchronized (activeNodesPerCluster) {
boolean found = getActiveNodesForCurrentCluster().remove(this);
assert found;
}
}
}

View File

@ -33,7 +33,7 @@ import java.util.Set;
import java.util.function.Consumer;
/**
* A {@link ZenPing} implementation which returns results based on an static in-memory map. This allows pinging
* A {@link ZenPing} implementation which returns results based on a static in-memory map. This allows pinging
* to be immediate and can be used to speed up tests.
*/
public final class MockZenPing extends AbstractComponent implements ZenPing {

View File

@ -19,15 +19,13 @@
package org.elasticsearch.test.discovery;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -41,6 +39,14 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
/**
* A alternative zen discovery which allows using mocks for things like pings, as well as
* giving access to internals.
@ -53,9 +59,11 @@ public class TestZenDiscovery extends ZenDiscovery {
/** A plugin which installs mock discovery and configures it to be used. */
public static class TestPlugin extends Plugin implements DiscoveryPlugin {
protected final Settings settings;
private final SetOnce<MockUncasedHostProvider> unicastHostProvider = new SetOnce<>();
public TestPlugin(Settings settings) {
this.settings = settings;
}
@Override
public Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry,
@ -63,10 +71,33 @@ public class TestZenDiscovery extends ZenDiscovery {
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider,
AllocationService allocationService) {
return Collections.singletonMap("test-zen",
() -> new TestZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService,
() -> new TestZenDiscovery(
// we don't get the latest setting which were updated by the extra settings for the plugin. TODO: fix.
Settings.builder().put(settings).putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()).build(),
threadPool, transportService, namedWriteableRegistry, masterService,
clusterApplier, clusterSettings, hostsProvider, allocationService));
}
@Override
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
NetworkService networkService) {
final Supplier<UnicastHostsProvider> supplier;
if (USE_MOCK_PINGS.get(settings)) {
// we have to return something in order for the unicast host provider setting to resolve to something. It will never be used
supplier = () -> () -> {
throw new UnsupportedOperationException();
};
} else {
supplier = () -> {
unicastHostProvider.set(
new MockUncasedHostProvider(transportService::getLocalNode, ClusterName.CLUSTER_NAME_SETTING.get(settings))
);
return unicastHostProvider.get();
};
}
return Collections.singletonMap("test-zen", supplier);
}
@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(USE_MOCK_PINGS);
@ -74,7 +105,19 @@ public class TestZenDiscovery extends ZenDiscovery {
@Override
public Settings additionalSettings() {
return Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen").build();
return Settings.builder()
.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen")
.put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "test-zen")
.putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey())
.build();
}
@Override
public void close() throws IOException {
super.close();
if (unicastHostProvider.get() != null) {
unicastHostProvider.get().close();
}
}
}