[Discovery] do not use versions to optimize cluster state copying for a first update from a new master

We have an optimization which compares routing/meta data version of cluster states and tries to reuse the current object if the versions are equal. This can cause rare failures during recovery from a minimum_master_node breach when using the "new light rejoin" mechanism and simulated network disconnects. This happens where the current master updates it's state, doesn't manage to broadcast it to other nodes due to the disconnect and then steps down. The new master will start with a previous version and continue to update it. When the old master rejoins, the versions of it's state can equal but the content is different.

Also improved DiscoveryWithNetworkFailuresTests to simulate this failure (and other improvements)

Closes #6466
This commit is contained in:
Boaz Leskes 2014-06-11 15:54:47 +02:00
parent 1849d0966c
commit 58f8774fa2
6 changed files with 193 additions and 101 deletions

View File

@ -386,20 +386,6 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
}
} else {
if (previousClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) && !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) {
// force an update, its a fresh update from the master as we transition from a start of not having a master to having one
// have a fresh instances of routing and metadata to remove the chance that version might be the same
Builder builder = ClusterState.builder(newClusterState);
builder.routingTable(RoutingTable.builder(newClusterState.routingTable()));
builder.metaData(MetaData.builder(newClusterState.metaData()));
newClusterState = builder.build();
logger.debug("got first state from fresh master [{}]", newClusterState.nodes().masterNodeId());
} else if (newClusterState.version() < previousClusterState.version()) {
// we got a cluster state with older version, when we are *not* the master, let it in since it might be valid
// we check on version where applicable, like at ZenDiscovery#handleNewClusterStateFromMaster
logger.debug("got smaller cluster state when not master [" + newClusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "]");
}
}
newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);

View File

