DISCOVERY: Cleanup AbstractDisruptionTestCase (#34808)

* DISCOVERY: Cleanup AbstractDisruptionTestCase

* Make the internal test cluster manage minimum master nodes where we used the default of (nodes / 2 + 1) before
* Remove use of the `NodeConfigurationSource` indirection
* Relates #33675
This commit is contained in:
Armin Braun 2018-10-31 07:52:37 +01:00 committed by GitHub
parent 455906d431
commit 3fa67c5d8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 113 additions and 119 deletions

View File

@ -19,7 +19,6 @@
package org.elasticsearch.discovery;
import java.nio.file.Path;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -27,15 +26,12 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
@ -56,7 +52,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
@ -64,19 +59,13 @@ public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.
private NodeConfigurationSource discoveryConfig;
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(discoveryConfig.nodeSettings(nodeOrdinal))
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(DEFAULT_SETTINGS)
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
}
@Before
public void clearConfig() {
discoveryConfig = null;
}
@Override
protected int numberOfShards() {
return 3;
@ -119,11 +108,6 @@ public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
}
List<String> startCluster(int numberOfNodes) {
return startCluster(numberOfNodes, -1);
}
List<String> startCluster(int numberOfNodes, int minimumMasterNode) {
configureCluster(numberOfNodes, minimumMasterNode);
InternalTestCluster internalCluster = internalCluster();
List<String> nodes = internalCluster.startNodes(numberOfNodes);
ensureStableCluster(numberOfNodes);
@ -152,38 +136,6 @@ public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
return Arrays.asList(MockTransportService.TestPlugin.class);
}
void configureCluster(int numberOfNodes, int minimumMasterNode) {
configureCluster(DEFAULT_SETTINGS, numberOfNodes, minimumMasterNode);
}
void configureCluster(Settings settings, int numberOfNodes, int minimumMasterNode) {
if (minimumMasterNode < 0) {
minimumMasterNode = numberOfNodes / 2 + 1;
}
logger.info("---> configured unicast");
// TODO: Rarely use default settings form some of these
Settings nodeSettings = Settings.builder()
.put(settings)
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numberOfNodes)
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNode)
.putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file")
.build();
if (discoveryConfig == null) {
discoveryConfig = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(final int nodeOrdinal) {
return nodeSettings;
}
@Override
public Path nodeConfigPath(final int nodeOrdinal) {
return null;
}
};
}
}
ClusterState getNodeClusterState(String node) {
return client(node).admin().cluster().prepareState().setLocal(true).get().getState();
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class ClusterDisruptionCleanSettingsIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}
/**
* This test creates a scenario where a primary shard (0 replicas) relocates and is in POST_RECOVERY on the target
* node but already deleted on the source node. Search request should still work.
*/
public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception {
// Don't use AbstractDisruptionTestCase.DEFAULT_SETTINGS as settings
// (which can cause node disconnects on a slow CI machine)
internalCluster().startMasterOnlyNode();
final String node_1 = internalCluster().startDataOnlyNode();
logger.info("--> creating index [test] with one shard and on replica");
assertAcked(prepareCreate("test").setSettings(
Settings.builder().put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
);
ensureGreen("test");
final String node_2 = internalCluster().startDataOnlyNode();
List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
indexRequestBuilderList.add(client().prepareIndex().setIndex("test").setType("_doc")
.setSource("{\"int_field\":1}", XContentType.JSON));
}
indexRandom(true, indexRequestBuilderList);
IndicesStoreIntegrationIT.relocateAndBlockCompletion(logger, "test", 0, node_1, node_2);
// now search for the documents and see if we get a reply
assertThat(client().prepareSearch().setSize(0).get().getHits().getTotalHits(), equalTo(100L));
}
}

View File

