Merge pull request #14653 from ywelsch/fix/mocktransport-match-all-bound-addresses

Disruption rules in MockTransportService should match all bound addresses of a node.
This commit is contained in:
Yannick Welsch 2015-11-12 17:22:14 +01:00
commit b6c21cc55a
17 changed files with 146 additions and 85 deletions

View File

@ -47,7 +47,6 @@ public class IndexingMasterFailoverIT extends ESIntegTestCase {
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2) .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2)
.put("transport.host", "127.0.0.1") // only bind on one IF we use v4 here by default
.build(); .build();
internalCluster().startMasterOnlyNodesAsync(3, sharedSettings).get(); internalCluster().startMasterOnlyNodesAsync(3, sharedSettings).get();

View File

@ -207,7 +207,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
final Set<String> blockedActions = newHashSet(NodesStatsAction.NAME, NodesStatsAction.NAME + "[n]", IndicesStatsAction.NAME, IndicesStatsAction.NAME + "[n]"); final Set<String> blockedActions = newHashSet(NodesStatsAction.NAME, NodesStatsAction.NAME + "[n]", IndicesStatsAction.NAME, IndicesStatsAction.NAME + "[n]");
// drop all outgoing stats requests to force a timeout. // drop all outgoing stats requests to force a timeout.
for (DiscoveryNode node : internalTestCluster.clusterService().state().getNodes()) { for (DiscoveryNode node : internalTestCluster.clusterService().state().getNodes()) {
mockTransportService.addDelegate(node, new MockTransportService.DelegateTransport(mockTransportService.original()) { mockTransportService.addDelegate(internalTestCluster.getInstance(TransportService.class, node.getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override @Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException { TransportRequestOptions options) throws IOException, TransportException {

View File

@ -163,9 +163,6 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put("http.enabled", false) // just to make test quicker .put("http.enabled", false) // just to make test quicker
.put("transport.host", "127.0.0.1") // only bind on one IF we use v4 here by default
.put("transport.bind_host", "127.0.0.1")
.put("transport.publish_host", "127.0.0.1")
.put("gateway.local.list_timeout", "10s") // still long to induce failures but to long so test won't time out .put("gateway.local.list_timeout", "10s") // still long to induce failures but to long so test won't time out
.build(); .build();
@ -844,23 +841,26 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes(); DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes();
TransportService masterTranspotService = internalCluster().getInstance(TransportService.class, discoveryNodes.masterNode().getName());
logger.info("blocking requests from non master [{}] to master [{}]", nonMasterNode, masterNode); logger.info("blocking requests from non master [{}] to master [{}]", nonMasterNode, masterNode);
MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nonMasterNode); MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nonMasterNode);
nonMasterTransportService.addFailToSendNoConnectRule(discoveryNodes.masterNode()); nonMasterTransportService.addFailToSendNoConnectRule(masterTranspotService);
assertNoMaster(nonMasterNode); assertNoMaster(nonMasterNode);
logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode); logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode);
MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode); MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
TransportService localTransportService = internalCluster().getInstance(TransportService.class, discoveryNodes.localNode().getName());
if (randomBoolean()) { if (randomBoolean()) {
masterTransportService.addFailToSendNoConnectRule(discoveryNodes.localNode(), PublishClusterStateAction.SEND_ACTION_NAME); masterTransportService.addFailToSendNoConnectRule(localTransportService, PublishClusterStateAction.SEND_ACTION_NAME);
} else { } else {
masterTransportService.addFailToSendNoConnectRule(discoveryNodes.localNode(), PublishClusterStateAction.COMMIT_ACTION_NAME); masterTransportService.addFailToSendNoConnectRule(localTransportService, PublishClusterStateAction.COMMIT_ACTION_NAME);
} }
logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode); logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode);
final CountDownLatch countDownLatch = new CountDownLatch(2); final CountDownLatch countDownLatch = new CountDownLatch(2);
nonMasterTransportService.addDelegate(discoveryNodes.masterNode(), new MockTransportService.DelegateTransport(nonMasterTransportService.original()) { nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService.original()) {
@Override @Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) { if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) {
@ -873,8 +873,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
countDownLatch.await(); countDownLatch.await();
logger.info("waiting for cluster to reform"); logger.info("waiting for cluster to reform");
masterTransportService.clearRule(discoveryNodes.localNode()); masterTransportService.clearRule(localTransportService);
nonMasterTransportService.clearRule(discoveryNodes.masterNode()); nonMasterTransportService.clearRule(localTransportService);
ensureStableCluster(2); ensureStableCluster(2);
@ -924,9 +924,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode); logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode);
MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode); MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
if (randomBoolean()) { if (randomBoolean()) {
masterTransportService.addUnresponsiveRule(internalCluster().getInstance(ClusterService.class, nonMasterNode).localNode()); masterTransportService.addUnresponsiveRule(internalCluster().getInstance(TransportService.class, nonMasterNode));
} else { } else {
masterTransportService.addFailToSendNoConnectRule(internalCluster().getInstance(ClusterService.class, nonMasterNode).localNode()); masterTransportService.addFailToSendNoConnectRule(internalCluster().getInstance(TransportService.class, nonMasterNode));
} }
logger.info("waiting for [{}] to be removed from cluster", nonMasterNode); logger.info("waiting for [{}] to be removed from cluster", nonMasterNode);

