From 522b137097f86b291287af5ee207c55895cd5232 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 11 Aug 2016 14:55:06 +0200 Subject: [PATCH] Make NetworkPartition disruption scheme configurable (#19534) This commit separates the description of the network links that are to be disrupted from the failure that is applied to those links (disconnect/unresponsive/delay). Previously, each network disruption scheme was a subclass that combined both the failure mode (disconnect/unresponsive/delay) and the network links to cut (two partitions / bridge partitioning) in a single class. --- .../master/IndexingMasterFailoverIT.java | 9 +- .../cluster/MinimumMasterNodesIT.java | 11 +- .../cluster/routing/PrimaryAllocationIT.java | 12 +- .../DiscoveryWithServiceDisruptionsIT.java | 128 ++--- .../test/disruption/BridgePartition.java | 74 --- .../disruption/NetworkDelaysPartition.java | 94 ---- .../NetworkDisconnectPartition.java | 57 --- .../test/disruption/NetworkDisruption.java | 453 ++++++++++++++++++ ...titionIT.java => NetworkDisruptionIT.java} | 11 +- .../disruption/NetworkDisruptionTests.java | 102 ++++ .../test/disruption/NetworkPartition.java | 204 -------- .../NetworkUnresponsivePartition.java | 56 --- 12 files changed, 655 insertions(+), 556 deletions(-) delete mode 100644 test/framework/src/main/java/org/elasticsearch/test/disruption/BridgePartition.java delete mode 100644 test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java delete mode 100644 test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisconnectPartition.java create mode 100644 test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java rename test/framework/src/main/java/org/elasticsearch/test/disruption/{NetworkPartitionIT.java => NetworkDisruptionIT.java} (76%) create mode 100644 test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionTests.java delete mode 100644 test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkPartition.java delete mode 100644 test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkUnresponsivePartition.java diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java index 81b5290f63b..b30a3435479 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java @@ -27,8 +27,9 @@ import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.disruption.NetworkDisconnectPartition; -import org.elasticsearch.test.disruption.NetworkPartition; +import org.elasticsearch.test.disruption.NetworkDisruption; +import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect; +import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions; import org.elasticsearch.test.transport.MockTransportService; import java.util.Arrays; @@ -112,7 +113,9 @@ public class IndexingMasterFailoverIT extends ESIntegTestCase { Set otherNodes = new HashSet<>(Arrays.asList(internalCluster().getNodeNames())); otherNodes.remove(master); - NetworkPartition partition = new 
NetworkDisconnectPartition(Collections.singleton(master), otherNodes, random()); + NetworkDisruption partition = new NetworkDisruption( + new TwoPartitions(Collections.singleton(master), otherNodes), + new NetworkDisconnect()); internalCluster().setDisruptionScheme(partition); logger.info("--> disrupting network"); diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index aad2aa212a1..33141107b2e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.ZenDiscovery; @@ -36,7 +37,9 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; -import org.elasticsearch.test.disruption.NetworkDelaysPartition; +import org.elasticsearch.test.disruption.NetworkDisruption; +import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDelay; +import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; @@ -372,12 +375,14 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { final String master = internalCluster().getMasterName(); Set otherNodes = new HashSet<>(Arrays.asList(internalCluster().getNodeNames())); otherNodes.remove(master); - NetworkDelaysPartition partition = new NetworkDelaysPartition(Collections.singleton(master), otherNodes, 60000, random()); + NetworkDisruption partition = new NetworkDisruption( + new TwoPartitions(Collections.singleton(master), otherNodes), + new NetworkDelay(TimeValue.timeValueMinutes(1))); internalCluster().setDisruptionScheme(partition); partition.startDisrupting(); final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference failure = new AtomicReference<>(); + final AtomicReference failure = new AtomicReference<>(); logger.debug("--> submitting for cluster state to be rejected"); final ClusterService masterClusterService = internalCluster().clusterService(master); masterClusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index f267af66dc6..8053b67399a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -28,17 +28,18 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimary import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.plugins.Plugin; 
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.disruption.NetworkDisconnectPartition; +import org.elasticsearch.test.disruption.NetworkDisruption; +import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect; +import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions; import org.elasticsearch.test.transport.MockTransportService; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.concurrent.ExecutionException; @@ -85,8 +86,9 @@ public class PrimaryAllocationIT extends ESIntegTestCase { replicaNode = state.getRoutingNodes().node(shards.get(0).currentNodeId()).node().getName(); } - NetworkDisconnectPartition partition = new NetworkDisconnectPartition( - new HashSet<>(Arrays.asList(master, replicaNode)), Collections.singleton(primaryNode), random()); + NetworkDisruption partition = new NetworkDisruption( + new TwoPartitions(Sets.newHashSet(master, replicaNode), Collections.singleton(primaryNode)), + new NetworkDisconnect()); internalCluster().setDisruptionScheme(partition); logger.info("--> partitioning node with primary shard from rest of cluster"); partition.startDisrupting(); diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index e7a8487bcc0..d73db6945dc 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -63,13 +63,16 @@ import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration; import org.elasticsearch.test.disruption.BlockClusterStateProcessing; -import org.elasticsearch.test.disruption.BridgePartition; import org.elasticsearch.test.disruption.IntermittentLongGCDisruption; import org.elasticsearch.test.disruption.LongGCDisruption; -import org.elasticsearch.test.disruption.NetworkDelaysPartition; -import org.elasticsearch.test.disruption.NetworkDisconnectPartition; -import org.elasticsearch.test.disruption.NetworkPartition; -import org.elasticsearch.test.disruption.NetworkUnresponsivePartition; +import org.elasticsearch.test.disruption.NetworkDisruption; +import org.elasticsearch.test.disruption.NetworkDisruption.Bridge; +import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDelay; +import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect; +import org.elasticsearch.test.disruption.NetworkDisruption.NetworkLinkDisruptionType; +import org.elasticsearch.test.disruption.NetworkDisruption.NetworkUnresponsive; +import org.elasticsearch.test.disruption.NetworkDisruption.DisruptedLinks; +import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.disruption.SingleNodeDisruption; import org.elasticsearch.test.disruption.SlowClusterStateProcessing; @@ -234,7 +237,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { // Simulate a network issue between the unlucky node and elected master node in both directions. 
- NetworkDisconnectPartition networkDisconnect = new NetworkDisconnectPartition(masterNode, unluckyNode, random()); + NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(masterNode, unluckyNode), + new NetworkDisconnect()); setDisruptionScheme(networkDisconnect); networkDisconnect.startDisrupting(); @@ -282,7 +286,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { } logger.info("--> isolating [{}]", nonMaster); - addRandomIsolation(nonMaster).startDisrupting(); + TwoPartitions partitions = isolateNode(nonMaster); + NetworkDisruption networkDisruption = addRandomDisruptionType(partitions); + networkDisruption.startDisrupting(); logger.info("--> waiting for master to remove it"); ensureStableCluster(2, master); @@ -305,15 +311,16 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { // (waiting for green here, because indexing / search in a yellow index is fine as long as no other nodes go down) ensureGreen("test"); - NetworkPartition networkPartition = addRandomPartition(); + TwoPartitions partitions = TwoPartitions.random(random(), internalCluster().getNodeNames()); + NetworkDisruption networkDisruption = addRandomDisruptionType(partitions); - assertEquals(1, networkPartition.getMinoritySide().size()); - final String isolatedNode = networkPartition.getMinoritySide().iterator().next(); - assertEquals(2, networkPartition.getMajoritySide().size()); - final String nonIsolatedNode = networkPartition.getMajoritySide().iterator().next(); + assertEquals(1, partitions.getMinoritySide().size()); + final String isolatedNode = partitions.getMinoritySide().iterator().next(); + assertEquals(2, partitions.getMajoritySide().size()); + final String nonIsolatedNode = partitions.getMajoritySide().iterator().next(); // Simulate a network issue between the unlucky node and the rest of the cluster. - networkPartition.startDisrupting(); + networkDisruption.startDisrupting(); // The unlucky node must report *no* master node, since it can't connect to master and in fact it should @@ -326,7 +333,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { logger.info("wait until elected master has been removed and a new 2 node cluster was from (via [{}])", isolatedNode); ensureStableCluster(2, nonIsolatedNode); - for (String node : networkPartition.getMajoritySide()) { + for (String node : partitions.getMajoritySide()) { ClusterState nodeState = getNodeClusterState(node); boolean success = true; if (nodeState.nodes().getMasterNode() == null) { @@ -342,17 +349,17 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { } - networkPartition.stopDisrupting(); + networkDisruption.stopDisrupting(); // Wait until the master node sees al 3 nodes again. 
- ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkPartition.expectedTimeToHeal().millis())); + ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkDisruption.expectedTimeToHeal().millis())); logger.info("Verify no master block with {} set to {}", DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all"); client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder().put(DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all")) .get(); - networkPartition.startDisrupting(); + networkDisruption.startDisrupting(); // The unlucky node must report *no* master node, since it can't connect to master and in fact it should @@ -384,10 +391,11 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { ensureGreen(); String isolatedNode = internalCluster().getMasterName(); - NetworkPartition networkPartition = addRandomIsolation(isolatedNode); - networkPartition.startDisrupting(); + TwoPartitions partitions = isolateNode(isolatedNode); + NetworkDisruption networkDisruption = addRandomDisruptionType(partitions); + networkDisruption.startDisrupting(); - String nonIsolatedNode = networkPartition.getMajoritySide().iterator().next(); + String nonIsolatedNode = partitions.getMajoritySide().iterator().next(); // make sure cluster reforms ensureStableCluster(2, nonIsolatedNode); @@ -396,10 +404,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { assertNoMaster(isolatedNode, TimeValue.timeValueSeconds(40)); // restore isolation - networkPartition.stopDisrupting(); + networkDisruption.stopDisrupting(); for (String node : nodes) { - ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkPartition.expectedTimeToHeal().millis()), + ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkDisruption.expectedTimeToHeal().millis()), true, node); } @@ -753,7 +761,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { String isolatedNode = nodes.get(0); String notIsolatedNode = nodes.get(1); - ServiceDisruptionScheme scheme = addRandomIsolation(isolatedNode); + TwoPartitions partitions = isolateNode(isolatedNode); + NetworkDisruption scheme = addRandomDisruptionType(partitions); scheme.startDisrupting(); ensureStableCluster(2, notIsolatedNode); assertFalse(client(notIsolatedNode).admin().cluster().prepareHealth("test").setWaitForYellowStatus().get().isTimedOut()); @@ -811,7 +820,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { } // Simulate a network issue between the unlucky node and elected master node in both directions. - NetworkDisconnectPartition networkDisconnect = new NetworkDisconnectPartition(masterNode, isolatedNode, random()); + NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(masterNode, isolatedNode), + new NetworkDisconnect()); setDisruptionScheme(networkDisconnect); networkDisconnect.startDisrupting(); // Wait until elected master has removed that the unlucky node... 
@@ -848,7 +858,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { } // Simulate a network issue between the unicast target node and the rest of the cluster - NetworkDisconnectPartition networkDisconnect = new NetworkDisconnectPartition(unicastTargetSide, restOfClusterSide, random()); + NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(unicastTargetSide, restOfClusterSide), + new NetworkDisconnect()); setDisruptionScheme(networkDisconnect); networkDisconnect.startDisrupting(); // Wait until elected master has removed that the unlucky node... @@ -948,8 +959,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { AtomicBoolean success = new AtomicBoolean(); String isolatedNode = randomBoolean() ? masterNode : nonMasterNode; - NetworkPartition networkPartition = addRandomIsolation(isolatedNode); - networkPartition.startDisrupting(); + TwoPartitions partitions = isolateNode(isolatedNode); + NetworkDisruption networkDisruption = addRandomDisruptionType(partitions); + networkDisruption.startDisrupting(); service.localShardFailed(failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new ShardStateAction.Listener() { @@ -974,7 +986,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { } // heal the partition - networkPartition.removeAndEnsureHealthy(internalCluster()); + networkDisruption.removeAndEnsureHealthy(internalCluster()); // the cluster should stabilize ensureStableCluster(3); @@ -1136,9 +1148,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { assertAcked(prepareCreate("test")); final String masterNode1 = internalCluster().getMasterName(); - NetworkPartition networkPartition = new NetworkUnresponsivePartition(masterNode1, dataNode.get(), random()); - internalCluster().setDisruptionScheme(networkPartition); - networkPartition.startDisrupting(); + NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions(masterNode1, dataNode.get()), + new NetworkUnresponsive()); + internalCluster().setDisruptionScheme(networkDisruption); + networkDisruption.startDisrupting(); // We know this will time out due to the partition, we check manually below to not proceed until // the delete has been applied to the master node and the master eligible node. 
internalCluster().client(masterNode1).admin().indices().prepareDelete(idxName).setTimeout("0s").get(); @@ -1155,49 +1168,52 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { assertFalse(client().admin().indices().prepareExists(idxName).get().isExists()); } - protected NetworkPartition addRandomPartition() { - NetworkPartition partition; + protected NetworkDisruption addRandomDisruptionType(TwoPartitions partitions) { + final NetworkLinkDisruptionType disruptionType; if (randomBoolean()) { - partition = new NetworkUnresponsivePartition(random()); + disruptionType = new NetworkUnresponsive(); } else { - partition = new NetworkDisconnectPartition(random()); + disruptionType = new NetworkDisconnect(); } + NetworkDisruption partition = new NetworkDisruption(partitions, disruptionType); setDisruptionScheme(partition); return partition; } - protected NetworkPartition addRandomIsolation(String isolatedNode) { + protected TwoPartitions isolateNode(String isolatedNode) { Set side1 = new HashSet<>(); Set side2 = new HashSet<>(Arrays.asList(internalCluster().getNodeNames())); side1.add(isolatedNode); side2.remove(isolatedNode); - NetworkPartition partition; - if (randomBoolean()) { - partition = new NetworkUnresponsivePartition(side1, side2, random()); - } else { - partition = new NetworkDisconnectPartition(side1, side2, random()); - } - - internalCluster().setDisruptionScheme(partition); - - return partition; + return new TwoPartitions(side1, side2); } private ServiceDisruptionScheme addRandomDisruptionScheme() { // TODO: add partial partitions - List list = Arrays.asList( - new NetworkUnresponsivePartition(random()), - new NetworkDelaysPartition(random()), - new NetworkDisconnectPartition(random()), - new SlowClusterStateProcessing(random()), - new BridgePartition(random(), randomBoolean()) - ); - Collections.shuffle(list, random()); - setDisruptionScheme(list.get(0)); - return list.get(0); + final DisruptedLinks disruptedLinks; + if (randomBoolean()) { + disruptedLinks = TwoPartitions.random(random(), internalCluster().getNodeNames()); + } else { + disruptedLinks = Bridge.random(random(), internalCluster().getNodeNames()); + } + final NetworkLinkDisruptionType disruptionType; + switch (randomInt(2)) { + case 0: disruptionType = new NetworkUnresponsive(); break; + case 1: disruptionType = new NetworkDisconnect(); break; + case 2: disruptionType = NetworkDelay.random(random()); break; + default: throw new IllegalArgumentException(); + } + final ServiceDisruptionScheme scheme; + if (rarely()) { + scheme = new SlowClusterStateProcessing(random()); + } else { + scheme = new NetworkDisruption(disruptedLinks, disruptionType); + } + setDisruptionScheme(scheme); + return scheme; } private ClusterState getNodeClusterState(String node) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/BridgePartition.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/BridgePartition.java deleted file mode 100644 index 1a9c2b686c3..00000000000 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/BridgePartition.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.test.disruption; - -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.transport.MockTransportService; - -import java.util.Random; - -import static org.elasticsearch.test.ESTestCase.randomFrom; - -/** - * A partition that breaks the cluster into two groups of nodes. The two groups are fully isolated - * with the exception of a single node that can see and be seen by all nodes in both groups. - */ -public class BridgePartition extends NetworkPartition { - - String bridgeNode; - final boolean unresponsive; - - public BridgePartition(Random random, boolean unresponsive) { - super(random); - this.unresponsive = unresponsive; - } - - @Override - public void applyToCluster(InternalTestCluster cluster) { - bridgeNode = randomFrom(random, cluster.getNodeNames()); - this.cluster = cluster; - for (String node: cluster.getNodeNames()) { - if (node.equals(bridgeNode) == false) { - super.applyToNode(node, cluster); - } - } - } - - @Override - public TimeValue expectedTimeToHeal() { - return TimeValue.timeValueSeconds(0); - } - - @Override - void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) { - if (unresponsive) { - transportService1.addUnresponsiveRule(transportService2); - transportService2.addUnresponsiveRule(transportService1); - } else { - transportService1.addFailToSendNoConnectRule(transportService2); - transportService2.addFailToSendNoConnectRule(transportService1); - } - } - - @Override - protected String getPartitionDescription() { - return "bridge (super connected node: [" + bridgeNode + "], unresponsive [" + unresponsive + "])"; - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java deleted file mode 100644 index c422b042721..00000000000 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.elasticsearch.test.disruption; - -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.transport.MockTransportService; - -import java.util.Random; -import java.util.Set; - -public class NetworkDelaysPartition extends NetworkPartition { - - static long DEFAULT_DELAY_MIN = 10000; - static long DEFAULT_DELAY_MAX = 90000; - - - final long delayMin; - final long delayMax; - - TimeValue duration; - - public NetworkDelaysPartition(Random random) { - this(random, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX); - } - - public NetworkDelaysPartition(Random random, long delayMin, long delayMax) { - super(random); - this.delayMin = delayMin; - this.delayMax = delayMax; - } - - public NetworkDelaysPartition(String node1, String node2, Random random) { - this(node1, node2, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX, random); - } - - public NetworkDelaysPartition(String node1, String node2, long delayMin, long delayMax, Random random) { - super(node1, node2, random); - this.delayMin = delayMin; - this.delayMax = delayMax; - } - - public NetworkDelaysPartition(Set nodesSideOne, Set nodesSideTwo, Random random) { - this(nodesSideOne, nodesSideTwo, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX, random); - } - - public NetworkDelaysPartition(Set nodesSideOne, Set nodesSideTwo, long delay, Random random) { - this(nodesSideOne, nodesSideTwo, delay, delay, random); - } - - public NetworkDelaysPartition(Set nodesSideOne, Set nodesSideTwo, long delayMin, long delayMax, Random random) { - super(nodesSideOne, nodesSideTwo, random); - this.delayMin = delayMin; - this.delayMax = delayMax; - - } - - @Override - public synchronized void startDisrupting() { - duration = new TimeValue(delayMin == delayMax ? delayMin : delayMin + random.nextInt((int) (delayMax - delayMin))); - super.startDisrupting(); - } - - @Override - void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) { - transportService1.addUnresponsiveRule(transportService1, duration); - transportService1.addUnresponsiveRule(transportService2, duration); - } - - @Override - protected String getPartitionDescription() { - return "network delays for [" + duration + "]"; - } - - @Override - public TimeValue expectedTimeToHeal() { - return TimeValue.timeValueMillis(delayMax); - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisconnectPartition.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisconnectPartition.java deleted file mode 100644 index ed0aa17cfcf..00000000000 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisconnectPartition.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.elasticsearch.test.disruption; - -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.transport.MockTransportService; - -import java.util.Random; -import java.util.Set; - -public class NetworkDisconnectPartition extends NetworkPartition { - - - public NetworkDisconnectPartition(Random random) { - super(random); - } - - public NetworkDisconnectPartition(String node1, String node2, Random random) { - super(node1, node2, random); - } - - public NetworkDisconnectPartition(Set nodesSideOne, Set nodesSideTwo, Random random) { - super(nodesSideOne, nodesSideTwo, random); - } - - @Override - protected String getPartitionDescription() { - return "disconnected"; - } - - @Override - void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) { - transportService1.addFailToSendNoConnectRule(transportService2); - transportService2.addFailToSendNoConnectRule(transportService1); - } - - @Override - public TimeValue expectedTimeToHeal() { - return TimeValue.timeValueSeconds(0); - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java new file mode 100644 index 00000000000..40839f428e2 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java @@ -0,0 +1,453 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.test.disruption; + +import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.TransportService; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import java.util.function.BiConsumer; + +import static org.junit.Assert.assertFalse; + +/** + * Network disruptions are modeled using two components: + * 1) the {@link DisruptedLinks} represents the links in the network that are to be disrupted + * 2) the {@link NetworkLinkDisruptionType} represents the failure mode that is to be applied to the links + */ +public class NetworkDisruption implements ServiceDisruptionScheme { + + private final ESLogger logger = Loggers.getLogger(NetworkDisruption.class); + + private final DisruptedLinks disruptedLinks; + private final NetworkLinkDisruptionType networkLinkDisruptionType; + + protected volatile InternalTestCluster cluster; + protected volatile boolean activeDisruption = false; + + public NetworkDisruption(DisruptedLinks disruptedLinks, NetworkLinkDisruptionType networkLinkDisruptionType) { + this.disruptedLinks = disruptedLinks; + this.networkLinkDisruptionType = networkLinkDisruptionType; + } + + @Override + public void applyToCluster(InternalTestCluster cluster) { + this.cluster = cluster; + } + + @Override + public void removeFromCluster(InternalTestCluster cluster) { + stopDisrupting(); + } + + @Override + public void removeAndEnsureHealthy(InternalTestCluster cluster) { + removeFromCluster(cluster); + ensureNodeCount(cluster); + } + + protected void ensureNodeCount(InternalTestCluster cluster) { + assertFalse("cluster failed to form after disruption was healed", cluster.client().admin().cluster().prepareHealth() + .setWaitForNodes("" + cluster.size()) + .setWaitForRelocatingShards(0) + .get().isTimedOut()); + } + + @Override + public synchronized void applyToNode(String node, InternalTestCluster cluster) { + + } + + @Override + public synchronized void removeFromNode(String node1, InternalTestCluster cluster) { + logger.info("stop disrupting node (disruption type: {}, disrupted links: {})", networkLinkDisruptionType, disruptedLinks); + applyToNodes(new String[]{ node1 }, cluster.getNodeNames(), networkLinkDisruptionType::removeDisruption); + applyToNodes(cluster.getNodeNames(), new String[]{ node1 }, networkLinkDisruptionType::removeDisruption); + } + + @Override + public synchronized void testClusterClosed() { + + } + + @Override + public synchronized void startDisrupting() { + logger.info("start disrupting (disruption type: {}, disrupted links: {})", networkLinkDisruptionType, disruptedLinks); + applyToNodes(cluster.getNodeNames(), cluster.getNodeNames(), networkLinkDisruptionType::applyDisruption); + activeDisruption = true; + } + + @Override + public synchronized void stopDisrupting() { + if (!activeDisruption) { + return; + } + logger.info("stop disrupting (disruption scheme: {}, disrupted links: {})", networkLinkDisruptionType, disruptedLinks); + applyToNodes(cluster.getNodeNames(), cluster.getNodeNames(), networkLinkDisruptionType::removeDisruption); + 
activeDisruption = false; + } + + /** + * Applies action to all disrupted links between two sets of nodes. + */ + private void applyToNodes(String[] nodes1, String[] nodes2, BiConsumer consumer) { + for (String node1 : nodes1) { + if (disruptedLinks.nodes().contains(node1)) { + for (String node2 : nodes2) { + if (disruptedLinks.nodes().contains(node2)) { + if (node1.equals(node2) == false) { + if (disruptedLinks.disrupt(node1, node2)) { + consumer.accept(transport(node1), transport(node2)); + } + } + } + } + } + } + } + + @Override + public TimeValue expectedTimeToHeal() { + return networkLinkDisruptionType.expectedTimeToHeal(); + } + + private MockTransportService transport(String node) { + return (MockTransportService) cluster.getInstance(TransportService.class, node); + } + + /** + * Represents a set of nodes with connections between nodes that are to be disrupted + */ + public abstract static class DisruptedLinks { + private final Set nodes; + + protected DisruptedLinks(Set... nodeSets) { + Set allNodes = new HashSet<>(); + for (Set nodeSet : nodeSets) { + allNodes.addAll(nodeSet); + } + this.nodes = allNodes; + } + + /** + * Set of all nodes that can participate in disruptions + */ + public Set nodes() { + return nodes; + } + + /** + * Returns true iff network should be disrupted between the two nodes + */ + public abstract boolean disrupt(String node1, String node2); + } + + /** + * Creates two partitions with symmetric failures + */ + public static class TwoPartitions extends DisruptedLinks { + + protected final Set nodesSideOne; + protected final Set nodesSideTwo; + + public TwoPartitions(String node1, String node2) { + this(Collections.singleton(node1), Collections.singleton(node2)); + } + + public TwoPartitions(Set nodesSideOne, Set nodesSideTwo) { + super(nodesSideOne, nodesSideTwo); + this.nodesSideOne = nodesSideOne; + this.nodesSideTwo = nodesSideTwo; + assert nodesSideOne.isEmpty() == false; + assert nodesSideTwo.isEmpty() == false; + assert Sets.haveEmptyIntersection(nodesSideOne, nodesSideTwo); + } + + public static TwoPartitions random(Random random, String... 
nodes) { + return random(random, Sets.newHashSet(nodes)); + } + + public static TwoPartitions random(Random random, Set nodes) { + assert nodes.size() >= 2 : "two partitions topology requires at least 2 nodes"; + Set nodesSideOne = new HashSet<>(); + Set nodesSideTwo = new HashSet<>(); + for (String node : nodes) { + if (nodesSideOne.isEmpty()) { + nodesSideOne.add(node); + } else if (nodesSideTwo.isEmpty()) { + nodesSideTwo.add(node); + } else if (random.nextBoolean()) { + nodesSideOne.add(node); + } else { + nodesSideTwo.add(node); + } + } + return new TwoPartitions(nodesSideOne, nodesSideTwo); + } + + @Override + public boolean disrupt(String node1, String node2) { + if (nodesSideOne.contains(node1) && nodesSideTwo.contains(node2)) { + return true; + } + if (nodesSideOne.contains(node2) && nodesSideTwo.contains(node1)) { + return true; + } + return false; + } + + public Set getNodesSideOne() { + return Collections.unmodifiableSet(nodesSideOne); + } + + public Set getNodesSideTwo() { + return Collections.unmodifiableSet(nodesSideTwo); + } + + public Collection getMajoritySide() { + if (nodesSideOne.size() >= nodesSideTwo.size()) { + return getNodesSideOne(); + } else { + return getNodesSideTwo(); + } + } + + public Collection getMinoritySide() { + if (nodesSideOne.size() >= nodesSideTwo.size()) { + return getNodesSideTwo(); + } else { + return getNodesSideOne(); + } + } + + @Override + public String toString() { + return "two partitions (partition 1: " + nodesSideOne + " and partition 2: " + nodesSideTwo + ")"; + } + } + + /** + * Creates two partitions with symmetric failures and a bridge node that can connect to both of the partitions + */ + public static class Bridge extends DisruptedLinks { + + private final String bridgeNode; + private final Set nodesSideOne; + private final Set nodesSideTwo; + + public Bridge(String bridgeNode, Set nodesSideOne, Set nodesSideTwo) { + super(Collections.singleton(bridgeNode), nodesSideOne, nodesSideTwo); + this.bridgeNode = bridgeNode; + this.nodesSideOne = nodesSideOne; + this.nodesSideTwo = nodesSideTwo; + assert nodesSideOne.isEmpty() == false; + assert nodesSideTwo.isEmpty() == false; + assert Sets.haveEmptyIntersection(nodesSideOne, nodesSideTwo); + assert nodesSideOne.contains(bridgeNode) == false && nodesSideTwo.contains(bridgeNode) == false; + } + + public static Bridge random(Random random, String... 
nodes) { + return random(random, Sets.newHashSet(nodes)); + } + + public static Bridge random(Random random, Set nodes) { + assert nodes.size() >= 3 : "bridge topology requires at least 3 nodes"; + String bridgeNode = RandomPicks.randomFrom(random, nodes); + Set nodesSideOne = new HashSet<>(); + Set nodesSideTwo = new HashSet<>(); + for (String node : nodes) { + if (node.equals(bridgeNode) == false) { + if (nodesSideOne.isEmpty()) { + nodesSideOne.add(node); + } else if (nodesSideTwo.isEmpty()) { + nodesSideTwo.add(node); + } else if (random.nextBoolean()) { + nodesSideOne.add(node); + } else { + nodesSideTwo.add(node); + } + } + } + return new Bridge(bridgeNode, nodesSideOne, nodesSideTwo); + } + + @Override + public boolean disrupt(String node1, String node2) { + if (nodesSideOne.contains(node1) && nodesSideTwo.contains(node2)) { + return true; + } + if (nodesSideOne.contains(node2) && nodesSideTwo.contains(node1)) { + return true; + } + return false; + } + + public String getBridgeNode() { + return bridgeNode; + } + + public Set getNodesSideOne() { + return nodesSideOne; + } + + public Set getNodesSideTwo() { + return nodesSideTwo; + } + + public String toString() { + return "bridge partition (super connected node: [" + bridgeNode + "], partition 1: " + nodesSideOne + + " and partition 2: " + nodesSideTwo + ")"; + } + } + + /** + * Abstract class representing various types of network disruptions. Instances of this class override the {@link #applyDisruption} + * method to apply their specific disruption type to requests that are send from a source to a target node. + */ + public abstract static class NetworkLinkDisruptionType { + + /** + * Applies network disruption for requests send from the node represented by the source transport service to the node represented + * by the target transport service. + * + * @param sourceTransportService source transport service from which requests are sent + * @param targetTransportService target transport service to which requests are sent + */ + public abstract void applyDisruption(MockTransportService sourceTransportService, MockTransportService targetTransportService); + + /** + * Removes network disruption that was added by {@link #applyDisruption}. + * + * @param sourceTransportService source transport service from which requests are sent + * @param targetTransportService target transport service to which requests are sent + */ + public void removeDisruption(MockTransportService sourceTransportService, MockTransportService targetTransportService) { + sourceTransportService.clearRule(targetTransportService); + } + + /** + * Returns expected time to heal after disruption has been removed. Defaults to instant healing. + */ + public TimeValue expectedTimeToHeal() { + return TimeValue.timeValueMillis(0); + } + } + + /** + * Simulates a network disconnect. Sending a request from source to target node throws a {@link ConnectTransportException}. + */ + public static class NetworkDisconnect extends NetworkLinkDisruptionType { + + @Override + public void applyDisruption(MockTransportService sourceTransportService, MockTransportService targetTransportService) { + sourceTransportService.addFailToSendNoConnectRule(targetTransportService); + } + + @Override + public String toString() { + return "network disconnects"; + } + } + + /** + * Simulates an unresponsive target node by dropping requests sent from source to target node. 
+ */ + public static class NetworkUnresponsive extends NetworkLinkDisruptionType { + + @Override + public void applyDisruption(MockTransportService sourceTransportService, MockTransportService targetTransportService) { + sourceTransportService.addUnresponsiveRule(targetTransportService); + } + + @Override + public String toString() { + return "network unresponsive"; + } + } + + /** + * Simulates slow or congested network. Delivery of requests that are sent from source to target node are delayed by a configurable + * time amount. + */ + public static class NetworkDelay extends NetworkLinkDisruptionType { + + public static TimeValue DEFAULT_DELAY_MIN = TimeValue.timeValueSeconds(10); + public static TimeValue DEFAULT_DELAY_MAX = TimeValue.timeValueSeconds(90); + + private final TimeValue delay; + + /** + * Delays requests by a fixed time value. + * + * @param delay time to delay requests + */ + public NetworkDelay(TimeValue delay) { + this.delay = delay; + } + + /** + * Delays requests by a random but fixed time value between {@link #DEFAULT_DELAY_MIN} and {@link #DEFAULT_DELAY_MAX}. + * + * @param random instance to use for randomization of delay + */ + public static NetworkDelay random(Random random) { + return random(random, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX); + } + + /** + * Delays requests by a random but fixed time value between delayMin and delayMax. + * + * @param random instance to use for randomization of delay + * @param delayMin minimum delay + * @param delayMax maximum delay + */ + public static NetworkDelay random(Random random, TimeValue delayMin, TimeValue delayMax) { + return new NetworkDelay(TimeValue.timeValueMillis(delayMin.millis() == delayMax.millis() ? + delayMin.millis() : + delayMin.millis() + random.nextInt((int) (delayMax.millis() - delayMin.millis())))); + } + + @Override + public void applyDisruption(MockTransportService sourceTransportService, MockTransportService targetTransportService) { + sourceTransportService.addUnresponsiveRule(targetTransportService, delay); + } + + @Override + public TimeValue expectedTimeToHeal() { + return delay; + } + + @Override + public String toString() { + return "network delays for [" + delay + "]"; + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkPartitionIT.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionIT.java similarity index 76% rename from test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkPartitionIT.java rename to test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionIT.java index af16c2a888c..6f9149cb659 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkPartitionIT.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionIT.java @@ -22,12 +22,14 @@ package org.elasticsearch.test.disruption; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.disruption.NetworkDisruption.NetworkUnresponsive; +import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions; import org.elasticsearch.test.transport.MockTransportService; import java.io.IOException; import java.util.Collection; -public class NetworkPartitionIT extends ESIntegTestCase { +public class NetworkDisruptionIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { return pluginList(MockTransportService.TestPlugin.class); @@ 
-36,9 +38,10 @@ public class NetworkPartitionIT extends ESIntegTestCase { public void testNetworkPartitionWithNodeShutdown() throws IOException { internalCluster().ensureAtLeastNumDataNodes(2); String[] nodeNames = internalCluster().getNodeNames(); - NetworkPartition networkPartition = new NetworkUnresponsivePartition(nodeNames[0], nodeNames[1], random()); - internalCluster().setDisruptionScheme(networkPartition); - networkPartition.startDisrupting(); + NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions(nodeNames[0], nodeNames[1]), + new NetworkUnresponsive()); + internalCluster().setDisruptionScheme(networkDisruption); + networkDisruption.startDisrupting(); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames[0])); internalCluster().clearDisruptionScheme(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionTests.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionTests.java new file mode 100644 index 00000000000..4d0f1123a1b --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionTests.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.test.disruption; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.disruption.NetworkDisruption.Bridge; +import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions; + +import java.util.HashSet; +import java.util.Set; + +public class NetworkDisruptionTests extends ESTestCase { + + public void testTwoPartitions() { + Set partition1 = generateRandomStringSet(1, 10); + Set partition2 = generateRandomStringSet(1, 10); + TwoPartitions topology = new TwoPartitions(partition1, partition2); + checkTwoPartitions(topology, partition1, partition2); + } + + public void testRandomTwoPartitions() { + TwoPartitions topology = TwoPartitions.random(random(), generateRandomStringSet(2, 20)); + Set partition1 = topology.getNodesSideOne(); + Set partition2 = topology.getNodesSideTwo(); + checkTwoPartitions(topology, partition1, partition2); + } + + private void checkTwoPartitions(TwoPartitions topology, Set partition1, Set partition2) { + for (int i = 0; i < 10; i++) { + assertTrue(topology.disrupt(randomFrom(partition1), randomFrom(partition2))); + assertTrue(topology.disrupt(randomFrom(partition2), randomFrom(partition1))); + assertFalse(topology.disrupt(randomFrom(partition1), randomFrom(partition1))); + assertFalse(topology.disrupt(randomFrom(partition2), randomFrom(partition2))); + assertFalse(topology.disrupt(randomAsciiOfLength(10), randomFrom(partition1))); + assertFalse(topology.disrupt(randomAsciiOfLength(10), randomFrom(partition2))); + assertFalse(topology.disrupt(randomFrom(partition1), randomAsciiOfLength(10))); + assertFalse(topology.disrupt(randomFrom(partition2), randomAsciiOfLength(10))); + } + assertTrue(topology.getMajoritySide().size() >= topology.getMinoritySide().size()); + } + + public void testBridge() { + Set partition1 = generateRandomStringSet(1, 10); + Set partition2 = generateRandomStringSet(1, 10); + String bridgeNode = randomAsciiOfLength(10); + Bridge topology = new Bridge(bridgeNode, partition1, partition2); + checkBridge(topology, bridgeNode, partition1, partition2); + } + + public void testRandomBridge() { + Bridge topology = Bridge.random(random(), generateRandomStringSet(3, 20)); + String bridgeNode = topology.getBridgeNode(); + Set partition1 = topology.getNodesSideOne(); + Set partition2 = topology.getNodesSideTwo(); + checkBridge(topology, bridgeNode, partition1, partition2); + } + + private void checkBridge(Bridge topology, String bridgeNode, Set partition1, Set partition2) { + for (int i = 0; i < 10; i++) { + assertTrue(topology.disrupt(randomFrom(partition1), randomFrom(partition2))); + assertTrue(topology.disrupt(randomFrom(partition2), randomFrom(partition1))); + assertFalse(topology.disrupt(randomFrom(partition1), randomFrom(partition1))); + assertFalse(topology.disrupt(randomFrom(partition1), bridgeNode)); + assertFalse(topology.disrupt(bridgeNode, randomFrom(partition1))); + assertFalse(topology.disrupt(randomFrom(partition2), randomFrom(partition2))); + assertFalse(topology.disrupt(randomFrom(partition2), bridgeNode)); + assertFalse(topology.disrupt(bridgeNode, randomFrom(partition2))); + assertFalse(topology.disrupt(randomAsciiOfLength(10), randomFrom(partition1))); + assertFalse(topology.disrupt(randomAsciiOfLength(10), randomFrom(partition2))); + assertFalse(topology.disrupt(randomAsciiOfLength(10), bridgeNode)); + assertFalse(topology.disrupt(randomFrom(partition1), randomAsciiOfLength(10))); + assertFalse(topology.disrupt(randomFrom(partition2), randomAsciiOfLength(10))); + 
assertFalse(topology.disrupt(bridgeNode, randomAsciiOfLength(10))); + } + } + + private Set generateRandomStringSet(int minSize, int maxSize) { + assert maxSize >= minSize; + Set result = new HashSet<>(); + for (int i = 0; i < minSize + randomInt(maxSize - minSize); i++) { + result.add(randomAsciiOfLength(10)); + } + return result; + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkPartition.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkPartition.java deleted file mode 100644 index 9a65fc579f0..00000000000 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkPartition.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.test.disruption; - -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.TransportService; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; - -import static org.junit.Assert.assertFalse; - -public abstract class NetworkPartition implements ServiceDisruptionScheme { - - protected final ESLogger logger = Loggers.getLogger(getClass()); - - final Set nodesSideOne; - final Set nodesSideTwo; - volatile boolean autoExpand; - protected final Random random; - protected volatile InternalTestCluster cluster; - protected volatile boolean activeDisruption = false; - - - public NetworkPartition(Random random) { - this.random = new Random(random.nextLong()); - nodesSideOne = new HashSet<>(); - nodesSideTwo = new HashSet<>(); - autoExpand = true; - } - - public NetworkPartition(String node1, String node2, Random random) { - this(random); - nodesSideOne.add(node1); - nodesSideTwo.add(node2); - autoExpand = false; - } - - public NetworkPartition(Set nodesSideOne, Set nodesSideTwo, Random random) { - this(random); - this.nodesSideOne.addAll(nodesSideOne); - this.nodesSideTwo.addAll(nodesSideTwo); - autoExpand = false; - } - - - public Collection getNodesSideOne() { - return Collections.unmodifiableCollection(nodesSideOne); - } - - public Collection getNodesSideTwo() { - return Collections.unmodifiableCollection(nodesSideTwo); - } - - public Collection getMajoritySide() { - if (nodesSideOne.size() >= nodesSideTwo.size()) { - return getNodesSideOne(); - } else { - return getNodesSideTwo(); - } - } - - public Collection getMinoritySide() { - if (nodesSideOne.size() >= nodesSideTwo.size()) { - return getNodesSideTwo(); - } else { - return getNodesSideOne(); - } - } - - @Override - public void 
applyToCluster(InternalTestCluster cluster) { - this.cluster = cluster; - if (autoExpand) { - for (String node : cluster.getNodeNames()) { - applyToNode(node, cluster); - } - } - } - - @Override - public void removeFromCluster(InternalTestCluster cluster) { - stopDisrupting(); - } - - @Override - public void removeAndEnsureHealthy(InternalTestCluster cluster) { - removeFromCluster(cluster); - ensureNodeCount(cluster); - } - - protected void ensureNodeCount(InternalTestCluster cluster) { - assertFalse("cluster failed to form after disruption was healed", cluster.client().admin().cluster().prepareHealth() - .setWaitForNodes("" + cluster.size()) - .setWaitForRelocatingShards(0) - .get().isTimedOut()); - } - - @Override - public synchronized void applyToNode(String node, InternalTestCluster cluster) { - if (!autoExpand || nodesSideOne.contains(node) || nodesSideTwo.contains(node)) { - return; - } - if (nodesSideOne.isEmpty()) { - nodesSideOne.add(node); - } else if (nodesSideTwo.isEmpty()) { - nodesSideTwo.add(node); - } else if (random.nextBoolean()) { - nodesSideOne.add(node); - } else { - nodesSideTwo.add(node); - } - } - - @Override - public synchronized void removeFromNode(String node, InternalTestCluster cluster) { - MockTransportService transportService = (MockTransportService) cluster.getInstance(TransportService.class, node); - Set otherSideNodes; - if (nodesSideOne.contains(node)) { - otherSideNodes = nodesSideTwo; - nodesSideOne.remove(node); - } else if (nodesSideTwo.contains(node)) { - otherSideNodes = nodesSideOne; - nodesSideTwo.remove(node); - } else { - return; - } - for (String node2 : otherSideNodes) { - MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2); - removeDisruption(transportService, transportService2); - } - } - - @Override - public synchronized void testClusterClosed() { - - } - - protected abstract String getPartitionDescription(); - - @Override - public synchronized void startDisrupting() { - if (nodesSideOne.size() == 0 || nodesSideTwo.size() == 0) { - return; - } - logger.info("nodes {} will be partitioned from {}. 
partition type [{}]", nodesSideOne, nodesSideTwo, getPartitionDescription()); - activeDisruption = true; - for (String node1 : nodesSideOne) { - MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1); - for (String node2 : nodesSideTwo) { - MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2); - applyDisruption(transportService1, transportService2); - } - } - } - - - @Override - public synchronized void stopDisrupting() { - if (nodesSideOne.size() == 0 || nodesSideTwo.size() == 0 || !activeDisruption) { - return; - } - logger.info("restoring partition between nodes {} & nodes {}", nodesSideOne, nodesSideTwo); - for (String node1 : nodesSideOne) { - MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1); - for (String node2 : nodesSideTwo) { - MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2); - removeDisruption(transportService1, transportService2); - } - } - activeDisruption = false; - } - - abstract void applyDisruption(MockTransportService transportService1, MockTransportService transportService2); - - - protected void removeDisruption(MockTransportService transportService1, MockTransportService transportService2) { - transportService1.clearRule(transportService2); - transportService2.clearRule(transportService1); - } - -} diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkUnresponsivePartition.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkUnresponsivePartition.java deleted file mode 100644 index b69b7af3e5e..00000000000 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkUnresponsivePartition.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.elasticsearch.test.disruption; - -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.transport.MockTransportService; - -import java.util.Random; -import java.util.Set; - -public class NetworkUnresponsivePartition extends NetworkPartition { - - public NetworkUnresponsivePartition(Random random) { - super(random); - } - - public NetworkUnresponsivePartition(String node1, String node2, Random random) { - super(node1, node2, random); - } - - public NetworkUnresponsivePartition(Set nodesSideOne, Set nodesSideTwo, Random random) { - super(nodesSideOne, nodesSideTwo, random); - } - - @Override - protected String getPartitionDescription() { - return "unresponsive"; - } - - @Override - void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) { - transportService1.addUnresponsiveRule(transportService2); - transportService2.addUnresponsiveRule(transportService1); - } - - @Override - public TimeValue expectedTimeToHeal() { - return TimeValue.timeValueSeconds(0); - } -}
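
Usage note (editorial sketch, not part of the patch): the snippet below shows how an integration test can compose the two new building blocks, a DisruptedLinks topology and a NetworkLinkDisruptionType failure mode, using only constructors and factory methods introduced by this change. The test class and method names are hypothetical and the assertions are elided; the real tests touched by the patch (e.g. IndexingMasterFailoverIT, DiscoveryWithServiceDisruptionsIT) follow the same pattern.

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDelay;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.transport.MockTransportService;

public class ExampleNetworkDisruptionIT extends ESIntegTestCase { // hypothetical test class

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        // network disruptions are implemented on top of the mock transport service
        return pluginList(MockTransportService.TestPlugin.class);
    }

    public void testIsolatedMaster() throws Exception {
        // Links to disrupt: the current master on one side, every other node on the other.
        String master = internalCluster().getMasterName();
        Set<String> otherNodes = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
        otherNodes.remove(master);
        TwoPartitions partitions = new TwoPartitions(Collections.singleton(master), otherNodes);

        // Failure mode: hard disconnects; new NetworkUnresponsive() or new NetworkDelay(...) plug in the same way.
        NetworkDisruption disruption = new NetworkDisruption(partitions, new NetworkDisconnect());

        internalCluster().setDisruptionScheme(disruption);
        disruption.startDisrupting();
        // ... assertions while the master is cut off ...
        disruption.stopDisrupting();
    }

    public void testRandomBridgeWithDelays() throws Exception {
        // Random bridge topology plus random delays, mirroring addRandomDisruptionScheme() above.
        NetworkDisruption disruption = new NetworkDisruption(
                Bridge.random(random(), internalCluster().getNodeNames()),
                NetworkDelay.random(random()));
        internalCluster().setDisruptionScheme(disruption);
        disruption.startDisrupting();
        // ... assertions ...
        disruption.removeAndEnsureHealthy(internalCluster());
    }
}

Because the topology and the failure mode are now independent, the same TwoPartitions or Bridge instance can be paired with any NetworkLinkDisruptionType, which is the decoupling described in the commit message.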