From 58f8774fa25ed749490a3cdef63fc6c5e15ce8b6 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 11 Jun 2014 15:54:47 +0200 Subject: [PATCH] [Discovery] do not use versions to optimize cluster state copying for a first update from a new master We have an optimization which compares routing/meta data version of cluster states and tries to reuse the current object if the versions are equal. This can cause rare failures during recovery from a minimum_master_node breach when using the "new light rejoin" mechanism and simulated network disconnects. This happens where the current master updates it's state, doesn't manage to broadcast it to other nodes due to the disconnect and then steps down. The new master will start with a previous version and continue to update it. When the old master rejoins, the versions of it's state can equal but the content is different. Also improved DiscoveryWithNetworkFailuresTests to simulate this failure (and other improvements) Closes #6466 --- .../service/InternalClusterService.java | 14 - .../discovery/DiscoverySettings.java | 1 + .../discovery/local/LocalDiscovery.java | 13 +- .../discovery/zen/ZenDiscovery.java | 18 +- .../DiscoveryWithNetworkFailuresTests.java | 241 ++++++++++++------ .../test/ElasticsearchIntegrationTest.java | 7 + 6 files changed, 193 insertions(+), 101 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index dbe0b4c7ad0..ff6f3924253 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -386,20 +386,6 @@ public class InternalClusterService extends AbstractLifecycleComponent implem private final TransportService transportService; private final ClusterService clusterService; + private final DiscoveryService discoveryService; private final DiscoveryNodeService discoveryNodeService; private AllocationService allocationService; private final ClusterName clusterName; @@ -77,7 +78,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem @Inject public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService, - DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) { + DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings, DiscoveryService discoveryService) { super(settings); this.clusterName = clusterName; this.clusterService = clusterService; @@ -85,6 +86,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem this.discoveryNodeService = discoveryNodeService; this.version = version; this.discoverySettings = discoverySettings; + this.discoveryService = discoveryService; } @Override @@ -305,6 +307,9 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); // ignore cluster state messages that do not include "me", not in the game yet... if (nodeSpecificClusterState.nodes().localNode() != null) { + assert nodeSpecificClusterState.nodes().masterNode() != null : "received a cluster state without a master"; + assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block"; + discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -312,6 +317,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem return currentState; } + if (currentState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) { + // its a fresh update from the master as we transition from a start of not having a master to having one + logger.debug("got first state from fresh master [{}]", nodeSpecificClusterState.nodes().masterNodeId()); + return nodeSpecificClusterState; + } + ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState); // if the routing table did not change, use the original one if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 8ccd5046e24..1f0b365aee3 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -85,6 +85,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private final ClusterService clusterService; private AllocationService allocationService; private final ClusterName clusterName; + private final DiscoveryService discoveryService; private final DiscoveryNodeService discoveryNodeService; private final DiscoverySettings discoverySettings; private final ZenPingService pingService; @@ -128,12 +129,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen @Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService, - DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings) { + DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings, + DiscoveryService discoveryService) { super(settings); this.clusterName = clusterName; this.threadPool = threadPool; this.clusterService = clusterService; this.transportService = transportService; + this.discoveryService = discoveryService; this.discoveryNodeService = discoveryNodeService; this.discoverySettings = discoverySettings; this.pingService = pingService; @@ -641,6 +644,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed); processNewClusterStates.add(processClusterState); + + assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master"; + assert !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block"; + clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -701,7 +708,16 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen masterFD.restart(latestDiscoNodes.masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]"); } + if (currentState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) { + // its a fresh update from the master as we transition from a start of not having a master to having one + logger.debug("got first state from fresh master [{}]", updatedState.nodes().masterNodeId()); + return updatedState; + } + + + // some optimizations to make sure we keep old objects where possible ClusterState.Builder builder = ClusterState.builder(updatedState); + // if the routing table did not change, use the original one if (updatedState.routingTable().version() == currentState.routingTable().version()) { builder.routingTable(currentState.routingTable()); diff --git a/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java b/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java index 1d6a346dbd9..905b45a6595 100644 --- a/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java +++ b/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java @@ -35,10 +35,10 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; @@ -47,9 +47,10 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportService; -import org.junit.Ignore; import org.junit.Test; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -61,13 +62,14 @@ import static org.hamcrest.Matchers.*; /** */ -@ClusterScope(scope= Scope.TEST, numDataNodes =0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest { private static final Settings nodeSettings = ImmutableSettings.settingsBuilder() .put("discovery.type", "zen") // <-- To override the local setting if set externally .put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly .put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly + .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly .put("discovery.zen.minimum_master_nodes", 2) .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()) .build(); @@ -97,12 +99,8 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); - // Wait until a green status has been reaches and 3 nodes are part of the cluster - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + // Wait until 3 nodes are part of the cluster + ensureStableCluster(3); // Figure out what is the elected master node DiscoveryNode masterDiscoNode = findMasterNode(nodes); @@ -155,11 +153,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } // Wait until the master node sees all 3 nodes again. - clusterHealthResponse = masterClient.admin().cluster().prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + ensureStableCluster(3); for (String node : nodes) { ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); @@ -171,17 +165,12 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } @Test - @Ignore @TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") public void testDataConsistency() throws Exception { List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); - // Wait until a green status has been reaches and 3 nodes are part of the cluster - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + // Wait until a 3 nodes are part of the cluster + ensureStableCluster(3); assertAcked(prepareCreate("test") .addMapping("type", "field", "type=long") @@ -216,35 +205,29 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT ensureGreen("test"); // Pick a node that isn't the elected master. - String isolatedNode = nodes.get(0); - String nonIsolatedNode = nodes.get(1); - final Client nonIsolatedNodeClient = internalCluster().client(nonIsolatedNode); + final String isolatedNode = nodes.get(0); + final String nonIsolatedNode = nodes.get(1); // Simulate a network issue between the unlucky node and the rest of the cluster. - for (String nodeId : nodes) { - if (!nodeId.equals(isolatedNode)) { - addFailToSendNoConnectRule(nodeId, isolatedNode); - addFailToSendNoConnectRule(isolatedNode, nodeId); - } - } + randomIsolateNode(isolatedNode, nodes); try { - // Wait until elected master has removed that the unlucky node... + logger.info("wait until elected master has removed [{}]", isolatedNode); boolean applied = awaitBusy(new Predicate() { @Override public boolean apply(Object input) { - return nonIsolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2; + return client(nonIsolatedNode).admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2; } }, 1, TimeUnit.MINUTES); assertThat(applied, is(true)); // The unlucky node must report *no* master node, since it can't connect to master and in fact it should // continuously ping until network failures have been resolved. However - final Client isolatedNodeClient = internalCluster().client(isolatedNode); // It may a take a bit before the node detects it has been cut off from the elected master + logger.info("waiting for isolated node [{}] to have no master", isolatedNode); applied = awaitBusy(new Predicate() { @Override public boolean apply(Object input) { - ClusterState localClusterState = isolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState(); + ClusterState localClusterState = client(isolatedNode).admin().cluster().prepareState().setLocal(true).get().getState(); DiscoveryNodes localDiscoveryNodes = localClusterState.nodes(); logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint()); return localDiscoveryNodes.masterNode() == null; @@ -252,13 +235,14 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT }, 10, TimeUnit.SECONDS); assertThat(applied, is(true)); - ClusterHealthResponse healthResponse = nonIsolatedNodeClient.admin().cluster().prepareHealth("test") + ClusterHealthResponse healthResponse = client(nonIsolatedNode).admin().cluster().prepareHealth("test") .setWaitForYellowStatus().get(); assertThat(healthResponse.isTimedOut(), is(false)); assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); // Reads on the right side of the split must work - searchResponse = nonIsolatedNodeClient.prepareSearch("test").setTypes("type") + logger.info("verifying healthy part of cluster returns data"); + searchResponse = client(nonIsolatedNode).prepareSearch("test").setTypes("type") .addSort("field", SortOrder.ASC) .get(); assertHitCount(searchResponse, indexRequests.length); @@ -269,20 +253,21 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } // Reads on the wrong side of the split are partial - searchResponse = isolatedNodeClient.prepareSearch("test").setTypes("type") - .addSort("field", SortOrder.ASC) + logger.info("verifying isolated node [{}] returns partial data", isolatedNode); + searchResponse = client(isolatedNode).prepareSearch("test").setTypes("type") + .addSort("field", SortOrder.ASC).setPreference("_only_local") .get(); assertThat(searchResponse.getSuccessfulShards(), lessThan(searchResponse.getTotalShards())); assertThat(searchResponse.getHits().totalHits(), lessThan((long) indexRequests.length)); - // Writes on the right side of the split must work - UpdateResponse updateResponse = nonIsolatedNodeClient.prepareUpdate("test", "type", "0").setDoc("field2", 2).get(); + logger.info("verifying writes on healthy cluster"); + UpdateResponse updateResponse = client(nonIsolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2).get(); assertThat(updateResponse.getVersion(), equalTo(2l)); - // Writes on the wrong side of the split fail try { - isolatedNodeClient.prepareUpdate("test", "type", "0").setDoc("field2", 2) - .setTimeout(TimeValue.timeValueSeconds(5)) // Fail quick, otherwise we wait 60 seconds. + logger.info("verifying writes on isolated [{}] fail", isolatedNode); + client(isolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2) + .setTimeout("1s") // Fail quick, otherwise we wait 60 seconds. .get(); fail(); } catch (ClusterBlockException exception) { @@ -294,23 +279,13 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } finally { // stop simulating network failures, from this point on the unlucky node is able to rejoin // We also need to do this even if assertions fail, since otherwise the test framework can't work properly - for (String nodeId : nodes) { - if (!nodeId.equals(isolatedNode)) { - clearNoConnectRule(nodeId, isolatedNode); - clearNoConnectRule(isolatedNode, nodeId); - } - } + restoreIsolation(isolatedNode, nodes); } // Wait until the master node sees all 3 nodes again. - clusterHealthResponse = nonIsolatedNodeClient.admin().cluster().prepareHealth() - .setWaitForGreenStatus() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + ensureStableCluster(3); + logger.info("verifying all nodes return all data"); for (Client client : clients()) { searchResponse = client.prepareSearch("test").setTypes("type") .addSort("field", SortOrder.ASC) @@ -334,16 +309,79 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } } + + @Test + @TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") + public void voidIsolateMasterAndVerifyClusterStateConsensus() throws Exception { + final List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); + ensureStableCluster(3); + + assertAcked(prepareCreate("test") + .setSettings(ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) + )); + + ensureGreen(); + String isolatedNode = findMasterNode(nodes).name(); + String nonIsolatedNode = null; + for (String node : nodes) { + if (!node.equals(isolatedNode)) { + nonIsolatedNode = node; + break; + } + } + randomIsolateNode(isolatedNode, nodes); + + // make sure cluster reforms + ensureStableCluster(2, nonIsolatedNode); + + // restore isolation + restoreIsolation(isolatedNode, nodes); + + ensureStableCluster(3); + + logger.info("issue a reroute"); + // trigger a reroute now, instead of waiting for the background reroute of RerouteService + assertAcked(client().admin().cluster().prepareReroute()); + // and wait for it to finish. + assertFalse(client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).get().isTimedOut()); + + + // verify all cluster states are the same + ClusterState state = null; + for (String node : nodes) { + ClusterState nodeState = client(node).admin().cluster().prepareState().setLocal(true).get().getState(); + if (state == null) { + state = nodeState; + continue; + } + // assert nodes are identical + try { + assertEquals("unequal versions", state.version(), nodeState.version()); + assertEquals("unequal node count", state.nodes().size(), nodeState.nodes().size()); + assertEquals("different masters ", state.nodes().masterNodeId(), nodeState.nodes().masterNodeId()); + assertEquals("different meta data version", state.metaData().version(), nodeState.metaData().version()); + if (!state.routingTable().prettyPrint().equals(nodeState.routingTable().prettyPrint())) { + fail("different routing"); + } + } catch (AssertionError t) { + fail("failed comparing cluster state: " + t.getMessage() + "\n" + + "--- cluster state of node [" + nodes.get(0) + "]: ---\n" + state.prettyPrint() + + "\n--- cluster state [" + node + "]: ---\n" + nodeState.prettyPrint()); + } + + } + + } + + @Test - @Ignore @TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") public void testRejoinDocumentExistsInAllShardCopies() throws Exception { - final List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); + ensureStableCluster(3); + assertAcked(prepareCreate("test") .setSettings(ImmutableSettings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) @@ -352,23 +390,15 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT .get()); ensureGreen("test"); - String isolatedNode = findMasterNode(nodes).getName(); - String notIsolatedNode = null; - for (String node : nodes) { - if (!node.equals(isolatedNode)) { - notIsolatedNode = node; - break; - } - } + nodes = new ArrayList<>(nodes); + Collections.shuffle(nodes, getRandom()); + String isolatedNode = nodes.get(0); + String notIsolatedNode = nodes.get(1); + + randomIsolateNode(isolatedNode, nodes); + ensureStableCluster(2, notIsolatedNode); + assertFalse(client(notIsolatedNode).admin().cluster().prepareHealth("test").setWaitForYellowStatus().get().isTimedOut()); - logger.info("Isolating node[" + isolatedNode + "]"); - for (String nodeId : nodes) { - if (!nodeId.equals(isolatedNode)) { - addFailToSendNoConnectRule(nodeId, isolatedNode); - addFailToSendNoConnectRule(isolatedNode, nodeId); - } - } - ensureYellow("test"); IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value").get(); assertThat(indexResponse.getVersion(), equalTo(1l)); @@ -381,13 +411,9 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT assertThat(getResponse.getVersion(), equalTo(1l)); assertThat(getResponse.getId(), equalTo(indexResponse.getId())); - for (String nodeId : nodes) { - if (!nodeId.equals(isolatedNode)) { - clearNoConnectRule(nodeId, isolatedNode); - clearNoConnectRule(isolatedNode, nodeId); - } - } + restoreIsolation(isolatedNode, nodes); + ensureStableCluster(3); ensureGreen("test"); for (String node : nodes) { @@ -401,6 +427,32 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } } + protected void restoreIsolation(String isolatedNode, List nodes) { + logger.info("restoring isolation of [{}]", isolatedNode); + for (String nodeId : nodes) { + if (!nodeId.equals(isolatedNode)) { + clearNoConnectRule(nodeId, isolatedNode); + clearNoConnectRule(isolatedNode, nodeId); + } + } + } + + protected void randomIsolateNode(String isolatedNode, List nodes) { + boolean unresponsive = randomBoolean(); + logger.info("isolating [{}] with unresponsive: [{}]", isolatedNode, unresponsive); + for (String nodeId : nodes) { + if (!nodeId.equals(isolatedNode)) { + if (unresponsive) { + addUnresponsiveRule(nodeId, isolatedNode); + addUnresponsiveRule(isolatedNode, nodeId); + } else { + addFailToSendNoConnectRule(nodeId, isolatedNode); + addFailToSendNoConnectRule(isolatedNode, nodeId); + } + } + } + } + private DiscoveryNode findMasterNode(List nodes) { DiscoveryNode masterDiscoNode = null; for (String node : nodes) { @@ -421,9 +473,28 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT ((MockTransportService) mockTransportService).addFailToSendNoConnectRule(internalCluster().getInstance(Discovery.class, toNode).localNode()); } + private void addUnresponsiveRule(String fromNode, String toNode) { + TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode); + ((MockTransportService) mockTransportService).addUnresponsiveRule(internalCluster().getInstance(Discovery.class, toNode).localNode()); + } + private void clearNoConnectRule(String fromNode, String toNode) { TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode); ((MockTransportService) mockTransportService).clearRule(internalCluster().getInstance(Discovery.class, toNode).localNode()); } + + private void ensureStableCluster(int nodeCount) { + ensureStableCluster(nodeCount, null); + } + + private void ensureStableCluster(int nodeCount, @Nullable String viaNode) { + ClusterHealthResponse clusterHealthResponse = client(viaNode).admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes(Integer.toString(nodeCount)) + .setWaitForRelocatingShards(0) + .get(); + assertThat(clusterHealthResponse.isTimedOut(), is(false)); + } + } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 5cfceafa220..20789924ac9 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -646,6 +646,13 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase } public static Client client() { + return client(null); + } + + public static Client client(@Nullable String node) { + if (node != null) { + return internalCluster().client(node); + } Client client = cluster().client(); if (frequently()) { client = new RandomizingClient(client, getRandom());