diff --git a/src/test/java/org/elasticsearch/discovery/ClusterDiscoveryConfiguration.java b/src/test/java/org/elasticsearch/discovery/ClusterDiscoveryConfiguration.java new file mode 100644 index 00000000000..00f1a8421d6 --- /dev/null +++ b/src/test/java/org/elasticsearch/discovery/ClusterDiscoveryConfiguration.java @@ -0,0 +1,136 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.discovery; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.google.common.primitives.Ints; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.SettingsSource; +import org.elasticsearch.transport.local.LocalTransport; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +public class ClusterDiscoveryConfiguration extends SettingsSource { + + public static Settings DEFAULT_SETTINGS = ImmutableSettings.settingsBuilder() + .put("gateway.type", "local") + .put("discovery.type", "zen") + .build(); + + final int numOfNodes; + + final Settings baseSettings; + + public ClusterDiscoveryConfiguration(int numOfNodes) { + this(numOfNodes, ImmutableSettings.EMPTY); + } + + public ClusterDiscoveryConfiguration(int numOfNodes, Settings extraSettings) { + this.numOfNodes = numOfNodes; + this.baseSettings = ImmutableSettings.builder().put(DEFAULT_SETTINGS).put(extraSettings).build(); + } + + @Override + public Settings node(int nodeOrdinal) { + return baseSettings; + } + + @Override + public Settings transportClient() { + return baseSettings; + } + + public static class UnicastZen extends ClusterDiscoveryConfiguration { + + private final static AtomicInteger portRangeCounter = new AtomicInteger(); + + private final int[] unicastHostOrdinals; + private final int basePort; + + public UnicastZen(int numOfNodes) { + this(numOfNodes, numOfNodes); + } + + public UnicastZen(int numOfNodes, int numOfUnicastHosts) { + this(numOfNodes, numOfUnicastHosts, ImmutableSettings.EMPTY); + } + + public UnicastZen(int numOfNodes, int numOfUnicastHosts, Settings extraSettings) { + super(numOfNodes, extraSettings); + if (numOfUnicastHosts == numOfNodes) { + unicastHostOrdinals = new int[numOfNodes]; + for (int i = 0; i < numOfNodes; i++) { + unicastHostOrdinals[i] = i; + } + } else { + Set ordinals = new HashSet<>(numOfUnicastHosts); + while (ordinals.size() != numOfUnicastHosts) { + ordinals.add(RandomizedTest.randomInt(numOfNodes - 1)); + } + unicastHostOrdinals = Ints.toArray(ordinals); + } + this.basePort = calcBasePort(); + } + + public UnicastZen(int numOfNodes, int[] unicastHostOrdinals) { + this(numOfNodes, ImmutableSettings.EMPTY, unicastHostOrdinals); + } + + public UnicastZen(int numOfNodes, Settings extraSettings, int[] unicastHostOrdinals) { + super(numOfNodes, extraSettings); + this.unicastHostOrdinals = unicastHostOrdinals; + this.basePort = calcBasePort(); + } + + private final static int calcBasePort() { + return 10000 + + 1000 * (ElasticsearchIntegrationTest.CHILD_VM_ID.hashCode() % 60) + // up to 60 jvms + 100 * portRangeCounter.incrementAndGet(); // up to 100 nodes + } + + + @Override + public Settings node(int nodeOrdinal) { + ImmutableSettings.Builder builder = ImmutableSettings.builder() + .put("discovery.zen.ping.multicast.enabled", false); + + String[] unicastHosts = new String[unicastHostOrdinals.length]; + if (InternalTestCluster.NODE_MODE.equals("local")) { + builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "node_" + nodeOrdinal); + for (int i = 0; i < unicastHosts.length; i++) { + unicastHosts[i] = "node_" + unicastHostOrdinals[i]; + } + } else { + // we need to pin the node port & host so we'd know where to point things + builder.put("transport.tcp.port", basePort + nodeOrdinal); + builder.put("transport.host", "localhost"); + for (int i = 0; i < unicastHosts.length; i++) { + unicastHosts[i] = "localhost:" + (basePort + unicastHostOrdinals[i]); + } + } + builder.putArray("discovery.zen.ping.unicast.hosts", unicastHosts); + return builder.put(super.node(nodeOrdinal)).build(); + } + } +} diff --git a/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java b/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java index 4ec9672e4e5..1059b587aa5 100644 --- a/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java +++ b/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java @@ -37,19 +37,18 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.disruption.*; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportModule; +import org.junit.Before; import org.junit.Test; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -68,15 +67,18 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT private static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places. - private static final Settings nodeSettings = ImmutableSettings.settingsBuilder() - .put("gateway.type", "local") - .put("discovery.type", "zen") // <-- To override the local setting if set externally - .put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly - .put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly - .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly - .put("discovery.zen.minimum_master_nodes", 2) - .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()) - .build(); + private ClusterDiscoveryConfiguration discoveryConfig; + + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return discoveryConfig.node(nodeOrdinal); + } + + @Before + public void clearConfig() { + discoveryConfig = null; + } @Override protected int numberOfShards() { @@ -88,6 +90,31 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT return 1; } + private List startCluster(int numberOfNodes) throws ExecutionException, InterruptedException { + Settings settings = ImmutableSettings.builder() + // TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results + .put("discovery.zen.ping_timeout", "0.5s") + // end of temporary solution + .put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly + .put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly + .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly + .put("http.enabled", false) // just to make test quicker + .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()) + .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, numberOfNodes / 2 + 1).build(); + + if (discoveryConfig == null) { + if (randomBoolean()) { + discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, numberOfNodes, settings); + } else { + discoveryConfig = new ClusterDiscoveryConfiguration(numberOfNodes, settings); + } + } + List nodes = internalCluster().startNodesAsync(numberOfNodes).get(); + ensureStableCluster(numberOfNodes); + + return nodes; + } + /** * Test that no split brain occurs under partial network partition. See https://github.com/elasticsearch/elasticsearch/issues/2488 * @@ -96,10 +123,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT @Test public void failWithMinimumMasterNodesConfigured() throws Exception { - List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); - - // Wait until 3 nodes are part of the cluster - ensureStableCluster(3); + List nodes = startCluster(3); // Figure out what is the elected master node final String masterNode = internalCluster().getMasterName(); @@ -154,9 +178,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT @Test @TestLogging(value = "cluster.service:TRACE,indices.recovery:TRACE") public void testVerifyApiBlocksDuringPartition() throws Exception { - internalCluster().startNodesAsync(3, nodeSettings).get(); - // Wait until a 3 nodes are part of the cluster - ensureStableCluster(3); + startCluster(3); // Makes sure that the get request can be executed on each node locally: assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder() @@ -276,8 +298,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT @Test @TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") public void testIsolateMasterAndVerifyClusterStateConsensus() throws Exception { - final List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); - ensureStableCluster(3); + final List nodes = startCluster(3); assertAcked(prepareCreate("test") .setSettings(ImmutableSettings.builder() @@ -340,8 +361,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT @LuceneTestCase.AwaitsFix(bugUrl = "needs some more work to stabilize") @TestLogging("action.index:TRACE,action.get:TRACE,discovery:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") public void testAckedIndexing() throws Exception { - final List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); - ensureStableCluster(3); + final List nodes = startCluster(3); assertAcked(prepareCreate("test") .setSettings(ImmutableSettings.builder() @@ -478,8 +498,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT @Test @TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") public void testRejoinDocumentExistsInAllShardCopies() throws Exception { - List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); - ensureStableCluster(3); + List nodes = startCluster(3); assertAcked(prepareCreate("test") .setSettings(ImmutableSettings.builder() diff --git a/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTests.java b/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTests.java index 5f3f6cf978e..c36834d7cf9 100644 --- a/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTests.java +++ b/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; -import org.elasticsearch.transport.local.LocalTransport; import org.junit.Before; import org.junit.Test; @@ -38,46 +37,24 @@ import static org.hamcrest.Matchers.equalTo; @ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest { - private static int currentNumNodes = -1; - - static int currentBaseHttpPort = -1; - static int currentNumOfUnicastHosts = -1; - - @Before - public void setUP() throws Exception { - ElasticsearchIntegrationTest.beforeClass(); - currentNumNodes = randomIntBetween(3, 5); - currentNumOfUnicastHosts = randomIntBetween(1, currentNumNodes); - currentBaseHttpPort = 25000 + randomInt(100); - } + private ClusterDiscoveryConfiguration discoveryConfig; @Override protected Settings nodeSettings(int nodeOrdinal) { - ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder() - .put("discovery.type", "zen") - .put("discovery.zen.ping.multicast.enabled", false) - .put("http.enabled", false) // just to make test quicker - .put(super.nodeSettings(nodeOrdinal)); + return discoveryConfig.node(nodeOrdinal); + } - String[] unicastHosts = new String[currentNumOfUnicastHosts]; - if (internalCluster().getDefaultSettings().get("node.mode").equals("local")) { - builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "unicast_test_" + nodeOrdinal); - for (int i = 0; i < unicastHosts.length; i++) { - unicastHosts[i] = "unicast_test_" + i; - } - } else { - // we need to pin the node ports so we'd know where to point things - builder.put("transport.tcp.port", currentBaseHttpPort + nodeOrdinal); - for (int i = 0; i < unicastHosts.length; i++) { - unicastHosts[i] = "localhost:" + (currentBaseHttpPort + i); - } - } - builder.putArray("discovery.zen.ping.unicast.hosts", unicastHosts); - return builder.build(); + @Before + public void clearConfig() { + discoveryConfig = null; } @Test public void testNormalClusterForming() throws ExecutionException, InterruptedException { + int currentNumNodes = randomIntBetween(3, 5); + int currentNumOfUnicastHosts = randomIntBetween(1, currentNumNodes); + discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, currentNumOfUnicastHosts); + internalCluster().startNodesAsync(currentNumNodes).get(); if (client().admin().cluster().prepareHealth().setWaitForNodes("" + currentNumNodes).get().isTimedOut()) { @@ -91,9 +68,12 @@ public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest { // test fails, because 2 nodes elect themselves as master and the health request times out b/c waiting_for_nodes=N // can't be satisfied. public void testMinimumMasterNodes() throws Exception { + int currentNumNodes = randomIntBetween(3, 5); + int currentNumOfUnicastHosts = randomIntBetween(1, currentNumNodes); final Settings settings = ImmutableSettings.settingsBuilder().put("discovery.zen.minimum_master_nodes", currentNumNodes / 2 + 1).build(); + discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, currentNumOfUnicastHosts, settings); - List nodes = internalCluster().startNodesAsync(currentNumNodes, settings).get(); + List nodes = internalCluster().startNodesAsync(currentNumNodes).get(); ensureGreen(); diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 500df9541ef..113520a8327 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -107,8 +107,8 @@ import static org.elasticsearch.node.NodeBuilder.nodeBuilder; import static org.elasticsearch.test.ElasticsearchTestCase.assertBusy; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; /** * InternalTestCluster manages a set of JVM private nodes and allows convenient access to them. @@ -155,7 +155,7 @@ public final class InternalTestCluster extends TestCluster { static final boolean DEFAULT_ENABLE_RANDOM_BENCH_NODES = true; - static final String NODE_MODE = nodeMode(); + public static final String NODE_MODE = nodeMode(); /* sorted map to make traverse order reproducible, concurrent since we do checks on it not within a sync block */ private final NavigableMap nodes = new TreeMap<>(); diff --git a/src/test/java/org/elasticsearch/test/SettingsSource.java b/src/test/java/org/elasticsearch/test/SettingsSource.java index 8829885bf7b..6341d842d67 100644 --- a/src/test/java/org/elasticsearch/test/SettingsSource.java +++ b/src/test/java/org/elasticsearch/test/SettingsSource.java @@ -20,7 +20,7 @@ package org.elasticsearch.test; import org.elasticsearch.common.settings.Settings; -abstract class SettingsSource { +public abstract class SettingsSource { public static final SettingsSource EMPTY = new SettingsSource() { @Override @@ -35,7 +35,7 @@ abstract class SettingsSource { }; /** - * @return the settings for the node represented by the given ordinal, or {@code null} if there are no settings defined + * @return the settings for the node represented by the given ordinal, or {@code null} if there are no settings defined */ public abstract Settings node(int nodeOrdinal);