diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index dbe0b4c7ad0..ff6f3924253 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -386,20 +386,6 @@ public class InternalClusterService extends AbstractLifecycleComponent implem private final TransportService transportService; private final ClusterService clusterService; + private final DiscoveryService discoveryService; private final DiscoveryNodeService discoveryNodeService; private AllocationService allocationService; private final ClusterName clusterName; @@ -77,7 +78,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem @Inject public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService, - DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) { + DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings, DiscoveryService discoveryService) { super(settings); this.clusterName = clusterName; this.clusterService = clusterService; @@ -85,6 +86,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem this.discoveryNodeService = discoveryNodeService; this.version = version; this.discoverySettings = discoverySettings; + this.discoveryService = discoveryService; } @Override @@ -305,6 +307,9 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); // ignore cluster state messages that do not include "me", not in the game yet... if (nodeSpecificClusterState.nodes().localNode() != null) { + assert nodeSpecificClusterState.nodes().masterNode() != null : "received a cluster state without a master"; + assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block"; + discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -312,6 +317,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem return currentState; } + if (currentState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) { + // its a fresh update from the master as we transition from a start of not having a master to having one + logger.debug("got first state from fresh master [{}]", nodeSpecificClusterState.nodes().masterNodeId()); + return nodeSpecificClusterState; + } + ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState); // if the routing table did not change, use the original one if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 8ccd5046e24..1f0b365aee3 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -85,6 +85,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private final ClusterService clusterService; private AllocationService allocationService; private final ClusterName clusterName; + private final DiscoveryService discoveryService; private final DiscoveryNodeService discoveryNodeService; private final DiscoverySettings discoverySettings; private final ZenPingService pingService; @@ -128,12 +129,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen @Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService, - DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings) { + DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings, + DiscoveryService discoveryService) { super(settings); this.clusterName = clusterName; this.threadPool = threadPool; this.clusterService = clusterService; this.transportService = transportService; + this.discoveryService = discoveryService; this.discoveryNodeService = discoveryNodeService; this.discoverySettings = discoverySettings; this.pingService = pingService; @@ -641,6 +644,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed); processNewClusterStates.add(processClusterState); + + assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master"; + assert !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block"; + clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -701,7 +708,16 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen masterFD.restart(latestDiscoNodes.masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]"); } + if (currentState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) { + // its a fresh update from the master as we transition from a start of not having a master to having one + logger.debug("got first state from fresh master [{}]", updatedState.nodes().masterNodeId()); + return updatedState; + } + + + // some optimizations to make sure we keep old objects where possible ClusterState.Builder builder = ClusterState.builder(updatedState); + // if the routing table did not change, use the original one if (updatedState.routingTable().version() == currentState.routingTable().version()) { builder.routingTable(currentState.routingTable()); diff --git a/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java b/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java index 1d6a346dbd9..905b45a6595 100644 --- a/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java +++ b/src/test/java/org/elasticsearch/discovery/DiscoveryWithNetworkFailuresTests.java @@ -35,10 +35,10 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; @@ -47,9 +47,10 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportService; -import org.junit.Ignore; import org.junit.Test; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -61,13 +62,14 @@ import static org.hamcrest.Matchers.*; /** */ -@ClusterScope(scope= Scope.TEST, numDataNodes =0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest { private static final Settings nodeSettings = ImmutableSettings.settingsBuilder() .put("discovery.type", "zen") // <-- To override the local setting if set externally .put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly .put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly + .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly .put("discovery.zen.minimum_master_nodes", 2) .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()) .build(); @@ -97,12 +99,8 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); - // Wait until a green status has been reaches and 3 nodes are part of the cluster - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + // Wait until 3 nodes are part of the cluster + ensureStableCluster(3); // Figure out what is the elected master node DiscoveryNode masterDiscoNode = findMasterNode(nodes); @@ -155,11 +153,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } // Wait until the master node sees all 3 nodes again. - clusterHealthResponse = masterClient.admin().cluster().prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + ensureStableCluster(3); for (String node : nodes) { ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); @@ -171,17 +165,12 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } @Test - @Ignore @TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") public void testDataConsistency() throws Exception { List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); - // Wait until a green status has been reaches and 3 nodes are part of the cluster - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + // Wait until a 3 nodes are part of the cluster + ensureStableCluster(3); assertAcked(prepareCreate("test") .addMapping("type", "field", "type=long") @@ -216,35 +205,29 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT ensureGreen("test"); // Pick a node that isn't the elected master. - String isolatedNode = nodes.get(0); - String nonIsolatedNode = nodes.get(1); - final Client nonIsolatedNodeClient = internalCluster().client(nonIsolatedNode); + final String isolatedNode = nodes.get(0); + final String nonIsolatedNode = nodes.get(1); // Simulate a network issue between the unlucky node and the rest of the cluster. - for (String nodeId : nodes) { - if (!nodeId.equals(isolatedNode)) { - addFailToSendNoConnectRule(nodeId, isolatedNode); - addFailToSendNoConnectRule(isolatedNode, nodeId); - } - } + randomIsolateNode(isolatedNode, nodes); try { - // Wait until elected master has removed that the unlucky node... + logger.info("wait until elected master has removed [{}]", isolatedNode); boolean applied = awaitBusy(new Predicate() { @Override public boolean apply(Object input) { - return nonIsolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2; + return client(nonIsolatedNode).admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2; } }, 1, TimeUnit.MINUTES); assertThat(applied, is(true)); // The unlucky node must report *no* master node, since it can't connect to master and in fact it should // continuously ping until network failures have been resolved. However - final Client isolatedNodeClient = internalCluster().client(isolatedNode); // It may a take a bit before the node detects it has been cut off from the elected master + logger.info("waiting for isolated node [{}] to have no master", isolatedNode); applied = awaitBusy(new Predicate() { @Override public boolean apply(Object input) { - ClusterState localClusterState = isolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState(); + ClusterState localClusterState = client(isolatedNode).admin().cluster().prepareState().setLocal(true).get().getState(); DiscoveryNodes localDiscoveryNodes = localClusterState.nodes(); logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint()); return localDiscoveryNodes.masterNode() == null; @@ -252,13 +235,14 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT }, 10, TimeUnit.SECONDS); assertThat(applied, is(true)); - ClusterHealthResponse healthResponse = nonIsolatedNodeClient.admin().cluster().prepareHealth("test") + ClusterHealthResponse healthResponse = client(nonIsolatedNode).admin().cluster().prepareHealth("test") .setWaitForYellowStatus().get(); assertThat(healthResponse.isTimedOut(), is(false)); assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); // Reads on the right side of the split must work - searchResponse = nonIsolatedNodeClient.prepareSearch("test").setTypes("type") + logger.info("verifying healthy part of cluster returns data"); + searchResponse = client(nonIsolatedNode).prepareSearch("test").setTypes("type") .addSort("field", SortOrder.ASC) .get(); assertHitCount(searchResponse, indexRequests.length); @@ -269,20 +253,21 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } // Reads on the wrong side of the split are partial - searchResponse = isolatedNodeClient.prepareSearch("test").setTypes("type") - .addSort("field", SortOrder.ASC) + logger.info("verifying isolated node [{}] returns partial data", isolatedNode); + searchResponse = client(isolatedNode).prepareSearch("test").setTypes("type") + .addSort("field", SortOrder.ASC).setPreference("_only_local") .get(); assertThat(searchResponse.getSuccessfulShards(), lessThan(searchResponse.getTotalShards())); assertThat(searchResponse.getHits().totalHits(), lessThan((long) indexRequests.length)); - // Writes on the right side of the split must work - UpdateResponse updateResponse = nonIsolatedNodeClient.prepareUpdate("test", "type", "0").setDoc("field2", 2).get(); + logger.info("verifying writes on healthy cluster"); + UpdateResponse updateResponse = client(nonIsolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2).get(); assertThat(updateResponse.getVersion(), equalTo(2l)); - // Writes on the wrong side of the split fail try { - isolatedNodeClient.prepareUpdate("test", "type", "0").setDoc("field2", 2) - .setTimeout(TimeValue.timeValueSeconds(5)) // Fail quick, otherwise we wait 60 seconds. + logger.info("verifying writes on isolated [{}] fail", isolatedNode); + client(isolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2) + .setTimeout("1s") // Fail quick, otherwise we wait 60 seconds. .get(); fail(); } catch (ClusterBlockException exception) { @@ -294,23 +279,13 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } finally { // stop simulating network failures, from this point on the unlucky node is able to rejoin // We also need to do this even if assertions fail, since otherwise the test framework can't work properly - for (String nodeId : nodes) { - if (!nodeId.equals(isolatedNode)) { - clearNoConnectRule(nodeId, isolatedNode); - clearNoConnectRule(isolatedNode, nodeId); - } - } + restoreIsolation(isolatedNode, nodes); } // Wait until the master node sees all 3 nodes again. - clusterHealthResponse = nonIsolatedNodeClient.admin().cluster().prepareHealth() - .setWaitForGreenStatus() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + ensureStableCluster(3); + logger.info("verifying all nodes return all data"); for (Client client : clients()) { searchResponse = client.prepareSearch("test").setTypes("type") .addSort("field", SortOrder.ASC) @@ -334,16 +309,79 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } } + + @Test + @TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") + public void voidIsolateMasterAndVerifyClusterStateConsensus() throws Exception { + final List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); + ensureStableCluster(3); + + assertAcked(prepareCreate("test") + .setSettings(ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) + )); + + ensureGreen(); + String isolatedNode = findMasterNode(nodes).name(); + String nonIsolatedNode = null; + for (String node : nodes) { + if (!node.equals(isolatedNode)) { + nonIsolatedNode = node; + break; + } + } + randomIsolateNode(isolatedNode, nodes); + + // make sure cluster reforms + ensureStableCluster(2, nonIsolatedNode); + + // restore isolation + restoreIsolation(isolatedNode, nodes); + + ensureStableCluster(3); + + logger.info("issue a reroute"); + // trigger a reroute now, instead of waiting for the background reroute of RerouteService + assertAcked(client().admin().cluster().prepareReroute()); + // and wait for it to finish. + assertFalse(client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).get().isTimedOut()); + + + // verify all cluster states are the same + ClusterState state = null; + for (String node : nodes) { + ClusterState nodeState = client(node).admin().cluster().prepareState().setLocal(true).get().getState(); + if (state == null) { + state = nodeState; + continue; + } + // assert nodes are identical + try { + assertEquals("unequal versions", state.version(), nodeState.version()); + assertEquals("unequal node count", state.nodes().size(), nodeState.nodes().size()); + assertEquals("different masters ", state.nodes().masterNodeId(), nodeState.nodes().masterNodeId()); + assertEquals("different meta data version", state.metaData().version(), nodeState.metaData().version()); + if (!state.routingTable().prettyPrint().equals(nodeState.routingTable().prettyPrint())) { + fail("different routing"); + } + } catch (AssertionError t) { + fail("failed comparing cluster state: " + t.getMessage() + "\n" + + "--- cluster state of node [" + nodes.get(0) + "]: ---\n" + state.prettyPrint() + + "\n--- cluster state [" + node + "]: ---\n" + nodeState.prettyPrint()); + } + + } + + } + + @Test - @Ignore @TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") public void testRejoinDocumentExistsInAllShardCopies() throws Exception { - final List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .get(); - assertThat(clusterHealthResponse.isTimedOut(), is(false)); + List nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); + ensureStableCluster(3); + assertAcked(prepareCreate("test") .setSettings(ImmutableSettings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) @@ -352,23 +390,15 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT .get()); ensureGreen("test"); - String isolatedNode = findMasterNode(nodes).getName(); - String notIsolatedNode = null; - for (String node : nodes) { - if (!node.equals(isolatedNode)) { - notIsolatedNode = node; - break; - } - } + nodes = new ArrayList<>(nodes); + Collections.shuffle(nodes, getRandom()); + String isolatedNode = nodes.get(0); + String notIsolatedNode = nodes.get(1); + + randomIsolateNode(isolatedNode, nodes); + ensureStableCluster(2, notIsolatedNode); + assertFalse(client(notIsolatedNode).admin().cluster().prepareHealth("test").setWaitForYellowStatus().get().isTimedOut()); - logger.info("Isolating node[" + isolatedNode + "]"); - for (String nodeId : nodes) { - if (!nodeId.equals(isolatedNode)) { - addFailToSendNoConnectRule(nodeId, isolatedNode); - addFailToSendNoConnectRule(isolatedNode, nodeId); - } - } - ensureYellow("test"); IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value").get(); assertThat(indexResponse.getVersion(), equalTo(1l)); @@ -381,13 +411,9 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT assertThat(getResponse.getVersion(), equalTo(1l)); assertThat(getResponse.getId(), equalTo(indexResponse.getId())); - for (String nodeId : nodes) { - if (!nodeId.equals(isolatedNode)) { - clearNoConnectRule(nodeId, isolatedNode); - clearNoConnectRule(isolatedNode, nodeId); - } - } + restoreIsolation(isolatedNode, nodes); + ensureStableCluster(3); ensureGreen("test"); for (String node : nodes) { @@ -401,6 +427,32 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT } } + protected void restoreIsolation(String isolatedNode, List nodes) { + logger.info("restoring isolation of [{}]", isolatedNode); + for (String nodeId : nodes) { + if (!nodeId.equals(isolatedNode)) { + clearNoConnectRule(nodeId, isolatedNode); + clearNoConnectRule(isolatedNode, nodeId); + } + } + } + + protected void randomIsolateNode(String isolatedNode, List nodes) { + boolean unresponsive = randomBoolean(); + logger.info("isolating [{}] with unresponsive: [{}]", isolatedNode, unresponsive); + for (String nodeId : nodes) { + if (!nodeId.equals(isolatedNode)) { + if (unresponsive) { + addUnresponsiveRule(nodeId, isolatedNode); + addUnresponsiveRule(isolatedNode, nodeId); + } else { + addFailToSendNoConnectRule(nodeId, isolatedNode); + addFailToSendNoConnectRule(isolatedNode, nodeId); + } + } + } + } + private DiscoveryNode findMasterNode(List nodes) { DiscoveryNode masterDiscoNode = null; for (String node : nodes) { @@ -421,9 +473,28 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT ((MockTransportService) mockTransportService).addFailToSendNoConnectRule(internalCluster().getInstance(Discovery.class, toNode).localNode()); } + private void addUnresponsiveRule(String fromNode, String toNode) { + TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode); + ((MockTransportService) mockTransportService).addUnresponsiveRule(internalCluster().getInstance(Discovery.class, toNode).localNode()); + } + private void clearNoConnectRule(String fromNode, String toNode) { TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode); ((MockTransportService) mockTransportService).clearRule(internalCluster().getInstance(Discovery.class, toNode).localNode()); } + + private void ensureStableCluster(int nodeCount) { + ensureStableCluster(nodeCount, null); + } + + private void ensureStableCluster(int nodeCount, @Nullable String viaNode) { + ClusterHealthResponse clusterHealthResponse = client(viaNode).admin().cluster().prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes(Integer.toString(nodeCount)) + .setWaitForRelocatingShards(0) + .get(); + assertThat(clusterHealthResponse.isTimedOut(), is(false)); + } + } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 5cfceafa220..20789924ac9 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -646,6 +646,13 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase } public static Client client() { + return client(null); + } + + public static Client client(@Nullable String node) { + if (node != null) { + return internalCluster().client(node); + } Client client = cluster().client(); if (frequently()) { client = new RandomizingClient(client, getRandom());