View File

@ -476,7 +476,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
final AtomicBoolean keepFailing = new AtomicBoolean(true); final AtomicBoolean keepFailing = new AtomicBoolean(true);
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, node1)); MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, node1));
mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, node3).localNode(), mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, node3),
new MockTransportService.DelegateTransport(mockTransportService.original()) { new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override @Override

View File

@ -115,12 +115,12 @@ public class TransportIndexFailuresIT extends ESIntegTestCase {
logger.info("--> preventing index/replica operations"); logger.info("--> preventing index/replica operations");
TransportService mockTransportService = internalCluster().getInstance(TransportService.class, primaryNode); TransportService mockTransportService = internalCluster().getInstance(TransportService.class, primaryNode);
((MockTransportService) mockTransportService).addFailToSendNoConnectRule( ((MockTransportService) mockTransportService).addFailToSendNoConnectRule(
internalCluster().getInstance(Discovery.class, replicaNode).localNode(), internalCluster().getInstance(TransportService.class, replicaNode),
singleton(IndexAction.NAME + "[r]") singleton(IndexAction.NAME + "[r]")
); );
mockTransportService = internalCluster().getInstance(TransportService.class, replicaNode); mockTransportService = internalCluster().getInstance(TransportService.class, replicaNode);
((MockTransportService) mockTransportService).addFailToSendNoConnectRule( ((MockTransportService) mockTransportService).addFailToSendNoConnectRule(
internalCluster().getInstance(Discovery.class, primaryNode).localNode(), internalCluster().getInstance(TransportService.class, primaryNode),
singleton(IndexAction.NAME + "[r]") singleton(IndexAction.NAME + "[r]")
); );

View File

@ -335,7 +335,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
final CountDownLatch hasCorrupted = new CountDownLatch(1); final CountDownLatch hasCorrupted = new CountDownLatch(1);
for (NodeStats dataNode : dataNodeStats) { for (NodeStats dataNode : dataNodeStats) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name())); MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name()));
mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) { mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().name()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override @Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
@ -407,7 +407,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
final boolean truncate = randomBoolean(); final boolean truncate = randomBoolean();
for (NodeStats dataNode : dataNodeStats) { for (NodeStats dataNode : dataNodeStats) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name())); MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name()));
mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) { mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().name()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override @Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {

View File

@ -83,7 +83,7 @@ public class ExceptionRetryIT extends ESIntegTestCase {
//create a transport service that throws a ConnectTransportException for one bulk request and therefore triggers a retry. //create a transport service that throws a ConnectTransportException for one bulk request and therefore triggers a retry.
for (NodeStats dataNode : nodeStats.getNodes()) { for (NodeStats dataNode : nodeStats.getNodes()) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name())); MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name()));
mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) { mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().name()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override @Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {

View File

@ -580,12 +580,12 @@ public class IndexRecoveryIT extends ESIntegTestCase {
MockTransportService blueMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName); MockTransportService blueMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName);
MockTransportService redMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName); MockTransportService redMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName);
DiscoveryNode redDiscoNode = internalCluster().getInstance(ClusterService.class, redNodeName).localNode(); TransportService redTransportService = internalCluster().getInstance(TransportService.class, redNodeName);
DiscoveryNode blueDiscoNode = internalCluster().getInstance(ClusterService.class, blueNodeName).localNode(); TransportService blueTransportService = internalCluster().getInstance(TransportService.class, blueNodeName);
final CountDownLatch requestBlocked = new CountDownLatch(1); final CountDownLatch requestBlocked = new CountDownLatch(1);
blueMockTransportService.addDelegate(redDiscoNode, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, blueMockTransportService.original(), requestBlocked)); blueMockTransportService.addDelegate(redTransportService, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, blueMockTransportService.original(), requestBlocked));
redMockTransportService.addDelegate(blueDiscoNode, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, redMockTransportService.original(), requestBlocked)); redMockTransportService.addDelegate(blueTransportService, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, redMockTransportService.original(), requestBlocked));
logger.info("--> starting recovery from blue to red"); logger.info("--> starting recovery from blue to red");
client().admin().indices().prepareUpdateSettings(indexName).setSettings( client().admin().indices().prepareUpdateSettings(indexName).setSettings(

View File

@ -195,10 +195,9 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
// add a transport delegate that will prevent the shard active request to succeed the first time after relocation has finished. // add a transport delegate that will prevent the shard active request to succeed the first time after relocation has finished.
// node_1 will then wait for the next cluster state change before it tries a next attempt to delet the shard. // node_1 will then wait for the next cluster state change before it tries a next attempt to delet the shard.
MockTransportService transportServiceNode_1 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_1); MockTransportService transportServiceNode_1 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_1);
String node_2_id = internalCluster().getInstance(DiscoveryService.class, node_2).localNode().id(); TransportService transportServiceNode_2 = internalCluster().getInstance(TransportService.class, node_2);
DiscoveryNode node_2_disco = internalCluster().clusterService().state().getNodes().dataNodes().get(node_2_id);
final CountDownLatch shardActiveRequestSent = new CountDownLatch(1); final CountDownLatch shardActiveRequestSent = new CountDownLatch(1);
transportServiceNode_1.addDelegate(node_2_disco, new MockTransportService.DelegateTransport(transportServiceNode_1.original()) { transportServiceNode_1.addDelegate(transportServiceNode_2, new MockTransportService.DelegateTransport(transportServiceNode_1.original()) {
@Override @Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
if (action.equals("internal:index/shard/exists") && shardActiveRequestSent.getCount() > 0) { if (action.equals("internal:index/shard/exists") && shardActiveRequestSent.getCount() > 0) {

View File

@ -377,7 +377,7 @@ public class RelocationIT extends ESIntegTestCase {
MockTransportService mockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, p_node); MockTransportService mockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, p_node);
for (DiscoveryNode node : clusterService.state().nodes()) { for (DiscoveryNode node : clusterService.state().nodes()) {
if (!node.equals(clusterService.localNode())) { if (!node.equals(clusterService.localNode())) {
mockTransportService.addDelegate(node, new RecoveryCorruption(mockTransportService.original(), corruptionCount)); mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, node.getName()), new RecoveryCorruption(mockTransportService.original(), corruptionCount));
} }
} }

View File

@ -121,7 +121,7 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
final AtomicBoolean truncate = new AtomicBoolean(true); final AtomicBoolean truncate = new AtomicBoolean(true);
for (NodeStats dataNode : dataNodeStats) { for (NodeStats dataNode : dataNodeStats) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name())); MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name()));
mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) { mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().name()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override @Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {

View File

@ -1046,7 +1046,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
} }
}); });
serviceB.addFailToSendNoConnectRule(nodeA); serviceB.addFailToSendNoConnectRule(serviceA);
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello", TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
new StringMessageRequest("moshe"), new BaseTransportResponseHandler<StringMessageResponse>() { new StringMessageRequest("moshe"), new BaseTransportResponseHandler<StringMessageResponse>() {
@ -1104,7 +1104,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
} }
}); });
serviceB.addUnresponsiveRule(nodeA); serviceB.addUnresponsiveRule(serviceA);
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello", TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
new StringMessageRequest("moshe"), TransportRequestOptions.options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() { new StringMessageRequest("moshe"), TransportRequestOptions.options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {

View File

@ -18,7 +18,6 @@
*/ */
package org.elasticsearch.test.disruption; package org.elasticsearch.test.disruption;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
@ -78,10 +77,9 @@ public class NetworkDelaysPartition extends NetworkPartition {
} }
@Override @Override
void applyDisruption(DiscoveryNode node1, MockTransportService transportService1, void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) {
DiscoveryNode node2, MockTransportService transportService2) { transportService1.addUnresponsiveRule(transportService1, duration);
transportService1.addUnresponsiveRule(node1, duration); transportService1.addUnresponsiveRule(transportService2, duration);
transportService1.addUnresponsiveRule(node2, duration);
} }
@Override @Override