@ -54,6 +54,7 @@ public class DiscoverySettings extends AbstractComponent {
super(settings);
nodeSettingsService.addListener(new ApplySettings());
this.noMasterBlock = parseNoMasterBlock(settings.get(NO_MASTER_BLOCK, DEFAULT_NO_MASTER_BLOCK));
this.publishTimeout = settings.getAsTime(PUBLISH_TIMEOUT, publishTimeout);
}
/**

View File

@ -58,6 +58,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private final TransportService transportService;
private final ClusterService clusterService;
private final DiscoveryService discoveryService;
private final DiscoveryNodeService discoveryNodeService;
private AllocationService allocationService;
private final ClusterName clusterName;
@ -77,7 +78,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
@Inject
public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) {
DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings, DiscoveryService discoveryService) {
super(settings);
this.clusterName = clusterName;
this.clusterService = clusterService;
@ -85,6 +86,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
this.discoveryNodeService = discoveryNodeService;
this.version = version;
this.discoverySettings = discoverySettings;
this.discoveryService = discoveryService;
}
@Override
@ -305,6 +307,9 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
// ignore cluster state messages that do not include "me", not in the game yet...
if (nodeSpecificClusterState.nodes().localNode() != null) {
assert nodeSpecificClusterState.nodes().masterNode() != null : "received a cluster state without a master";
assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block";
discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
@ -312,6 +317,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
return currentState;
}
if (currentState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) {
// its a fresh update from the master as we transition from a start of not having a master to having one
logger.debug("got first state from fresh master [{}]", nodeSpecificClusterState.nodes().masterNodeId());
return nodeSpecificClusterState;
}
ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState);
// if the routing table did not change, use the original one
if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) {

View File

@ -85,6 +85,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private final ClusterService clusterService;
private AllocationService allocationService;
private final ClusterName clusterName;
private final DiscoveryService discoveryService;
private final DiscoveryNodeService discoveryNodeService;
private final DiscoverySettings discoverySettings;
private final ZenPingService pingService;
@ -128,12 +129,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Inject
public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings) {
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings,
DiscoveryService discoveryService) {
super(settings);
this.clusterName = clusterName;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.transportService = transportService;
this.discoveryService = discoveryService;
this.discoveryNodeService = discoveryNodeService;
this.discoverySettings = discoverySettings;
this.pingService = pingService;
@ -641,6 +644,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
processNewClusterStates.add(processClusterState);
assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master";
assert !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block";
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
@ -701,7 +708,16 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
masterFD.restart(latestDiscoNodes.masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
}
if (currentState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) {
// its a fresh update from the master as we transition from a start of not having a master to having one
logger.debug("got first state from fresh master [{}]", updatedState.nodes().masterNodeId());
return updatedState;
}
// some optimizations to make sure we keep old objects where possible
ClusterState.Builder builder = ClusterState.builder(updatedState);
// if the routing table did not change, use the original one
if (updatedState.routingTable().version() == currentState.routingTable().version()) {
builder.routingTable(currentState.routingTable());

View File

@ -35,10 +35,10 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
@ -47,9 +47,10 @@ import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -61,13 +62,14 @@ import static org.hamcrest.Matchers.*;
/**
*/
@ClusterScope(scope= Scope.TEST, numDataNodes =0)
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest {
private static final Settings nodeSettings = ImmutableSettings.settingsBuilder()
.put("discovery.type", "zen") // <-- To override the local setting if set externally
.put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put("discovery.zen.minimum_master_nodes", 2)
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
.build();
@ -97,12 +99,8 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get();
// Wait until a green status has been reaches and 3 nodes are part of the cluster
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.get();
assertThat(clusterHealthResponse.isTimedOut(), is(false));
// Wait until 3 nodes are part of the cluster
ensureStableCluster(3);
// Figure out what is the elected master node
DiscoveryNode masterDiscoNode = findMasterNode(nodes);
@ -155,11 +153,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
// Wait until the master node sees all 3 nodes again.
clusterHealthResponse = masterClient.admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.get();
assertThat(clusterHealthResponse.isTimedOut(), is(false));
ensureStableCluster(3);
for (String node : nodes) {
ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
@ -171,17 +165,12 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
@Test
@Ignore
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
public void testDataConsistency() throws Exception {
List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get();
// Wait until a green status has been reaches and 3 nodes are part of the cluster
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.get();
assertThat(clusterHealthResponse.isTimedOut(), is(false));
// Wait until a 3 nodes are part of the cluster
ensureStableCluster(3);
assertAcked(prepareCreate("test")
.addMapping("type", "field", "type=long")
@ -216,35 +205,29 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
ensureGreen("test");
// Pick a node that isn't the elected master.
String isolatedNode = nodes.get(0);
String nonIsolatedNode = nodes.get(1);
final Client nonIsolatedNodeClient = internalCluster().client(nonIsolatedNode);
final String isolatedNode = nodes.get(0);
final String nonIsolatedNode = nodes.get(1);
// Simulate a network issue between the unlucky node and the rest of the cluster.
for (String nodeId : nodes) {
if (!nodeId.equals(isolatedNode)) {
addFailToSendNoConnectRule(nodeId, isolatedNode);
addFailToSendNoConnectRule(isolatedNode, nodeId);
}
}
randomIsolateNode(isolatedNode, nodes);
try {
// Wait until elected master has removed that the unlucky node...
logger.info("wait until elected master has removed [{}]", isolatedNode);
boolean applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return nonIsolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2;
return client(nonIsolatedNode).admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2;
}
}, 1, TimeUnit.MINUTES);
assertThat(applied, is(true));
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
// continuously ping until network failures have been resolved. However
final Client isolatedNodeClient = internalCluster().client(isolatedNode);
// It may a take a bit before the node detects it has been cut off from the elected master
logger.info("waiting for isolated node [{}] to have no master", isolatedNode);
applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
ClusterState localClusterState = isolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState();
ClusterState localClusterState = client(isolatedNode).admin().cluster().prepareState().setLocal(true).get().getState();
DiscoveryNodes localDiscoveryNodes = localClusterState.nodes();
logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint());
return localDiscoveryNodes.masterNode() == null;
@ -252,13 +235,14 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}, 10, TimeUnit.SECONDS);
assertThat(applied, is(true));
ClusterHealthResponse healthResponse = nonIsolatedNodeClient.admin().cluster().prepareHealth("test")
ClusterHealthResponse healthResponse = client(nonIsolatedNode).admin().cluster().prepareHealth("test")
.setWaitForYellowStatus().get();
assertThat(healthResponse.isTimedOut(), is(false));
assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
// Reads on the right side of the split must work
searchResponse = nonIsolatedNodeClient.prepareSearch("test").setTypes("type")
logger.info("verifying healthy part of cluster returns data");
searchResponse = client(nonIsolatedNode).prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC)
.get();
assertHitCount(searchResponse, indexRequests.length);
@ -269,20 +253,21 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
// Reads on the wrong side of the split are partial
searchResponse = isolatedNodeClient.prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC)
logger.info("verifying isolated node [{}] returns partial data", isolatedNode);
searchResponse = client(isolatedNode).prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC).setPreference("_only_local")
.get();
assertThat(searchResponse.getSuccessfulShards(), lessThan(searchResponse.getTotalShards()));
assertThat(searchResponse.getHits().totalHits(), lessThan((long) indexRequests.length));
// Writes on the right side of the split must work
UpdateResponse updateResponse = nonIsolatedNodeClient.prepareUpdate("test", "type", "0").setDoc("field2", 2).get();
logger.info("verifying writes on healthy cluster");
UpdateResponse updateResponse = client(nonIsolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2).get();
assertThat(updateResponse.getVersion(), equalTo(2l));
// Writes on the wrong side of the split fail
try {
isolatedNodeClient.prepareUpdate("test", "type", "0").setDoc("field2", 2)
.setTimeout(TimeValue.timeValueSeconds(5)) // Fail quick, otherwise we wait 60 seconds.
logger.info("verifying writes on isolated [{}] fail", isolatedNode);
client(isolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2)
.setTimeout("1s") // Fail quick, otherwise we wait 60 seconds.
.get();
fail();
} catch (ClusterBlockException exception) {
@ -294,23 +279,13 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
} finally {
// stop simulating network failures, from this point on the unlucky node is able to rejoin
// We also need to do this even if assertions fail, since otherwise the test framework can't work properly
for (String nodeId : nodes) {
if (!nodeId.equals(isolatedNode)) {
clearNoConnectRule(nodeId, isolatedNode);
clearNoConnectRule(isolatedNode, nodeId);
}
}
restoreIsolation(isolatedNode, nodes);
}
// Wait until the master node sees all 3 nodes again.
clusterHealthResponse = nonIsolatedNodeClient.admin().cluster().prepareHealth()
.setWaitForGreenStatus()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.get();
assertThat(clusterHealthResponse.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealthResponse.isTimedOut(), is(false));
ensureStableCluster(3);
logger.info("verifying all nodes return all data");
for (Client client : clients()) {
searchResponse = client.prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC)
@ -334,16 +309,79 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
}
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
public void voidIsolateMasterAndVerifyClusterStateConsensus() throws Exception {
final List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get();
ensureStableCluster(3);
assertAcked(prepareCreate("test")
.setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
));
ensureGreen();
String isolatedNode = findMasterNode(nodes).name();
String nonIsolatedNode = null;
for (String node : nodes) {
if (!node.equals(isolatedNode)) {
nonIsolatedNode = node;
break;
}
}
randomIsolateNode(isolatedNode, nodes);
// make sure cluster reforms
ensureStableCluster(2, nonIsolatedNode);
// restore isolation
restoreIsolation(isolatedNode, nodes);
ensureStableCluster(3);
logger.info("issue a reroute");
// trigger a reroute now, instead of waiting for the background reroute of RerouteService
assertAcked(client().admin().cluster().prepareReroute());
// and wait for it to finish.
assertFalse(client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).get().isTimedOut());
// verify all cluster states are the same
ClusterState state = null;
for (String node : nodes) {
ClusterState nodeState = client(node).admin().cluster().prepareState().setLocal(true).get().getState();
if (state == null) {
state = nodeState;
continue;
}
// assert nodes are identical
try {
assertEquals("unequal versions", state.version(), nodeState.version());
assertEquals("unequal node count", state.nodes().size(), nodeState.nodes().size());
assertEquals("different masters ", state.nodes().masterNodeId(), nodeState.nodes().masterNodeId());
assertEquals("different meta data version", state.metaData().version(), nodeState.metaData().version());
if (!state.routingTable().prettyPrint().equals(nodeState.routingTable().prettyPrint())) {
fail("different routing");
}
} catch (AssertionError t) {
fail("failed comparing cluster state: " + t.getMessage() + "\n" +
"--- cluster state of node [" + nodes.get(0) + "]: ---\n" + state.prettyPrint() +
"\n--- cluster state [" + node + "]: ---\n" + nodeState.prettyPrint());
}
}
}
@Test
@Ignore
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
public void testRejoinDocumentExistsInAllShardCopies() throws Exception {
final List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.get();
assertThat(clusterHealthResponse.isTimedOut(), is(false));
List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get();
ensureStableCluster(3);
assertAcked(prepareCreate("test")
.setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
@ -352,23 +390,15 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
.get());
ensureGreen("test");
String isolatedNode = findMasterNode(nodes).getName();
String notIsolatedNode = null;
for (String node : nodes) {
if (!node.equals(isolatedNode)) {
notIsolatedNode = node;
break;
}
}
nodes = new ArrayList<>(nodes);
Collections.shuffle(nodes, getRandom());
String isolatedNode = nodes.get(0);
String notIsolatedNode = nodes.get(1);
randomIsolateNode(isolatedNode, nodes);
ensureStableCluster(2, notIsolatedNode);
assertFalse(client(notIsolatedNode).admin().cluster().prepareHealth("test").setWaitForYellowStatus().get().isTimedOut());
logger.info("Isolating node[" + isolatedNode + "]");
for (String nodeId : nodes) {
if (!nodeId.equals(isolatedNode)) {
addFailToSendNoConnectRule(nodeId, isolatedNode);
addFailToSendNoConnectRule(isolatedNode, nodeId);
}
}
ensureYellow("test");
IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value").get();
assertThat(indexResponse.getVersion(), equalTo(1l));
@ -381,13 +411,9 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
assertThat(getResponse.getVersion(), equalTo(1l));
assertThat(getResponse.getId(), equalTo(indexResponse.getId()));
for (String nodeId : nodes) {
if (!nodeId.equals(isolatedNode)) {
clearNoConnectRule(nodeId, isolatedNode);
clearNoConnectRule(isolatedNode, nodeId);
}
}
restoreIsolation(isolatedNode, nodes);
ensureStableCluster(3);
ensureGreen("test");
for (String node : nodes) {
@ -401,6 +427,32 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
}
protected void restoreIsolation(String isolatedNode, List<String> nodes) {
logger.info("restoring isolation of [{}]", isolatedNode);
for (String nodeId : nodes) {
if (!nodeId.equals(isolatedNode)) {
clearNoConnectRule(nodeId, isolatedNode);
clearNoConnectRule(isolatedNode, nodeId);
}
}
}
protected void randomIsolateNode(String isolatedNode, List<String> nodes) {
boolean unresponsive = randomBoolean();
logger.info("isolating [{}] with unresponsive: [{}]", isolatedNode, unresponsive);
for (String nodeId : nodes) {
if (!nodeId.equals(isolatedNode)) {
if (unresponsive) {
addUnresponsiveRule(nodeId, isolatedNode);
addUnresponsiveRule(isolatedNode, nodeId);
} else {
addFailToSendNoConnectRule(nodeId, isolatedNode);
addFailToSendNoConnectRule(isolatedNode, nodeId);
}
}
}
}
private DiscoveryNode findMasterNode(List<String> nodes) {
DiscoveryNode masterDiscoNode = null;
for (String node : nodes) {
@ -421,9 +473,28 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
((MockTransportService) mockTransportService).addFailToSendNoConnectRule(internalCluster().getInstance(Discovery.class, toNode).localNode());
}
private void addUnresponsiveRule(String fromNode, String toNode) {
TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode);
((MockTransportService) mockTransportService).addUnresponsiveRule(internalCluster().getInstance(Discovery.class, toNode).localNode());
}
private void clearNoConnectRule(String fromNode, String toNode) {
TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode);
((MockTransportService) mockTransportService).clearRule(internalCluster().getInstance(Discovery.class, toNode).localNode());
}
private void ensureStableCluster(int nodeCount) {
ensureStableCluster(nodeCount, null);
}
private void ensureStableCluster(int nodeCount, @Nullable String viaNode) {
ClusterHealthResponse clusterHealthResponse = client(viaNode).admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes(Integer.toString(nodeCount))
.setWaitForRelocatingShards(0)
.get();
assertThat(clusterHealthResponse.isTimedOut(), is(false));
}
}

View File

@ -646,6 +646,13 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
}
public static Client client() {
return client(null);
}
public static Client client(@Nullable String node) {
if (node != null) {
return internalCluster().client(node);
}
Client client = cluster().client();
if (frequently()) {
client = new RandomizingClient(client, getRandom());