From 28489cee45370eaee0c0151e406b2ae157e47634 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 16 May 2014 22:09:39 +0200 Subject: [PATCH] [Tests] Added ServiceDisruptionScheme(s) and testAckedIndexing This commit adds the notion of ServiceDisruptionScheme allowing for introducing disruptions in our test cluster. This abstraction as used in a couple of wrappers around the functionality offered by MockTransportService to simulate various network partions. There is also one implementation for causing a node to be slow in processing cluster state updates. This new mechnaism is integrated into existing tests DiscoveryWithNetworkFailuresTests. A new test called testAckedIndexing is added to verify retrieval of documents whose indexing was acked during various disruptions. Closes #6505 --- .../discovery/zen/ZenDiscovery.java | 11 +- .../transport/TransportService.java | 4 + .../cluster/ClusterServiceTests.java | 6 +- .../cluster/NoMasterNodeTests.java | 7 +- .../DiscoveryWithNetworkFailuresTests.java | 399 ++++++++++++------ .../recovery/RecoveryWhileUnderLoadTests.java | 1 - .../elasticsearch/test/BackgroundIndexer.java | 2 +- .../test/ElasticsearchIntegrationTest.java | 11 + .../test/InternalTestCluster.java | 63 +++ .../org/elasticsearch/test/TestCluster.java | 1 + .../disruption/NetworkDelaysPartition.java | 88 ++++ .../NetworkDisconnectPartition.java | 53 +++ .../test/disruption/NetworkPartition.java | 199 +++++++++ .../NetworkUnresponsivePartition.java | 52 +++ .../test/disruption/NoOpDisruptionScheme.java | 60 +++ .../disruption/ServiceDisruptionScheme.java | 40 ++ .../test/disruption/SingleNodeDisruption.java | 83 ++++ .../SlowClusterStateProcessing.java | 130 ++++++ .../test/transport/MockTransportService.java | 100 ++++- 19 files changed, 1149 insertions(+), 161 deletions(-) create mode 100644 src/test/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java create mode 100644 src/test/java/org/elasticsearch/test/disruption/NetworkDisconnectPartition.java create mode 100644 src/test/java/org/elasticsearch/test/disruption/NetworkPartition.java create mode 100644 src/test/java/org/elasticsearch/test/disruption/NetworkUnresponsivePartition.java create mode 100644 src/test/java/org/elasticsearch/test/disruption/NoOpDisruptionScheme.java create mode 100644 src/test/java/org/elasticsearch/test/disruption/ServiceDisruptionScheme.java create mode 100644 src/test/java/org/elasticsearch/test/disruption/SingleNodeDisruption.java create mode 100644 src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index c1ca890df89..bccc2749656 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -342,7 +342,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen @Override public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); + logger.error("unexpected failure during [{}]", t, source); } @Override @@ -408,8 +408,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen public void onFailure(String source, Throwable t) { if (t instanceof ClusterService.NoLongerMasterException) { logger.debug("not processing {} leave request as we are no longer master", node); - } - else { + } else { logger.error("unexpected failure during [{}]", t, source); } } @@ -448,8 +447,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen public void onFailure(String source, Throwable t) { if (t instanceof ClusterService.NoLongerMasterException) { logger.debug("not processing [{}] as we are no longer master", source); - } - else { + } else { logger.error("unexpected failure during [{}]", t, source); } } @@ -486,8 +484,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen public void onFailure(String source, Throwable t) { if (t instanceof ClusterService.NoLongerMasterException) { logger.debug("not processing [{}] as we are no longer master", source); - } - else { + } else { logger.error("unexpected failure during [{}]", t, source); } } diff --git a/src/main/java/org/elasticsearch/transport/TransportService.java b/src/main/java/org/elasticsearch/transport/TransportService.java index e922f1b4932..e2e6f502e89 100644 --- a/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/src/main/java/org/elasticsearch/transport/TransportService.java @@ -245,6 +245,10 @@ public class TransportService extends AbstractLifecycleComponent master = cluster().startNodeAsync(settings); - ListenableFuture nonMaster = cluster().startNodeAsync(settingsBuilder().put(settings).put("node.master", false).build()); + ListenableFuture master = internalCluster().startNodeAsync(settings); + ListenableFuture nonMaster = internalCluster().startNodeAsync(settingsBuilder().put(settings).put("node.master", false).build()); master.get(); ensureGreen(); // make sure we have a cluster - ClusterService clusterService = cluster().getInstance(ClusterService.class, nonMaster.get()); + ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nonMaster.get()); final boolean[] taskFailed = {false}; final CountDownLatch latch1 = new CountDownLatch(1); diff --git a/src/test/java/org/elasticsearch/cluster/NoMasterNodeTests.java b/src/test/java/org/elasticsearch/cluster/NoMasterNodeTests.java index fa1ca5e9e80..94c0268cdd6 100644 --- a/src/test/java/org/elasticsearch/cluster/NoMasterNodeTests.java +++ b/src/test/java/org/elasticsearch/cluster/NoMasterNodeTests.java @@ -21,9 +21,9 @@ package org.elasticsearch.cluster; import org.elasticsearch.action.ActionRequestBuilder; import com.google.common.base.Predicate; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.percolate.PercolateSourceBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -48,11 +48,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertExis import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.Matchers.*; -import static org.elasticsearch.test.ElasticsearchIntegrationTest.*; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertExists; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; /** */ diff --git a/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java b/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java index 07371274cad..a0abf9fdd91 100644 --- a/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java +++ b/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java @@ -20,6 +20,8 @@ package org.elasticsearch.discovery; import com.google.common.base.Predicate; +import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -41,16 +43,20 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.disruption.*; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportModule; -import org.elasticsearch.transport.TransportService; import org.junit.Test; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; @@ -108,38 +114,36 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT assert unluckyNode != null; // Simulate a network issue between the unlucky node and elected master node in both directions. - addFailToSendNoConnectRule(masterDiscoNode.getName(), unluckyNode); - addFailToSendNoConnectRule(unluckyNode, masterDiscoNode.getName()); - try { - // Wait until elected master has removed that the unlucky node... - boolean applied = awaitBusy(new Predicate() { - @Override - public boolean apply(Object input) { - return masterClient.admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2; - } - }, 1, TimeUnit.MINUTES); - assertThat(applied, is(true)); - // The unlucky node must report *no* master node, since it can't connect to master and in fact it should - // continuously ping until network failures have been resolved. However - final Client isolatedNodeClient = internalCluster().client(unluckyNode); - // It may a take a bit before the node detects it has been cut off from the elected master - applied = awaitBusy(new Predicate() { - @Override - public boolean apply(Object input) { - ClusterState localClusterState = isolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState(); - DiscoveryNodes localDiscoveryNodes = localClusterState.nodes(); - logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint()); - return localDiscoveryNodes.masterNode() == null; - } - }, 10, TimeUnit.SECONDS); - assertThat(applied, is(true)); - } finally { - // stop simulating network failures, from this point on the unlucky node is able to rejoin - // We also need to do this even if assertions fail, since otherwise the test framework can't work properly - clearNoConnectRule(masterDiscoNode.getName(), unluckyNode); - clearNoConnectRule(unluckyNode, masterDiscoNode.getName()); - } + NetworkDisconnectPartition networkDisconnect = new NetworkDisconnectPartition(masterDiscoNode.name(), unluckyNode, getRandom()); + setDisruptionScheme(networkDisconnect); + networkDisconnect.startDisrupting(); + + // Wait until elected master has removed that the unlucky node... + boolean applied = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return masterClient.admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2; + } + }, 1, TimeUnit.MINUTES); + assertThat(applied, is(true)); + + // The unlucky node must report *no* master node, since it can't connect to master and in fact it should + // continuously ping until network failures have been resolved. However + final Client isolatedNodeClient = internalCluster().client(unluckyNode); + // It may a take a bit before the node detects it has been cut off from the elected master + applied = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + ClusterState localClusterState = isolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState(); + DiscoveryNodes localDiscoveryNodes = localClusterState.nodes(); + logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint()); + return localDiscoveryNodes.masterNode() == null; + } + }, 10, TimeUnit.SECONDS); + assertThat(applied, is(true)); + + networkDisconnect.stopDisrupting(); // Wait until the master node sees all 3 nodes again. ensureStableCluster(3); @@ -193,80 +197,78 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT // (waiting for green here, because indexing / search in a yellow index is fine as long as no other nodes go down) ensureGreen("test"); - // Pick a node that isn't the elected master. - final String isolatedNode = nodes.get(0); - final String nonIsolatedNode = nodes.get(1); + NetworkPartition networkPartition = addRandomPartition(); + + final String isolatedNode = networkPartition.getMinoritySide().get(0); + final String nonIsolatedNode = networkPartition.getMjaoritySide().get(0); // Simulate a network issue between the unlucky node and the rest of the cluster. - randomIsolateNode(isolatedNode, nodes); - try { - logger.info("wait until elected master has removed [{}]", isolatedNode); - boolean applied = awaitBusy(new Predicate() { - @Override - public boolean apply(Object input) { - return client(nonIsolatedNode).admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2; - } - }, 1, TimeUnit.MINUTES); - assertThat(applied, is(true)); + networkPartition.startDisrupting(); - // The unlucky node must report *no* master node, since it can't connect to master and in fact it should - // continuously ping until network failures have been resolved. However - // It may a take a bit before the node detects it has been cut off from the elected master - logger.info("waiting for isolated node [{}] to have no master", isolatedNode); - applied = awaitBusy(new Predicate() { - @Override - public boolean apply(Object input) { - ClusterState localClusterState = client(isolatedNode).admin().cluster().prepareState().setLocal(true).get().getState(); - DiscoveryNodes localDiscoveryNodes = localClusterState.nodes(); - logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint()); - return localDiscoveryNodes.masterNode() == null; - } - }, 10, TimeUnit.SECONDS); - assertThat(applied, is(true)); - ensureStableCluster(2, nonIsolatedNode); - - // Reads on the right side of the split must work - logger.info("verifying healthy part of cluster returns data"); - searchResponse = client(nonIsolatedNode).prepareSearch("test").setTypes("type") - .addSort("field", SortOrder.ASC) - .get(); - assertHitCount(searchResponse, indexRequests.length); - for (int i = 0; i < searchResponse.getHits().getHits().length; i++) { - SearchHit searchHit = searchResponse.getHits().getAt(i); - assertThat(searchHit.id(), equalTo(String.valueOf(i))); - assertThat((long) searchHit.sortValues()[0], equalTo((long) i)); + logger.info("wait until elected master has removed [{}]", isolatedNode); + boolean applied = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return client(nonIsolatedNode).admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2; } + }, 1, TimeUnit.MINUTES); + assertThat(applied, is(true)); - // Reads on the wrong side of the split are partial - logger.info("verifying isolated node [{}] returns partial data", isolatedNode); - searchResponse = client(isolatedNode).prepareSearch("test").setTypes("type") - .addSort("field", SortOrder.ASC).setPreference("_only_local") - .get(); - assertThat(searchResponse.getSuccessfulShards(), lessThan(searchResponse.getTotalShards())); - assertThat(searchResponse.getHits().totalHits(), lessThan((long) indexRequests.length)); - - logger.info("verifying writes on healthy cluster"); - UpdateResponse updateResponse = client(nonIsolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2).get(); - assertThat(updateResponse.getVersion(), equalTo(2l)); - - try { - logger.info("verifying writes on isolated [{}] fail", isolatedNode); - client(isolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2) - .setTimeout("1s") // Fail quick, otherwise we wait 60 seconds. - .get(); - fail(); - } catch (ClusterBlockException exception) { - assertThat(exception.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); - assertThat(exception.blocks().size(), equalTo(1)); - ClusterBlock clusterBlock = exception.blocks().iterator().next(); - assertThat(clusterBlock.id(), equalTo(DiscoverySettings.NO_MASTER_BLOCK_ID)); + // The unlucky node must report *no* master node, since it can't connect to master and in fact it should + // continuously ping until network failures have been resolved. However + // It may a take a bit before the node detects it has been cut off from the elected master + logger.info("waiting for isolated node [{}] to have no master", isolatedNode); + applied = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + ClusterState localClusterState = client(isolatedNode).admin().cluster().prepareState().setLocal(true).get().getState(); + DiscoveryNodes localDiscoveryNodes = localClusterState.nodes(); + logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint()); + return localDiscoveryNodes.masterNode() == null; } - } finally { - // stop simulating network failures, from this point on the unlucky node is able to rejoin - // We also need to do this even if assertions fail, since otherwise the test framework can't work properly - restoreIsolation(isolatedNode, nodes); + }, 10, TimeUnit.SECONDS); + assertThat(applied, is(true)); + ensureStableCluster(2, nonIsolatedNode); + + // Reads on the right side of the split must work + logger.info("verifying healthy part of cluster returns data"); + searchResponse = client(nonIsolatedNode).prepareSearch("test").setTypes("type") + .addSort("field", SortOrder.ASC) + .get(); + assertHitCount(searchResponse, indexRequests.length); + for (int i = 0; i < searchResponse.getHits().getHits().length; i++) { + SearchHit searchHit = searchResponse.getHits().getAt(i); + assertThat(searchHit.id(), equalTo(String.valueOf(i))); + assertThat((long) searchHit.sortValues()[0], equalTo((long) i)); } + // Reads on the wrong side of the split are partial + logger.info("verifying isolated node [{}] returns partial data", isolatedNode); + searchResponse = client(isolatedNode).prepareSearch("test").setTypes("type") + .addSort("field", SortOrder.ASC).setPreference("_only_local") + .get(); + assertThat(searchResponse.getSuccessfulShards(), lessThan(searchResponse.getTotalShards())); + assertThat(searchResponse.getHits().totalHits(), lessThan((long) indexRequests.length)); + + logger.info("verifying writes on healthy cluster"); + UpdateResponse updateResponse = client(nonIsolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2).get(); + assertThat(updateResponse.getVersion(), equalTo(2l)); + + try { + logger.info("verifying writes on isolated [{}] fail", isolatedNode); + client(isolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2) + .setTimeout("1s") // Fail quick, otherwise we wait 60 seconds. + .get(); + fail(); + } catch (ClusterBlockException exception) { + assertThat(exception.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); + assertThat(exception.blocks().size(), equalTo(1)); + ClusterBlock clusterBlock = exception.blocks().iterator().next(); + assertThat(clusterBlock.id(), equalTo(DiscoverySettings.NO_MASTER_BLOCK_ID)); + } + + networkPartition.stopDisrupting(); + // Wait until the master node sees all 3 nodes again. ensureStableCluster(3); @@ -316,13 +318,14 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT break; } } - randomIsolateNode(isolatedNode, nodes); + ServiceDisruptionScheme scheme = addRandomIsolation(isolatedNode); + scheme.startDisrupting(); // make sure cluster reforms ensureStableCluster(2, nonIsolatedNode); // restore isolation - restoreIsolation(isolatedNode, nodes); + scheme.stopDisrupting(); ensureStableCluster(3); @@ -356,7 +359,120 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } } + } + @Test + @LuceneTestCase.AwaitsFix(bugUrl = "MvG will fix") + public void testAckedIndexing() throws Exception { + final List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); + ensureStableCluster(3); + + assertAcked(prepareCreate("test") + .setSettings(ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) + )); + + ensureGreen(); + + ServiceDisruptionScheme disruptionScheme = addRandomDisruptionScheme(); + logger.info("disruption scheme [{}] added", disruptionScheme); + + final ConcurrentHashMap ackedDocs = new ConcurrentHashMap<>(); // id -> node sent. + + final AtomicBoolean stop = new AtomicBoolean(false); + List indexers = new ArrayList<>(nodes.size()); + List semaphores = new ArrayList<>(nodes.size()); + final AtomicInteger idGenerator = new AtomicInteger(0); + final AtomicReference countDownLatch = new AtomicReference<>(); + logger.info("starting indexers"); + + for (final String node : nodes) { + final Semaphore semaphore = new Semaphore(0); + semaphores.add(semaphore); + final Client client = client(node); + final String name = "indexer_" + indexers.size(); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + while (!stop.get()) { + try { + if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) { + continue; + } + try { + String id = Integer.toString(idGenerator.incrementAndGet()); + logger.trace("[{}] indexing id [{}] through node [{}]", name, id, node); + IndexResponse response = client.prepareIndex("test", "type", id).setSource("{}").setTimeout("1s").get(); + ackedDocs.put(id, node); + } finally { + countDownLatch.get().countDown(); + logger.trace("[{}] decreased counter : {}", name, countDownLatch.get().getCount()); + } + } catch (ElasticsearchException | InterruptedException e) { + // expected + } catch (Throwable t) { + logger.info("unexpected exception in background thread of [{}]", t, node); + } + } + } + }); + + thread.setName(name); + thread.setDaemon(true); + thread.start(); + indexers.add(thread); + } + + logger.info("indexing some docs before partition"); + int docsPerIndexer = randomInt(3); + countDownLatch.set(new CountDownLatch(docsPerIndexer * indexers.size())); + for (Semaphore semaphore : semaphores) { + semaphore.release(docsPerIndexer); + } + assertTrue(countDownLatch.get().await(1, TimeUnit.MINUTES)); + + for (int iter = 1 + randomInt(2); iter > 0; iter--) { + + logger.info("starting disruptions & indexing (iteration [{}])", iter); + disruptionScheme.startDisrupting(); + + docsPerIndexer = 1 + randomInt(5); + countDownLatch.set(new CountDownLatch(docsPerIndexer * indexers.size())); + Collections.shuffle(semaphores); + for (Semaphore semaphore : semaphores) { + semaphore.release(docsPerIndexer); + } + assertTrue(countDownLatch.get().await(1, TimeUnit.MINUTES)); + + logger.info("stopping disruption"); + disruptionScheme.stopDisrupting(); + + ensureStableCluster(3); + ensureGreen("test"); + + logger.info("validating successful docs"); + for (String node : nodes) { + try { + logger.debug("validating through node [{}]", node); + for (String id : ackedDocs.keySet()) { + assertTrue("doc [" + id + "] indexed via node [" + ackedDocs.get(id) + "] not found", + client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists()); + } + } catch (AssertionError e) { + throw new AssertionError(e.getMessage() + " (checked via node [" + node + "]", e); + } + } + + logger.info("done validating (iteration [{}])", iter); + } + + logger.info("shutting down indexers"); + stop.set(true); + for (Thread indexer : indexers) { + indexer.interrupt(); + indexer.join(60000); + } } @@ -379,7 +495,8 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT String isolatedNode = nodes.get(0); String notIsolatedNode = nodes.get(1); - randomIsolateNode(isolatedNode, nodes); + ServiceDisruptionScheme scheme = addRandomIsolation(isolatedNode); + scheme.startDisrupting(); ensureStableCluster(2, notIsolatedNode); assertFalse(client(notIsolatedNode).admin().cluster().prepareHealth("test").setWaitForYellowStatus().get().isTimedOut()); @@ -395,7 +512,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT assertThat(getResponse.getVersion(), equalTo(1l)); assertThat(getResponse.getId(), equalTo(indexResponse.getId())); - restoreIsolation(isolatedNode, nodes); + scheme.stopDisrupting(); ensureStableCluster(3); ensureGreen("test"); @@ -411,30 +528,47 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } } - protected void restoreIsolation(String isolatedNode, List nodes) { - logger.info("restoring isolation of [{}]", isolatedNode); - for (String nodeId : nodes) { - if (!nodeId.equals(isolatedNode)) { - clearNoConnectRule(nodeId, isolatedNode); - clearNoConnectRule(isolatedNode, nodeId); - } + protected NetworkPartition addRandomPartition() { + NetworkPartition partition; + if (randomBoolean()) { + partition = new NetworkUnresponsivePartition(getRandom()); + } else { + partition = new NetworkDisconnectPartition(getRandom()); } + + setDisruptionScheme(partition); + + return partition; } - protected void randomIsolateNode(String isolatedNode, List nodes) { - boolean unresponsive = randomBoolean(); - logger.info("isolating [{}] with unresponsive: [{}]", isolatedNode, unresponsive); - for (String nodeId : nodes) { - if (!nodeId.equals(isolatedNode)) { - if (unresponsive) { - addUnresponsiveRule(nodeId, isolatedNode); - addUnresponsiveRule(isolatedNode, nodeId); - } else { - addFailToSendNoConnectRule(nodeId, isolatedNode); - addFailToSendNoConnectRule(isolatedNode, nodeId); - } - } + protected NetworkPartition addRandomIsolation(String isolatedNode) { + Set side1 = new HashSet<>(); + Set side2 = new HashSet<>(Arrays.asList(internalCluster().getNodeNames())); + side1.add(isolatedNode); + side2.remove(isolatedNode); + + NetworkPartition partition; + if (randomBoolean()) { + partition = new NetworkUnresponsivePartition(side1, side2, getRandom()); + } else { + partition = new NetworkDisconnectPartition(side1, side2, getRandom()); } + + internalCluster().setDisruptionScheme(partition); + + return partition; + } + + private ServiceDisruptionScheme addRandomDisruptionScheme() { + List list = Arrays.asList( + new NetworkUnresponsivePartition(getRandom()), + new NetworkDelaysPartition(getRandom()), + new NetworkDisconnectPartition(getRandom()), + new SlowClusterStateProcessing(getRandom()) + ); + Collections.shuffle(list); + setDisruptionScheme(list.get(0)); + return list.get(0); } private DiscoveryNode findMasterNode(List nodes) { @@ -452,21 +586,6 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT return masterDiscoNode; } - private void addFailToSendNoConnectRule(String fromNode, String toNode) { - TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode); - ((MockTransportService) mockTransportService).addFailToSendNoConnectRule(internalCluster().getInstance(Discovery.class, toNode).localNode()); - } - - private void addUnresponsiveRule(String fromNode, String toNode) { - TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode); - ((MockTransportService) mockTransportService).addUnresponsiveRule(internalCluster().getInstance(Discovery.class, toNode).localNode()); - } - - private void clearNoConnectRule(String fromNode, String toNode) { - TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode); - ((MockTransportService) mockTransportService).clearRule(internalCluster().getInstance(Discovery.class, toNode).localNode()); - } - private void ensureStableCluster(int nodeCount) { ensureStableCluster(nodeCount, null); diff --git a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java index ca2f8a5b050..ff4512b4113 100644 --- a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java +++ b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java @@ -43,7 +43,6 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; import static org.hamcrest.Matchers.equalTo; public class RecoveryWhileUnderLoadTests extends ElasticsearchIntegrationTest { diff --git a/src/test/java/org/elasticsearch/test/BackgroundIndexer.java b/src/test/java/org/elasticsearch/test/BackgroundIndexer.java index 29184b89768..2cafcef5d9f 100644 --- a/src/test/java/org/elasticsearch/test/BackgroundIndexer.java +++ b/src/test/java/org/elasticsearch/test/BackgroundIndexer.java @@ -217,7 +217,7 @@ public class BackgroundIndexer implements AutoCloseable { setBudget(numOfDocs); } - /** Stop all background threads **/ + /** Stop all background threads * */ public void stop() throws InterruptedException { if (stop.get()) { return; diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 20789924ac9..5a59036ff41 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -97,6 +97,7 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchService; import org.elasticsearch.test.client.RandomizingClient; import org.hamcrest.Matchers; +import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.junit.*; import java.io.IOException; @@ -583,6 +584,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase boolean success = false; try { logger.info("[{}#{}]: cleaning up after test", getTestClass().getSimpleName(), getTestName()); + clearDisruptionScheme(); final Scope currentClusterScope = getCurrentClusterScope(); try { if (currentClusterScope != Scope.TEST) { @@ -696,6 +698,15 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase return between(minimumNumberOfReplicas(), maximumNumberOfReplicas()); } + + public void setDisruptionScheme(ServiceDisruptionScheme scheme) { + internalCluster().setDisruptionScheme(scheme); + } + + public void clearDisruptionScheme() { + internalCluster().clearDisruptionScheme(); + } + /** * Returns a settings object used in {@link #createIndex(String...)} and {@link #prepareCreate(String)} and friends. * This method can be overwritten by subclasses to set defaults for the indices that are created by the test. diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index fdd345d1ab1..2ec3df22972 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -76,6 +76,7 @@ import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.search.SearchService; import org.elasticsearch.test.cache.recycler.MockBigArraysModule; import org.elasticsearch.test.cache.recycler.MockPageCacheRecyclerModule; +import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.engine.MockEngineModule; import org.elasticsearch.test.store.MockFSIndexStoreModule; import org.elasticsearch.test.transport.AssertingLocalTransport; @@ -185,6 +186,8 @@ public final class InternalTestCluster extends TestCluster { private final boolean hasFilterCache; + private ServiceDisruptionScheme activeDisruptionScheme; + public InternalTestCluster(long clusterSeed, String clusterName) { this(clusterSeed, DEFAULT_MIN_NUM_DATA_NODES, DEFAULT_MAX_NUM_DATA_NODES, clusterName, SettingsSource.EMPTY, DEFAULT_NUM_CLIENT_NODES, DEFAULT_ENABLE_RANDOM_BENCH_NODES); } @@ -288,6 +291,10 @@ public final class InternalTestCluster extends TestCluster { return clusterName; } + public String[] getNodeNames() { + return nodes.keySet().toArray(Strings.EMPTY_ARRAY); + } + private static boolean isLocalTransportConfigured() { if ("local".equals(System.getProperty("es.node.mode", "network"))) { return true; @@ -486,6 +493,7 @@ public final class InternalTestCluster extends TestCluster { while (limit.hasNext()) { NodeAndClient next = limit.next(); nodesToRemove.add(next); + removeDistruptionSchemeFromNode(next); next.close(); } for (NodeAndClient toRemove : nodesToRemove) { @@ -660,6 +668,10 @@ public final class InternalTestCluster extends TestCluster { @Override public void close() { if (this.open.compareAndSet(true, false)) { + if (activeDisruptionScheme != null) { + activeDisruptionScheme.testClusterClosed(); + activeDisruptionScheme = null; + } IOUtils.closeWhileHandlingException(nodes.values()); nodes.clear(); executor.shutdownNow(); @@ -824,6 +836,7 @@ public final class InternalTestCluster extends TestCluster { } private synchronized void reset(boolean wipeData) throws IOException { + clearDisruptionScheme(); randomlyResetClients(); if (wipeData) { wipeDataDirectories(); @@ -1023,6 +1036,7 @@ public final class InternalTestCluster extends TestCluster { NodeAndClient nodeAndClient = getRandomNodeAndClient(new DataNodePredicate()); if (nodeAndClient != null) { logger.info("Closing random node [{}] ", nodeAndClient.name); + removeDistruptionSchemeFromNode(nodeAndClient); nodes.remove(nodeAndClient.name); nodeAndClient.close(); } @@ -1042,6 +1056,7 @@ public final class InternalTestCluster extends TestCluster { }); if (nodeAndClient != null) { logger.info("Closing filtered random node [{}] ", nodeAndClient.name); + removeDistruptionSchemeFromNode(nodeAndClient); nodes.remove(nodeAndClient.name); nodeAndClient.close(); } @@ -1056,6 +1071,7 @@ public final class InternalTestCluster extends TestCluster { String masterNodeName = getMasterName(); assert nodes.containsKey(masterNodeName); logger.info("Closing master node [{}] ", masterNodeName); + removeDistruptionSchemeFromNode(nodes.get(masterNodeName)); NodeAndClient remove = nodes.remove(masterNodeName); remove.close(); } @@ -1067,6 +1083,7 @@ public final class InternalTestCluster extends TestCluster { NodeAndClient nodeAndClient = getRandomNodeAndClient(Predicates.not(new MasterNodePredicate(getMasterName()))); if (nodeAndClient != null) { logger.info("Closing random non master node [{}] current master [{}] ", nodeAndClient.name, getMasterName()); + removeDistruptionSchemeFromNode(nodeAndClient); nodes.remove(nodeAndClient.name); nodeAndClient.close(); } @@ -1120,6 +1137,9 @@ public final class InternalTestCluster extends TestCluster { if (!callback.doRestart(nodeAndClient.name)) { logger.info("Closing node [{}] during restart", nodeAndClient.name); toRemove.add(nodeAndClient); + if (activeDisruptionScheme != null) { + activeDisruptionScheme.removeFromNode(nodeAndClient.name, this); + } nodeAndClient.close(); } } @@ -1134,18 +1154,33 @@ public final class InternalTestCluster extends TestCluster { for (NodeAndClient nodeAndClient : nodes.values()) { callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient()); logger.info("Restarting node [{}] ", nodeAndClient.name); + if (activeDisruptionScheme != null) { + activeDisruptionScheme.removeFromNode(nodeAndClient.name, this); + } nodeAndClient.restart(callback); + if (activeDisruptionScheme != null) { + activeDisruptionScheme.applyToNode(nodeAndClient.name, this); + } } } else { int numNodesRestarted = 0; for (NodeAndClient nodeAndClient : nodes.values()) { callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient()); logger.info("Stopping node [{}] ", nodeAndClient.name); + if (activeDisruptionScheme != null) { + activeDisruptionScheme.removeFromNode(nodeAndClient.name, this); + } nodeAndClient.node.close(); } for (NodeAndClient nodeAndClient : nodes.values()) { logger.info("Starting node [{}] ", nodeAndClient.name); + if (activeDisruptionScheme != null) { + activeDisruptionScheme.removeFromNode(nodeAndClient.name, this); + } nodeAndClient.restart(callback); + if (activeDisruptionScheme != null) { + activeDisruptionScheme.applyToNode(nodeAndClient.name, this); + } } } } @@ -1343,6 +1378,7 @@ public final class InternalTestCluster extends TestCluster { dataDirToClean.addAll(Arrays.asList(nodeEnv.nodeDataLocations())); } nodes.put(nodeAndClient.name, nodeAndClient); + applyDisruptionSchemeToNode(nodeAndClient); } public void closeNonSharedNodes(boolean wipeData) throws IOException { @@ -1364,6 +1400,33 @@ public final class InternalTestCluster extends TestCluster { return hasFilterCache; } + public void setDisruptionScheme(ServiceDisruptionScheme scheme) { + clearDisruptionScheme(); + scheme.applyToCluster(this); + activeDisruptionScheme = scheme; + } + + public void clearDisruptionScheme() { + if (activeDisruptionScheme != null) { + activeDisruptionScheme.removeFromCluster(this); + } + activeDisruptionScheme = null; + } + + private void applyDisruptionSchemeToNode(NodeAndClient nodeAndClient) { + if (activeDisruptionScheme != null) { + assert nodes.containsKey(nodeAndClient.name); + activeDisruptionScheme.applyToNode(nodeAndClient.name, this); + } + } + + private void removeDistruptionSchemeFromNode(NodeAndClient nodeAndClient) { + if (activeDisruptionScheme != null) { + assert nodes.containsKey(nodeAndClient.name); + activeDisruptionScheme.removeFromNode(nodeAndClient.name, this); + } + } + private synchronized Collection dataNodeAndClients() { return Collections2.filter(nodes.values(), new DataNodePredicate()); } diff --git a/src/test/java/org/elasticsearch/test/TestCluster.java b/src/test/java/org/elasticsearch/test/TestCluster.java index 68560e52e92..deb65ca22fc 100644 --- a/src/test/java/org/elasticsearch/test/TestCluster.java +++ b/src/test/java/org/elasticsearch/test/TestCluster.java @@ -24,6 +24,7 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.indices.IndexMissingException; diff --git a/src/test/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java b/src/test/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java new file mode 100644 index 00000000000..65dbc056130 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java @@ -0,0 +1,88 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.disruption; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.transport.MockTransportService; + +import java.util.Random; +import java.util.Set; + +public class NetworkDelaysPartition extends NetworkPartition { + + static long DEFAULT_DELAY_MIN = 10000; + static long DEFAULT_DELAY_MAX = 90000; + + + final long delayMin; + final long delayMax; + + TimeValue duration; + + public NetworkDelaysPartition(Random random) { + this(random, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX); + } + + public NetworkDelaysPartition(Random random, long delayMin, long delayMax) { + super(random); + this.delayMin = delayMin; + this.delayMax = delayMax; + } + + public NetworkDelaysPartition(String node1, String node2, Random random) { + this(node1, node2, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX, random); + } + + public NetworkDelaysPartition(String node1, String node2, long delayMin, long delayMax, Random random) { + super(node1, node2, random); + this.delayMin = delayMin; + this.delayMax = delayMax; + } + + public NetworkDelaysPartition(Set nodesSideOne, Set nodesSideTwo, Random random) { + this(nodesSideOne, nodesSideTwo, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX, random); + } + + public NetworkDelaysPartition(Set nodesSideOne, Set nodesSideTwo, long delayMin, long delayMax, Random random) { + super(nodesSideOne, nodesSideTwo, random); + this.delayMin = delayMin; + this.delayMax = delayMax; + + } + + @Override + public synchronized void startDisrupting() { + duration = new TimeValue(delayMin + random.nextInt((int) (delayMax - delayMin))); + super.startDisrupting(); + } + + @Override + void applyDisruption(DiscoveryNode node1, MockTransportService transportService1, + DiscoveryNode node2, MockTransportService transportService2) { + transportService1.addUnresponsiveRule(node1, duration); + transportService1.addUnresponsiveRule(node2, duration); + } + + @Override + protected String getPartitionDescription() { + return "network delays for [" + duration + "]"; + } + +} diff --git a/src/test/java/org/elasticsearch/test/disruption/NetworkDisconnectPartition.java b/src/test/java/org/elasticsearch/test/disruption/NetworkDisconnectPartition.java new file mode 100644 index 00000000000..664c7a09977 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/disruption/NetworkDisconnectPartition.java @@ -0,0 +1,53 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.disruption; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.test.transport.MockTransportService; + +import java.util.Random; +import java.util.Set; + +public class NetworkDisconnectPartition extends NetworkPartition { + + + public NetworkDisconnectPartition(Random random) { + super(random); + } + + public NetworkDisconnectPartition(String node1, String node2, Random random) { + super(node1, node2, random); + } + + public NetworkDisconnectPartition(Set nodesSideOne, Set nodesSideTwo, Random random) { + super(nodesSideOne, nodesSideTwo, random); + } + + @Override + protected String getPartitionDescription() { + return "disconnected"; + } + + @Override + void applyDisruption(DiscoveryNode node1, MockTransportService transportService1, + DiscoveryNode node2, MockTransportService transportService2) { + transportService1.addFailToSendNoConnectRule(node2); + transportService2.addFailToSendNoConnectRule(node1); + } +} diff --git a/src/test/java/org/elasticsearch/test/disruption/NetworkPartition.java b/src/test/java/org/elasticsearch/test/disruption/NetworkPartition.java new file mode 100644 index 00000000000..c8953fad593 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/disruption/NetworkPartition.java @@ -0,0 +1,199 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.disruption; + +import com.google.common.collect.ImmutableList; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.TestCluster; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; + +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; + +public abstract class NetworkPartition implements ServiceDisruptionScheme { + + protected final ESLogger logger = Loggers.getLogger(getClass()); + + final Set nodesSideOne; + final Set nodesSideTwo; + volatile boolean autoExpand; + protected final Random random; + protected volatile InternalTestCluster cluster; + + + public NetworkPartition(Random random) { + this.random = new Random(random.nextLong()); + nodesSideOne = new HashSet<>(); + nodesSideTwo = new HashSet<>(); + autoExpand = true; + } + + public NetworkPartition(String node1, String node2, Random random) { + this(random); + nodesSideOne.add(node1); + nodesSideTwo.add(node2); + autoExpand = false; + } + + public NetworkPartition(Set nodesSideOne, Set nodesSideTwo, Random random) { + this(random); + this.nodesSideOne.addAll(nodesSideOne); + this.nodesSideTwo.addAll(nodesSideTwo); + autoExpand = false; + } + + + public List getNodesSideOne() { + return ImmutableList.copyOf(nodesSideOne); + } + + public List getNodesSideTwo() { + return ImmutableList.copyOf(nodesSideTwo); + } + + public List getMjaoritySide() { + if (nodesSideOne.size() >= nodesSideTwo.size()) { + return getNodesSideOne(); + } else { + return getNodesSideTwo(); + } + } + + public List getMinoritySide() { + if (nodesSideOne.size() >= nodesSideTwo.size()) { + return getNodesSideTwo(); + } else { + return getNodesSideOne(); + } + } + + @Override + public void applyToCluster(InternalTestCluster cluster) { + this.cluster = cluster; + if (autoExpand) { + for (String node : cluster.getNodeNames()) { + applyToNode(node, cluster); + } + } + } + + @Override + public void removeFromCluster(InternalTestCluster cluster) { + stopDisrupting(); + } + + @Override + public synchronized void applyToNode(String node, InternalTestCluster cluster) { + if (!autoExpand || nodesSideOne.contains(node) || nodesSideTwo.contains(node)) { + return; + } + if (nodesSideOne.isEmpty()) { + nodesSideOne.add(node); + } else if (nodesSideTwo.isEmpty()) { + nodesSideTwo.add(node); + } else if (random.nextBoolean()) { + nodesSideOne.add(node); + } else { + nodesSideTwo.add(node); + } + } + + @Override + public synchronized void removeFromNode(String node, InternalTestCluster cluster) { + MockTransportService transportService = (MockTransportService) cluster.getInstance(TransportService.class, node); + DiscoveryNode discoveryNode = discoveryNode(node); + Set otherSideNodes; + if (nodesSideOne.contains(node)) { + otherSideNodes = nodesSideTwo; + } else if (nodesSideTwo.contains(node)) { + otherSideNodes = nodesSideOne; + } else { + return; + } + for (String node2 : otherSideNodes) { + MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2); + DiscoveryNode discoveryNode2 = discoveryNode(node2); + removeDisruption(discoveryNode, transportService, discoveryNode2, transportService2); + } + } + + @Override + public synchronized void testClusterClosed() { + + } + + protected abstract String getPartitionDescription(); + + + protected DiscoveryNode discoveryNode(String node) { + return cluster.getInstance(Discovery.class, node).localNode(); + } + + @Override + public synchronized void startDisrupting() { + if (nodesSideOne.size() == 0 || nodesSideTwo.size() == 0) { + return; + } + logger.info("nodes {} will be partitioned from {}. partition type [{}]", nodesSideOne, nodesSideTwo, getPartitionDescription()); + for (String node1 : nodesSideOne) { + MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1); + DiscoveryNode discoveryNode1 = discoveryNode(node1); + for (String node2 : nodesSideTwo) { + DiscoveryNode discoveryNode2 = discoveryNode(node2); + MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2); + applyDisruption(discoveryNode1, transportService1, discoveryNode2, transportService2); + } + } + } + + + @Override + public void stopDisrupting() { + if (nodesSideOne.size() == 0 || nodesSideTwo.size() == 0) { + return; + } + logger.info("restoring partition between nodes {} & nodes {}", nodesSideOne, nodesSideTwo); + for (String node1 : nodesSideOne) { + MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1); + DiscoveryNode discoveryNode1 = discoveryNode(node1); + for (String node2 : nodesSideTwo) { + DiscoveryNode discoveryNode2 = discoveryNode(node2); + MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2); + removeDisruption(discoveryNode1, transportService1, discoveryNode2, transportService2); + } + } + } + + abstract void applyDisruption(DiscoveryNode node1, MockTransportService transportService1, + DiscoveryNode node2, MockTransportService transportService2); + + + protected void removeDisruption(DiscoveryNode node1, MockTransportService transportService1, + DiscoveryNode node2, MockTransportService transportService2) { + transportService1.clearRule(node2); + transportService2.clearRule(node1); + } +} diff --git a/src/test/java/org/elasticsearch/test/disruption/NetworkUnresponsivePartition.java b/src/test/java/org/elasticsearch/test/disruption/NetworkUnresponsivePartition.java new file mode 100644 index 00000000000..95b853cf9b5 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/disruption/NetworkUnresponsivePartition.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.disruption; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.test.transport.MockTransportService; + +import java.util.Random; +import java.util.Set; + +public class NetworkUnresponsivePartition extends NetworkPartition { + + public NetworkUnresponsivePartition(Random random) { + super(random); + } + + public NetworkUnresponsivePartition(String node1, String node2, Random random) { + super(node1, node2, random); + } + + public NetworkUnresponsivePartition(Set nodesSideOne, Set nodesSideTwo, Random random) { + super(nodesSideOne, nodesSideTwo, random); + } + + @Override + protected String getPartitionDescription() { + return "unresponsive"; + } + + @Override + void applyDisruption(DiscoveryNode node1, MockTransportService transportService1, + DiscoveryNode node2, MockTransportService transportService2) { + transportService1.addUnresponsiveRule(node2); + transportService2.addUnresponsiveRule(node1); + } +} diff --git a/src/test/java/org/elasticsearch/test/disruption/NoOpDisruptionScheme.java b/src/test/java/org/elasticsearch/test/disruption/NoOpDisruptionScheme.java new file mode 100644 index 00000000000..6ce11582904 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/disruption/NoOpDisruptionScheme.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.disruption; + +import org.elasticsearch.test.InternalTestCluster; + +public class NoOpDisruptionScheme implements ServiceDisruptionScheme { + + @Override + public void applyToCluster(InternalTestCluster cluster) { + + } + + @Override + public void removeFromCluster(InternalTestCluster cluster) { + + } + + @Override + public void applyToNode(String node, InternalTestCluster cluster) { + + } + + @Override + public void removeFromNode(String node, InternalTestCluster cluster) { + + } + + @Override + public void startDisrupting() { + + } + + @Override + public void stopDisrupting() { + + } + + @Override + public void testClusterClosed() { + + } +} diff --git a/src/test/java/org/elasticsearch/test/disruption/ServiceDisruptionScheme.java b/src/test/java/org/elasticsearch/test/disruption/ServiceDisruptionScheme.java new file mode 100644 index 00000000000..1290e387e12 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/disruption/ServiceDisruptionScheme.java @@ -0,0 +1,40 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.disruption; + +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.TestCluster; + +public interface ServiceDisruptionScheme { + + public void applyToCluster(InternalTestCluster cluster); + + public void removeFromCluster(InternalTestCluster cluster); + + public void applyToNode(String node, InternalTestCluster cluster); + + public void removeFromNode(String node, InternalTestCluster cluster); + + public void startDisrupting(); + + public void stopDisrupting(); + + public void testClusterClosed(); + +} diff --git a/src/test/java/org/elasticsearch/test/disruption/SingleNodeDisruption.java b/src/test/java/org/elasticsearch/test/disruption/SingleNodeDisruption.java new file mode 100644 index 00000000000..3148254011e --- /dev/null +++ b/src/test/java/org/elasticsearch/test/disruption/SingleNodeDisruption.java @@ -0,0 +1,83 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.disruption; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.test.InternalTestCluster; + +import java.util.Random; + +public abstract class SingleNodeDisruption implements ServiceDisruptionScheme { + + protected final ESLogger logger = Loggers.getLogger(getClass()); + + protected volatile String disruptedNode; + protected volatile InternalTestCluster cluster; + protected final Random random; + + + public SingleNodeDisruption(String disruptedNode, Random random) { + this(random); + this.disruptedNode = disruptedNode; + } + + public SingleNodeDisruption(Random random) { + this.random = new Random(random.nextLong()); + } + + @Override + public void applyToCluster(InternalTestCluster cluster) { + this.cluster = cluster; + if (disruptedNode == null) { + String[] nodes = cluster.getNodeNames(); + disruptedNode = nodes[random.nextInt(nodes.length)]; + } + } + + @Override + public void removeFromCluster(InternalTestCluster cluster) { + if (disruptedNode != null) { + removeFromNode(disruptedNode, cluster); + } + } + + @Override + public synchronized void applyToNode(String node, InternalTestCluster cluster) { + + } + + @Override + public synchronized void removeFromNode(String node, InternalTestCluster cluster) { + if (disruptedNode == null) { + return; + } + if (!node.equals(disruptedNode)) { + return; + } + stopDisrupting(); + disruptedNode = null; + } + + @Override + public synchronized void testClusterClosed() { + disruptedNode = null; + } + +} diff --git a/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java b/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java new file mode 100644 index 00000000000..6bfe5e7366a --- /dev/null +++ b/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java @@ -0,0 +1,130 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.disruption; + +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.Random; + +public class SlowClusterStateProcessing extends SingleNodeDisruption { + + volatile boolean disrupting; + volatile Thread worker; + + final long intervalBetweenDelaysMin; + final long intervalBetweenDelaysMax; + final long delayDurationMin; + final long delayDurationMax; + + + public SlowClusterStateProcessing(Random random) { + this(null, random); + } + + public SlowClusterStateProcessing(String disruptedNode, Random random) { + this(disruptedNode, random, 100, 200, 300, 20000); + } + + public SlowClusterStateProcessing(String disruptedNode, Random random, long intervalBetweenDelaysMin, + long intervalBetweenDelaysMax, long delayDurationMin, long delayDurationMax) { + this(random, intervalBetweenDelaysMin, intervalBetweenDelaysMax, delayDurationMin, delayDurationMax); + this.disruptedNode = disruptedNode; + } + + public SlowClusterStateProcessing(Random random, + long intervalBetweenDelaysMin, long intervalBetweenDelaysMax, long delayDurationMin, + long delayDurationMax) { + super(random); + this.intervalBetweenDelaysMin = intervalBetweenDelaysMin; + this.intervalBetweenDelaysMax = intervalBetweenDelaysMax; + this.delayDurationMin = delayDurationMin; + this.delayDurationMax = delayDurationMax; + } + + + @Override + public void startDisrupting() { + disrupting = true; + worker = new Thread(new BackgroundWorker()); + worker.setDaemon(true); + worker.start(); + } + + @Override + public void stopDisrupting() { + disrupting = false; + try { + worker.join(2 * (intervalBetweenDelaysMax + delayDurationMax)); + } catch (InterruptedException e) { + logger.info("background thread failed to stop"); + } + worker = null; + } + + + private synchronized boolean interruptClusterStateProcessing(final TimeValue duration) { + if (disruptedNode == null) { + return false; + } + logger.info("delaying cluster state updates on node [{}] for [{}]", disruptedNode, duration); + ClusterService clusterService = cluster.getInstance(ClusterService.class, disruptedNode); + clusterService.submitStateUpdateTask("service_disruption_delay", Priority.IMMEDIATE, new ClusterStateNonMasterUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + Thread.sleep(duration.millis()); + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + + } + }); + return true; + } + + class BackgroundWorker implements Runnable { + + @Override + public void run() { + while (disrupting) { + try { + TimeValue duration = new TimeValue(delayDurationMin + random.nextInt((int) (delayDurationMax - delayDurationMin))); + if (!interruptClusterStateProcessing(duration)) { + continue; + } + Thread.sleep(duration.millis()); + + if (disruptedNode == null) { + return; + } + + } catch (Exception e) { + logger.error("error in background worker", e); + } + } + } + } + +} diff --git a/src/test/java/org/elasticsearch/test/transport/MockTransportService.java b/src/test/java/org/elasticsearch/test/transport/MockTransportService.java index 14f0296121e..5012384dbf0 100644 --- a/src/test/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/src/test/java/org/elasticsearch/test/transport/MockTransportService.java @@ -24,9 +24,14 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.BytesStreamInput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -46,6 +51,7 @@ public class MockTransportService extends TransportService { public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool) { super(settings, new LookupTestTransport(transport), threadPool); this.original = transport; + } /** @@ -97,7 +103,7 @@ public class MockTransportService extends TransportService { */ public void addFailToSendNoConnectRule(DiscoveryNode node, final Set blockedActions) { - ((LookupTestTransport) transport).transports.put(node.getAddress(), new DelegateTransport(original) { + addDelegate(node, new DelegateTransport(original) { @Override public void connectToNode(DiscoveryNode node) throws ConnectTransportException { original.connectToNode(node); @@ -124,7 +130,6 @@ public class MockTransportService extends TransportService { * and failing to connect once the rule was added. */ public void addUnresponsiveRule(DiscoveryNode node) { - // TODO add a parameter to delay the connect timeout? addDelegate(node, new DelegateTransport(original) { @Override public void connectToNode(DiscoveryNode node) throws ConnectTransportException { @@ -143,8 +148,98 @@ public class MockTransportService extends TransportService { }); } + /** + * Adds a rule that will cause ignores each send request, simulating an unresponsive node + * and failing to connect once the rule was added. + * + * @param duration the amount of time to delay sending and connecting. + */ + public void addUnresponsiveRule(DiscoveryNode node, final TimeValue duration) { + final long startTime = System.currentTimeMillis(); + + addDelegate(node, new DelegateTransport(original) { + + TimeValue getDelay() { + return new TimeValue(duration.millis() - (System.currentTimeMillis() - startTime)); + } + + @Override + public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + TimeValue delay = getDelay(); + if (delay.millis() <= 0) { + original.connectToNode(node); + } + + // TODO: Replace with proper setting + TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_DEFAULT_CONNECT_TIMEOUT; + try { + if (delay.millis() < connectingTimeout.millis()) { + Thread.sleep(delay.millis()); + original.connectToNode(node); + } else { + Thread.sleep(connectingTimeout.millis()); + throw new ConnectTransportException(node, "UNRESPONSIVE: simulated"); + } + } catch (InterruptedException e) { + throw new ConnectTransportException(node, "UNRESPONSIVE: interrupted while sleeping", e); + } + } + + @Override + public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException { + TimeValue delay = getDelay(); + if (delay.millis() <= 0) { + original.connectToNodeLight(node); + } + + // TODO: Replace with proper setting + TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_DEFAULT_CONNECT_TIMEOUT; + try { + if (delay.millis() < connectingTimeout.millis()) { + Thread.sleep(delay.millis()); + original.connectToNodeLight(node); + } else { + Thread.sleep(connectingTimeout.millis()); + throw new ConnectTransportException(node, "UNRESPONSIVE: simulated"); + } + } catch (InterruptedException e) { + throw new ConnectTransportException(node, "UNRESPONSIVE: interrupted while sleeping", e); + } + } + + @Override + public void sendRequest(final DiscoveryNode node, final long requestId, final String action, TransportRequest request, final TransportRequestOptions options) throws IOException, TransportException { + // delayed sending - even if larger then the request timeout to simulated a potential late response from target node + + TimeValue delay = getDelay(); + if (delay.millis() <= 0) { + original.sendRequest(node, requestId, action, request, options); + } + + // poor mans request cloning... + TransportRequestHandler handler = MockTransportService.this.getHandler(action); + BytesStreamOutput bStream = new BytesStreamOutput(); + request.writeTo(bStream); + final TransportRequest clonedRequest = handler.newInstance(); + clonedRequest.readFrom(new BytesStreamInput(bStream.bytes())); + + threadPool.schedule(delay, ThreadPool.Names.GENERIC, new AbstractRunnable() { + @Override + public void run() { + try { + original.sendRequest(node, requestId, action, clonedRequest, options); + } catch (Throwable e) { + logger.debug("failed to send delayed request", e); + } + } + }); + } + }); + } + /** * Adds a new delegate transport that is used for communication with the given node. + * * @return true iff no other delegate was registered for this node before, otherwise false */ public boolean addDelegate(DiscoveryNode node, DelegateTransport transport) { @@ -214,7 +309,6 @@ public class MockTransportService extends TransportService { } - @Override public void transportServiceAdapter(TransportServiceAdapter service) { transport.transportServiceAdapter(service);