View File

@ -18,7 +18,6 @@
*/ */
package org.elasticsearch.test.disruption; package org.elasticsearch.test.disruption;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
@ -46,10 +45,9 @@ public class NetworkDisconnectPartition extends NetworkPartition {
} }
@Override @Override
void applyDisruption(DiscoveryNode node1, MockTransportService transportService1, void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) {
DiscoveryNode node2, MockTransportService transportService2) { transportService1.addFailToSendNoConnectRule(transportService2);
transportService1.addFailToSendNoConnectRule(node2); transportService2.addFailToSendNoConnectRule(transportService1);
transportService2.addFailToSendNoConnectRule(node1);
} }
@Override @Override

View File

@ -18,10 +18,8 @@
*/ */
package org.elasticsearch.test.disruption; package org.elasticsearch.test.disruption;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -29,7 +27,6 @@ import org.elasticsearch.transport.TransportService;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
@ -140,7 +137,6 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme {
@Override @Override
public synchronized void removeFromNode(String node, InternalTestCluster cluster) { public synchronized void removeFromNode(String node, InternalTestCluster cluster) {
MockTransportService transportService = (MockTransportService) cluster.getInstance(TransportService.class, node); MockTransportService transportService = (MockTransportService) cluster.getInstance(TransportService.class, node);
DiscoveryNode discoveryNode = discoveryNode(node);
Set<String> otherSideNodes; Set<String> otherSideNodes;
if (nodesSideOne.contains(node)) { if (nodesSideOne.contains(node)) {
otherSideNodes = nodesSideTwo; otherSideNodes = nodesSideTwo;
@ -153,8 +149,7 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme {
} }
for (String node2 : otherSideNodes) { for (String node2 : otherSideNodes) {
MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2); MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
DiscoveryNode discoveryNode2 = discoveryNode(node2); removeDisruption(transportService, transportService2);
removeDisruption(discoveryNode, transportService, discoveryNode2, transportService2);
} }
} }
@ -165,11 +160,6 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme {
protected abstract String getPartitionDescription(); protected abstract String getPartitionDescription();
protected DiscoveryNode discoveryNode(String node) {
return cluster.getInstance(Discovery.class, node).localNode();
}
@Override @Override
public synchronized void startDisrupting() { public synchronized void startDisrupting() {
if (nodesSideOne.size() == 0 || nodesSideTwo.size() == 0) { if (nodesSideOne.size() == 0 || nodesSideTwo.size() == 0) {
@ -179,11 +169,9 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme {
activeDisruption = true; activeDisruption = true;
for (String node1 : nodesSideOne) { for (String node1 : nodesSideOne) {
MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1); MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1);
DiscoveryNode discoveryNode1 = discoveryNode(node1);
for (String node2 : nodesSideTwo) { for (String node2 : nodesSideTwo) {
DiscoveryNode discoveryNode2 = discoveryNode(node2);
MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2); MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
applyDisruption(discoveryNode1, transportService1, discoveryNode2, transportService2); applyDisruption(transportService1, transportService2);
} }
} }
} }
@ -197,24 +185,20 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme {
logger.info("restoring partition between nodes {} & nodes {}", nodesSideOne, nodesSideTwo); logger.info("restoring partition between nodes {} & nodes {}", nodesSideOne, nodesSideTwo);
for (String node1 : nodesSideOne) { for (String node1 : nodesSideOne) {
MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1); MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1);
DiscoveryNode discoveryNode1 = discoveryNode(node1);
for (String node2 : nodesSideTwo) { for (String node2 : nodesSideTwo) {
DiscoveryNode discoveryNode2 = discoveryNode(node2);
MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2); MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
removeDisruption(discoveryNode1, transportService1, discoveryNode2, transportService2); removeDisruption(transportService1, transportService2);
} }
} }
activeDisruption = false; activeDisruption = false;
} }
abstract void applyDisruption(DiscoveryNode node1, MockTransportService transportService1, abstract void applyDisruption(MockTransportService transportService1, MockTransportService transportService2);
DiscoveryNode node2, MockTransportService transportService2);
protected void removeDisruption(DiscoveryNode node1, MockTransportService transportService1, protected void removeDisruption(MockTransportService transportService1, MockTransportService transportService2) {
DiscoveryNode node2, MockTransportService transportService2) { transportService1.clearRule(transportService2);
transportService1.clearRule(node2); transportService2.clearRule(transportService1);
transportService2.clearRule(node1);
} }
} }