@ -24,7 +24,6 @@ import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
@ -36,9 +35,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
@ -72,8 +69,8 @@ import static org.hamcrest.Matchers.not;
/**
* Tests various cluster operations (e.g., indexing) during disruptions.
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
/**
@ -289,7 +286,7 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
// simulate handling of sending shard failure during an isolation
public void testSendingShardFailure() throws Exception {
List<String> nodes = startCluster(3, 2);
List<String> nodes = startCluster(3);
String masterNode = internalCluster().getMasterName();
List<String> nonMasterNodes = nodes.stream().filter(node -> !node.equals(masterNode)).collect(Collectors.toList());
String nonMasterNode = randomFrom(nonMasterNodes);
@ -357,43 +354,10 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
}
}
/**
* This test creates a scenario where a primary shard (0 replicas) relocates and is in POST_RECOVERY on the target
* node but already deleted on the source node. Search request should still work.
*/
public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception {
// don't use DEFAULT settings (which can cause node disconnects on a slow CI machine)
configureCluster(Settings.EMPTY, 3, 1);
internalCluster().startMasterOnlyNode();
final String node_1 = internalCluster().startDataOnlyNode();
logger.info("--> creating index [test] with one shard and on replica");
assertAcked(prepareCreate("test").setSettings(
Settings.builder().put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
);
ensureGreen("test");
final String node_2 = internalCluster().startDataOnlyNode();
List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
indexRequestBuilderList.add(client().prepareIndex().setIndex("test").setType("_doc")
.setSource("{\"int_field\":1}", XContentType.JSON));
}
indexRandom(true, indexRequestBuilderList);
IndicesStoreIntegrationIT.relocateAndBlockCompletion(logger, "test", 0, node_1, node_2);
// now search for the documents and see if we get a reply
assertThat(client().prepareSearch().setSize(0).get().getHits().getTotalHits(), equalTo(100L));
}
public void testIndexImportedFromDataOnlyNodesIfMasterLostDataFolder() throws Exception {
// test for https://github.com/elastic/elasticsearch/issues/8823
configureCluster(2, 1);
String masterNode = internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNode(Settings.EMPTY);
ensureStableCluster(2);
assertAcked(prepareCreate("index").setSettings(Settings.builder().put("index.number_of_replicas", 0)));
index("index", "_doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
@ -416,14 +380,12 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
*/
public void testIndicesDeleted() throws Exception {
final Settings settings = Settings.builder()
.put(DEFAULT_SETTINGS)
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait on isolated data node
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
.build();
final String idxName = "test";
configureCluster(settings, 3, 2);
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(2);
final String dataNode = internalCluster().startDataOnlyNode();
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(2, settings);
final String dataNode = internalCluster().startDataOnlyNode(settings);
ensureStableCluster(3);
assertAcked(prepareCreate("test"));

View File

@ -54,13 +54,13 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
/**
* Tests for discovery during disruptions.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {
public void testIsolatedUnicastNodes() throws Exception {
internalCluster().setHostsListContainsOnlyFirstNode(true);
List<String> nodes = startCluster(4, -1);
List<String> nodes = startCluster(4);
// Figure out what is the elected master node
final String unicastTarget = nodes.get(0);
@ -100,7 +100,7 @@ public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {
*/
public void testUnicastSinglePingResponseContainsMaster() throws Exception {
internalCluster().setHostsListContainsOnlyFirstNode(true);
List<String> nodes = startCluster(4, -1);
List<String> nodes = startCluster(4);
// Figure out what is the elected master node
final String masterNode = internalCluster().getMasterName();
logger.info("---> legit elected master node={}", masterNode);
@ -138,15 +138,8 @@ public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {
* Test cluster join with issues in cluster state publishing *
*/
public void testClusterJoinDespiteOfPublishingIssues() throws Exception {
List<String> nodes = startCluster(2, 1);
String masterNode = internalCluster().getMasterName();
String nonMasterNode;
if (masterNode.equals(nodes.get(0))) {
nonMasterNode = nodes.get(1);
} else {
nonMasterNode = nodes.get(0);
}
String masterNode = internalCluster().startMasterOnlyNode(Settings.EMPTY);
String nonMasterNode = internalCluster().startDataOnlyNode(Settings.EMPTY);
DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes();
@ -196,7 +189,6 @@ public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {
}
public void testClusterFormingWithASlowNode() throws Exception {
configureCluster(3, 2);
SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000);
@ -212,7 +204,6 @@ public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {
}
public void testElectMasterWithLatestVersion() throws Exception {
configureCluster(3, 2);
final Set<String> nodes = new HashSet<>(internalCluster().startNodes(3));
ensureStableCluster(3);
ServiceDisruptionScheme isolateAllNodes =

View File

@ -67,8 +67,8 @@ import static org.hamcrest.Matchers.nullValue;
/**
* Tests relating to the loss of the master.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class MasterDisruptionIT extends AbstractDisruptionTestCase {
/**
@ -153,8 +153,8 @@ public class MasterDisruptionIT extends AbstractDisruptionTestCase {
*/
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.test.disruption:TRACE")
public void testStaleMasterNotHijackingMajority() throws Exception {
// 3 node cluster with unicast discovery and minimum_master_nodes set to 2:
final List<String> nodes = startCluster(3, 2);
// 3 node cluster with unicast discovery and minimum_master_nodes set to the default of 2:
final List<String> nodes = startCluster(3);
// Save the current master node as old master node, because that node will get frozen
final String oldMasterNode = internalCluster().getMasterName();
@ -267,7 +267,7 @@ public class MasterDisruptionIT extends AbstractDisruptionTestCase {
* Test that cluster recovers from a long GC on master that causes other nodes to elect a new one
*/
public void testMasterNodeGCs() throws Exception {
List<String> nodes = startCluster(3, -1);
List<String> nodes = startCluster(3);
String oldMasterNode = internalCluster().getMasterName();
// a very long GC, but it's OK as we remove the disruption when it has had an effect

View File

@ -18,6 +18,8 @@
*/
package org.elasticsearch.discovery;
import java.util.Arrays;
import java.util.Collection;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
@ -28,10 +30,12 @@ import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -40,8 +44,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.test.transport.MockTransportService;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.instanceOf;
@ -49,17 +53,26 @@ import static org.hamcrest.Matchers.instanceOf;
/**
* Tests snapshot operations during disruptions.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
@TestLogging("org.elasticsearch.snapshot:TRACE")
public class SnapshotDisruptionIT extends AbstractDisruptionTestCase {
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class SnapshotDisruptionIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(AbstractDisruptionTestCase.DEFAULT_SETTINGS)
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false)
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s")
.build();
}
public void testDisruptionOnSnapshotInitialization() throws Exception {
final Settings settings = Settings.builder()
.put(DEFAULT_SETTINGS)
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
.build();
final String idxName = "test";
configureCluster(settings, 4, 2);
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3);
final String dataNode = internalCluster().startDataOnlyNode();
ensureStableCluster(4);
@ -159,7 +172,7 @@ public class SnapshotDisruptionIT extends AbstractDisruptionTestCase {
}
}
private void createRandomIndex(String idxName) throws ExecutionException, InterruptedException {
private void createRandomIndex(String idxName) throws InterruptedException {
assertAcked(prepareCreate(idxName, 0, Settings.builder().put("number_of_shards", between(1, 20))
.put("number_of_replicas", 0)));
logger.info("--> indexing some data");