View File

@ -18,7 +18,6 @@
*/ */
package org.elasticsearch.test.disruption; package org.elasticsearch.test.disruption;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
@ -45,10 +44,9 @@ public class NetworkUnresponsivePartition extends NetworkPartition {
} }
@Override @Override
void applyDisruption(DiscoveryNode node1, MockTransportService transportService1, void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) {
DiscoveryNode node2, MockTransportService transportService2) { transportService1.addUnresponsiveRule(transportService2);
transportService1.addUnresponsiveRule(node2); transportService2.addUnresponsiveRule(transportService1);
transportService2.addUnresponsiveRule(node1);
} }
@Override @Override

View File

@ -55,6 +55,14 @@ import java.util.concurrent.CopyOnWriteArrayList;
/** /**
* A mock transport service that allows to simulate different network topology failures. * A mock transport service that allows to simulate different network topology failures.
* Internally it maps TransportAddress objects to rules that inject failures.
* Adding rules for a node is done by adding rules for all bound addresses of a node
* (and the publish address, if different).
* Matching requests to rules is based on the transport address associated with the
* discovery node of the request, namely by DiscoveryNode.getAddress().
* This address is usually the publish address of the node but can also be a different one
* (for example, @see org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing, which constructs
* fake DiscoveryNode instances where the publish address is one of the bound addresses).
*/ */
public class MockTransportService extends TransportService { public class MockTransportService extends TransportService {
@ -82,7 +90,14 @@ public class MockTransportService extends TransportService {
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool) { public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, new LookupTestTransport(transport), threadPool); super(settings, new LookupTestTransport(transport), threadPool);
this.original = transport; this.original = transport;
}
public static TransportAddress[] extractTransportAddresses(TransportService transportService) {
HashSet<TransportAddress> transportAddresses = new HashSet<>();
BoundTransportAddress boundTransportAddress = transportService.boundAddress();
transportAddresses.addAll(Arrays.asList(boundTransportAddress.boundAddresses()));
transportAddresses.add(boundTransportAddress.publishAddress());
return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]);
} }
/** /**
@ -93,10 +108,19 @@ public class MockTransportService extends TransportService {
} }
/** /**
* Clears the rule associated with the provided node. * Clears the rule associated with the provided transport service.
*/ */
public void clearRule(DiscoveryNode node) { public void clearRule(TransportService transportService) {
transport().transports.remove(node.getAddress()); for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
clearRule(transportAddress);
}
}
/**
* Clears the rule associated with the provided transport address.
*/
public void clearRule(TransportAddress transportAddress) {
transport().transports.remove(transportAddress);
} }
/** /**
@ -110,8 +134,18 @@ public class MockTransportService extends TransportService {
* Adds a rule that will cause every send request to fail, and each new connect since the rule * Adds a rule that will cause every send request to fail, and each new connect since the rule
* is added to fail as well. * is added to fail as well.
*/ */
public void addFailToSendNoConnectRule(DiscoveryNode node) { public void addFailToSendNoConnectRule(TransportService transportService) {
addDelegate(node, new DelegateTransport(original) { for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
addFailToSendNoConnectRule(transportAddress);
}
}
/**
* Adds a rule that will cause every send request to fail, and each new connect since the rule
* is added to fail as well.
*/
public void addFailToSendNoConnectRule(TransportAddress transportAddress) {
addDelegate(transportAddress, new DelegateTransport(original) {
@Override @Override
public void connectToNode(DiscoveryNode node) throws ConnectTransportException { public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
throw new ConnectTransportException(node, "DISCONNECT: simulated"); throw new ConnectTransportException(node, "DISCONNECT: simulated");
@ -132,16 +166,32 @@ public class MockTransportService extends TransportService {
/** /**
* Adds a rule that will cause matching operations to throw ConnectTransportExceptions * Adds a rule that will cause matching operations to throw ConnectTransportExceptions
*/ */
public void addFailToSendNoConnectRule(DiscoveryNode node, final String... blockedActions) { public void addFailToSendNoConnectRule(TransportService transportService, final String... blockedActions) {
addFailToSendNoConnectRule(node, new HashSet<>(Arrays.asList(blockedActions))); addFailToSendNoConnectRule(transportService, new HashSet<>(Arrays.asList(blockedActions)));
} }
/** /**
* Adds a rule that will cause matching operations to throw ConnectTransportExceptions * Adds a rule that will cause matching operations to throw ConnectTransportExceptions
*/ */
public void addFailToSendNoConnectRule(DiscoveryNode node, final Set<String> blockedActions) { public void addFailToSendNoConnectRule(TransportAddress transportAddress, final String... blockedActions) {
addFailToSendNoConnectRule(transportAddress, new HashSet<>(Arrays.asList(blockedActions)));
}
addDelegate(node, new DelegateTransport(original) { /**
* Adds a rule that will cause matching operations to throw ConnectTransportExceptions
*/
public void addFailToSendNoConnectRule(TransportService transportService, final Set<String> blockedActions) {
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
addFailToSendNoConnectRule(transportAddress, blockedActions);
}
}
/**
* Adds a rule that will cause matching operations to throw ConnectTransportExceptions
*/
public void addFailToSendNoConnectRule(TransportAddress transportAddress, final Set<String> blockedActions) {
addDelegate(transportAddress, new DelegateTransport(original) {
@Override @Override
public void connectToNode(DiscoveryNode node) throws ConnectTransportException { public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
original.connectToNode(node); original.connectToNode(node);
@ -167,8 +217,18 @@ public class MockTransportService extends TransportService {
* Adds a rule that will cause ignores each send request, simulating an unresponsive node * Adds a rule that will cause ignores each send request, simulating an unresponsive node
* and failing to connect once the rule was added. * and failing to connect once the rule was added.
*/ */
public void addUnresponsiveRule(DiscoveryNode node) { public void addUnresponsiveRule(TransportService transportService) {
addDelegate(node, new DelegateTransport(original) { for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
addUnresponsiveRule(transportAddress);
}
}
/**
* Adds a rule that will cause ignores each send request, simulating an unresponsive node
* and failing to connect once the rule was added.
*/
public void addUnresponsiveRule(TransportAddress transportAddress) {
addDelegate(transportAddress, new DelegateTransport(original) {
@Override @Override
public void connectToNode(DiscoveryNode node) throws ConnectTransportException { public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated"); throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
@ -192,10 +252,22 @@ public class MockTransportService extends TransportService {
* *
* @param duration the amount of time to delay sending and connecting. * @param duration the amount of time to delay sending and connecting.
*/ */
public void addUnresponsiveRule(DiscoveryNode node, final TimeValue duration) { public void addUnresponsiveRule(TransportService transportService, final TimeValue duration) {
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
addUnresponsiveRule(transportAddress, duration);
}
}
/**
* Adds a rule that will cause ignores each send request, simulating an unresponsive node
* and failing to connect once the rule was added.
*
* @param duration the amount of time to delay sending and connecting.
*/
public void addUnresponsiveRule(TransportAddress transportAddress, final TimeValue duration) {
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
addDelegate(node, new DelegateTransport(original) { addDelegate(transportAddress, new DelegateTransport(original) {
TimeValue getDelay() { TimeValue getDelay() {
return new TimeValue(duration.millis() - (System.currentTimeMillis() - startTime)); return new TimeValue(duration.millis() - (System.currentTimeMillis() - startTime));
@ -280,12 +352,25 @@ public class MockTransportService extends TransportService {
} }
/** /**
* Adds a new delegate transport that is used for communication with the given node. * Adds a new delegate transport that is used for communication with the given transport service.
* *
* @return <tt>true</tt> iff no other delegate was registered for this node before, otherwise <tt>false</tt> * @return <tt>true</tt> iff no other delegate was registered for any of the addresses bound by transport service, otherwise <tt>false</tt>
*/ */
public boolean addDelegate(DiscoveryNode node, DelegateTransport transport) { public boolean addDelegate(TransportService transportService, DelegateTransport transport) {
return transport().transports.put(node.getAddress(), transport) == null; boolean noRegistered = true;
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
noRegistered &= addDelegate(transportAddress, transport);
}
return noRegistered;
}
/**
* Adds a new delegate transport that is used for communication with the given transport address.
*
* @return <tt>true</tt> iff no other delegate was registered for this address before, otherwise <tt>false</tt>
*/
public boolean addDelegate(TransportAddress transportAddress, DelegateTransport transport) {
return transport().transports.put(transportAddress, transport) == null;
} }
private LookupTestTransport transport() { private LookupTestTransport transport() {