diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java index fdb482774c6..825e3e40894 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java @@ -47,7 +47,6 @@ public class IndexingMasterFailoverIT extends ESIntegTestCase { .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2) - .put("transport.host", "127.0.0.1") // only bind on one IF we use v4 here by default .build(); internalCluster().startMasterOnlyNodesAsync(3, sharedSettings).get(); diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 1cce41e6a2c..5ed45620a03 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -207,7 +207,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase { final Set blockedActions = newHashSet(NodesStatsAction.NAME, NodesStatsAction.NAME + "[n]", IndicesStatsAction.NAME, IndicesStatsAction.NAME + "[n]"); // drop all outgoing stats requests to force a timeout. for (DiscoveryNode node : internalTestCluster.clusterService().state().getNodes()) { - mockTransportService.addDelegate(node, new MockTransportService.DelegateTransport(mockTransportService.original()) { + mockTransportService.addDelegate(internalTestCluster.getInstance(TransportService.class, node.getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) { @Override public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index d44e27ce714..f9dbda217d0 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -163,9 +163,6 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly .put("http.enabled", false) // just to make test quicker - .put("transport.host", "127.0.0.1") // only bind on one IF we use v4 here by default - .put("transport.bind_host", "127.0.0.1") - .put("transport.publish_host", "127.0.0.1") .put("gateway.local.list_timeout", "10s") // still long to induce failures but to long so test won't time out .build(); @@ -844,23 +841,26 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes(); + TransportService masterTranspotService = internalCluster().getInstance(TransportService.class, discoveryNodes.masterNode().getName()); + logger.info("blocking requests from non master [{}] to master [{}]", nonMasterNode, masterNode); MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nonMasterNode); - nonMasterTransportService.addFailToSendNoConnectRule(discoveryNodes.masterNode()); + nonMasterTransportService.addFailToSendNoConnectRule(masterTranspotService); assertNoMaster(nonMasterNode); logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode); MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode); + TransportService localTransportService = internalCluster().getInstance(TransportService.class, discoveryNodes.localNode().getName()); if (randomBoolean()) { - masterTransportService.addFailToSendNoConnectRule(discoveryNodes.localNode(), PublishClusterStateAction.SEND_ACTION_NAME); + masterTransportService.addFailToSendNoConnectRule(localTransportService, PublishClusterStateAction.SEND_ACTION_NAME); } else { - masterTransportService.addFailToSendNoConnectRule(discoveryNodes.localNode(), PublishClusterStateAction.COMMIT_ACTION_NAME); + masterTransportService.addFailToSendNoConnectRule(localTransportService, PublishClusterStateAction.COMMIT_ACTION_NAME); } logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode); final CountDownLatch countDownLatch = new CountDownLatch(2); - nonMasterTransportService.addDelegate(discoveryNodes.masterNode(), new MockTransportService.DelegateTransport(nonMasterTransportService.original()) { + nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService.original()) { @Override public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) { @@ -873,8 +873,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { countDownLatch.await(); logger.info("waiting for cluster to reform"); - masterTransportService.clearRule(discoveryNodes.localNode()); - nonMasterTransportService.clearRule(discoveryNodes.masterNode()); + masterTransportService.clearRule(localTransportService); + nonMasterTransportService.clearRule(localTransportService); ensureStableCluster(2); @@ -924,9 +924,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode); MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode); if (randomBoolean()) { - masterTransportService.addUnresponsiveRule(internalCluster().getInstance(ClusterService.class, nonMasterNode).localNode()); + masterTransportService.addUnresponsiveRule(internalCluster().getInstance(TransportService.class, nonMasterNode)); } else { - masterTransportService.addFailToSendNoConnectRule(internalCluster().getInstance(ClusterService.class, nonMasterNode).localNode()); + masterTransportService.addFailToSendNoConnectRule(internalCluster().getInstance(TransportService.class, nonMasterNode)); } logger.info("waiting for [{}] to be removed from cluster", nonMasterNode); diff --git a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java index ed1ce65eefc..e214cea9cc1 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java +++ b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java @@ -476,7 +476,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { final AtomicBoolean keepFailing = new AtomicBoolean(true); MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, node1)); - mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, node3).localNode(), + mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, node3), new MockTransportService.DelegateTransport(mockTransportService.original()) { @Override diff --git a/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java b/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java index 5348211a849..36af42ae29d 100644 --- a/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java +++ b/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java @@ -115,12 +115,12 @@ public class TransportIndexFailuresIT extends ESIntegTestCase { logger.info("--> preventing index/replica operations"); TransportService mockTransportService = internalCluster().getInstance(TransportService.class, primaryNode); ((MockTransportService) mockTransportService).addFailToSendNoConnectRule( - internalCluster().getInstance(Discovery.class, replicaNode).localNode(), + internalCluster().getInstance(TransportService.class, replicaNode), singleton(IndexAction.NAME + "[r]") ); mockTransportService = internalCluster().getInstance(TransportService.class, replicaNode); ((MockTransportService) mockTransportService).addFailToSendNoConnectRule( - internalCluster().getInstance(Discovery.class, primaryNode).localNode(), + internalCluster().getInstance(TransportService.class, primaryNode), singleton(IndexAction.NAME + "[r]") ); diff --git a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java index 044a18835ae..8923bad2fcd 100644 --- a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java +++ b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java @@ -335,7 +335,7 @@ public class CorruptedFileIT extends ESIntegTestCase { final CountDownLatch hasCorrupted = new CountDownLatch(1); for (NodeStats dataNode : dataNodeStats) { MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name())); - mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) { + mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().name()), new MockTransportService.DelegateTransport(mockTransportService.original()) { @Override public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { @@ -407,7 +407,7 @@ public class CorruptedFileIT extends ESIntegTestCase { final boolean truncate = randomBoolean(); for (NodeStats dataNode : dataNodeStats) { MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name())); - mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) { + mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().name()), new MockTransportService.DelegateTransport(mockTransportService.original()) { @Override public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { diff --git a/core/src/test/java/org/elasticsearch/index/store/ExceptionRetryIT.java b/core/src/test/java/org/elasticsearch/index/store/ExceptionRetryIT.java index cc3c7868069..f5b7fc250aa 100644 --- a/core/src/test/java/org/elasticsearch/index/store/ExceptionRetryIT.java +++ b/core/src/test/java/org/elasticsearch/index/store/ExceptionRetryIT.java @@ -83,7 +83,7 @@ public class ExceptionRetryIT extends ESIntegTestCase { //create a transport service that throws a ConnectTransportException for one bulk request and therefore triggers a retry. for (NodeStats dataNode : nodeStats.getNodes()) { MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name())); - mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) { + mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().name()), new MockTransportService.DelegateTransport(mockTransportService.original()) { @Override public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 883d04e8185..5ece413f796 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -580,12 +580,12 @@ public class IndexRecoveryIT extends ESIntegTestCase { MockTransportService blueMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName); MockTransportService redMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName); - DiscoveryNode redDiscoNode = internalCluster().getInstance(ClusterService.class, redNodeName).localNode(); - DiscoveryNode blueDiscoNode = internalCluster().getInstance(ClusterService.class, blueNodeName).localNode(); + TransportService redTransportService = internalCluster().getInstance(TransportService.class, redNodeName); + TransportService blueTransportService = internalCluster().getInstance(TransportService.class, blueNodeName); final CountDownLatch requestBlocked = new CountDownLatch(1); - blueMockTransportService.addDelegate(redDiscoNode, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, blueMockTransportService.original(), requestBlocked)); - redMockTransportService.addDelegate(blueDiscoNode, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, redMockTransportService.original(), requestBlocked)); + blueMockTransportService.addDelegate(redTransportService, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, blueMockTransportService.original(), requestBlocked)); + redMockTransportService.addDelegate(blueTransportService, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, redMockTransportService.original(), requestBlocked)); logger.info("--> starting recovery from blue to red"); client().admin().indices().prepareUpdateSettings(indexName).setSettings( diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index b56e3ad3647..2ca97bfd8d8 100644 --- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -195,10 +195,9 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { // add a transport delegate that will prevent the shard active request to succeed the first time after relocation has finished. // node_1 will then wait for the next cluster state change before it tries a next attempt to delet the shard. MockTransportService transportServiceNode_1 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_1); - String node_2_id = internalCluster().getInstance(DiscoveryService.class, node_2).localNode().id(); - DiscoveryNode node_2_disco = internalCluster().clusterService().state().getNodes().dataNodes().get(node_2_id); + TransportService transportServiceNode_2 = internalCluster().getInstance(TransportService.class, node_2); final CountDownLatch shardActiveRequestSent = new CountDownLatch(1); - transportServiceNode_1.addDelegate(node_2_disco, new MockTransportService.DelegateTransport(transportServiceNode_1.original()) { + transportServiceNode_1.addDelegate(transportServiceNode_2, new MockTransportService.DelegateTransport(transportServiceNode_1.original()) { @Override public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { if (action.equals("internal:index/shard/exists") && shardActiveRequestSent.getCount() > 0) { diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 34abea7816e..57b5e888ea9 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -377,7 +377,7 @@ public class RelocationIT extends ESIntegTestCase { MockTransportService mockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, p_node); for (DiscoveryNode node : clusterService.state().nodes()) { if (!node.equals(clusterService.localNode())) { - mockTransportService.addDelegate(node, new RecoveryCorruption(mockTransportService.original(), corruptionCount)); + mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, node.getName()), new RecoveryCorruption(mockTransportService.original(), corruptionCount)); } } diff --git a/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java b/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java index b3f13001d68..d94f72ea80f 100644 --- a/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java @@ -121,7 +121,7 @@ public class TruncatedRecoveryIT extends ESIntegTestCase { final AtomicBoolean truncate = new AtomicBoolean(true); for (NodeStats dataNode : dataNodeStats) { MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name())); - mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) { + mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().name()), new MockTransportService.DelegateTransport(mockTransportService.original()) { @Override public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { diff --git a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 643b4fe88ab..fc72edfcb3b 100644 --- a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -1046,7 +1046,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } }); - serviceB.addFailToSendNoConnectRule(nodeA); + serviceB.addFailToSendNoConnectRule(serviceA); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", new StringMessageRequest("moshe"), new BaseTransportResponseHandler() { @@ -1104,7 +1104,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } }); - serviceB.addUnresponsiveRule(nodeA); + serviceB.addUnresponsiveRule(serviceA); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", new StringMessageRequest("moshe"), TransportRequestOptions.options().withTimeout(100), new BaseTransportResponseHandler() { diff --git a/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java b/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java index 8439f6e8f76..c422b042721 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java +++ b/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.test.disruption; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.transport.MockTransportService; @@ -78,10 +77,9 @@ public class NetworkDelaysPartition extends NetworkPartition { } @Override - void applyDisruption(DiscoveryNode node1, MockTransportService transportService1, - DiscoveryNode node2, MockTransportService transportService2) { - transportService1.addUnresponsiveRule(node1, duration); - transportService1.addUnresponsiveRule(node2, duration); + void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) { + transportService1.addUnresponsiveRule(transportService1, duration); + transportService1.addUnresponsiveRule(transportService2, duration); } @Override diff --git a/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisconnectPartition.java b/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisconnectPartition.java index 8653b50f749..ed0aa17cfcf 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisconnectPartition.java +++ b/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisconnectPartition.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.test.disruption; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.transport.MockTransportService; @@ -46,10 +45,9 @@ public class NetworkDisconnectPartition extends NetworkPartition { } @Override - void applyDisruption(DiscoveryNode node1, MockTransportService transportService1, - DiscoveryNode node2, MockTransportService transportService2) { - transportService1.addFailToSendNoConnectRule(node2); - transportService2.addFailToSendNoConnectRule(node1); + void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) { + transportService1.addFailToSendNoConnectRule(transportService2); + transportService2.addFailToSendNoConnectRule(transportService1); } @Override diff --git a/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkPartition.java b/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkPartition.java index 88bcb9024a1..9a65fc579f0 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkPartition.java +++ b/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkPartition.java @@ -18,10 +18,8 @@ */ package org.elasticsearch.test.disruption; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.discovery.Discovery; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportService; @@ -29,7 +27,6 @@ import org.elasticsearch.transport.TransportService; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Random; import java.util.Set; @@ -140,7 +137,6 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme { @Override public synchronized void removeFromNode(String node, InternalTestCluster cluster) { MockTransportService transportService = (MockTransportService) cluster.getInstance(TransportService.class, node); - DiscoveryNode discoveryNode = discoveryNode(node); Set otherSideNodes; if (nodesSideOne.contains(node)) { otherSideNodes = nodesSideTwo; @@ -153,8 +149,7 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme { } for (String node2 : otherSideNodes) { MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2); - DiscoveryNode discoveryNode2 = discoveryNode(node2); - removeDisruption(discoveryNode, transportService, discoveryNode2, transportService2); + removeDisruption(transportService, transportService2); } } @@ -165,11 +160,6 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme { protected abstract String getPartitionDescription(); - - protected DiscoveryNode discoveryNode(String node) { - return cluster.getInstance(Discovery.class, node).localNode(); - } - @Override public synchronized void startDisrupting() { if (nodesSideOne.size() == 0 || nodesSideTwo.size() == 0) { @@ -179,11 +169,9 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme { activeDisruption = true; for (String node1 : nodesSideOne) { MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1); - DiscoveryNode discoveryNode1 = discoveryNode(node1); for (String node2 : nodesSideTwo) { - DiscoveryNode discoveryNode2 = discoveryNode(node2); MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2); - applyDisruption(discoveryNode1, transportService1, discoveryNode2, transportService2); + applyDisruption(transportService1, transportService2); } } } @@ -197,24 +185,20 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme { logger.info("restoring partition between nodes {} & nodes {}", nodesSideOne, nodesSideTwo); for (String node1 : nodesSideOne) { MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1); - DiscoveryNode discoveryNode1 = discoveryNode(node1); for (String node2 : nodesSideTwo) { - DiscoveryNode discoveryNode2 = discoveryNode(node2); MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2); - removeDisruption(discoveryNode1, transportService1, discoveryNode2, transportService2); + removeDisruption(transportService1, transportService2); } } activeDisruption = false; } - abstract void applyDisruption(DiscoveryNode node1, MockTransportService transportService1, - DiscoveryNode node2, MockTransportService transportService2); + abstract void applyDisruption(MockTransportService transportService1, MockTransportService transportService2); - protected void removeDisruption(DiscoveryNode node1, MockTransportService transportService1, - DiscoveryNode node2, MockTransportService transportService2) { - transportService1.clearRule(node2); - transportService2.clearRule(node1); + protected void removeDisruption(MockTransportService transportService1, MockTransportService transportService2) { + transportService1.clearRule(transportService2); + transportService2.clearRule(transportService1); } } diff --git a/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkUnresponsivePartition.java b/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkUnresponsivePartition.java index 1feb56c46c7..b69b7af3e5e 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkUnresponsivePartition.java +++ b/test-framework/src/main/java/org/elasticsearch/test/disruption/NetworkUnresponsivePartition.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.test.disruption; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.transport.MockTransportService; @@ -45,10 +44,9 @@ public class NetworkUnresponsivePartition extends NetworkPartition { } @Override - void applyDisruption(DiscoveryNode node1, MockTransportService transportService1, - DiscoveryNode node2, MockTransportService transportService2) { - transportService1.addUnresponsiveRule(node2); - transportService2.addUnresponsiveRule(node1); + void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) { + transportService1.addUnresponsiveRule(transportService2); + transportService2.addUnresponsiveRule(transportService1); } @Override diff --git a/test-framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test-framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index f5da216da5d..e1efd6c3745 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test-framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -55,6 +55,14 @@ import java.util.concurrent.CopyOnWriteArrayList; /** * A mock transport service that allows to simulate different network topology failures. + * Internally it maps TransportAddress objects to rules that inject failures. + * Adding rules for a node is done by adding rules for all bound addresses of a node + * (and the publish address, if different). + * Matching requests to rules is based on the transport address associated with the + * discovery node of the request, namely by DiscoveryNode.getAddress(). + * This address is usually the publish address of the node but can also be a different one + * (for example, @see org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing, which constructs + * fake DiscoveryNode instances where the publish address is one of the bound addresses). */ public class MockTransportService extends TransportService { @@ -82,7 +90,14 @@ public class MockTransportService extends TransportService { public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool) { super(settings, new LookupTestTransport(transport), threadPool); this.original = transport; + } + public static TransportAddress[] extractTransportAddresses(TransportService transportService) { + HashSet transportAddresses = new HashSet<>(); + BoundTransportAddress boundTransportAddress = transportService.boundAddress(); + transportAddresses.addAll(Arrays.asList(boundTransportAddress.boundAddresses())); + transportAddresses.add(boundTransportAddress.publishAddress()); + return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]); } /** @@ -93,10 +108,19 @@ public class MockTransportService extends TransportService { } /** - * Clears the rule associated with the provided node. + * Clears the rule associated with the provided transport service. */ - public void clearRule(DiscoveryNode node) { - transport().transports.remove(node.getAddress()); + public void clearRule(TransportService transportService) { + for (TransportAddress transportAddress : extractTransportAddresses(transportService)) { + clearRule(transportAddress); + } + } + + /** + * Clears the rule associated with the provided transport address. + */ + public void clearRule(TransportAddress transportAddress) { + transport().transports.remove(transportAddress); } /** @@ -110,8 +134,18 @@ public class MockTransportService extends TransportService { * Adds a rule that will cause every send request to fail, and each new connect since the rule * is added to fail as well. */ - public void addFailToSendNoConnectRule(DiscoveryNode node) { - addDelegate(node, new DelegateTransport(original) { + public void addFailToSendNoConnectRule(TransportService transportService) { + for (TransportAddress transportAddress : extractTransportAddresses(transportService)) { + addFailToSendNoConnectRule(transportAddress); + } + } + + /** + * Adds a rule that will cause every send request to fail, and each new connect since the rule + * is added to fail as well. + */ + public void addFailToSendNoConnectRule(TransportAddress transportAddress) { + addDelegate(transportAddress, new DelegateTransport(original) { @Override public void connectToNode(DiscoveryNode node) throws ConnectTransportException { throw new ConnectTransportException(node, "DISCONNECT: simulated"); @@ -132,16 +166,32 @@ public class MockTransportService extends TransportService { /** * Adds a rule that will cause matching operations to throw ConnectTransportExceptions */ - public void addFailToSendNoConnectRule(DiscoveryNode node, final String... blockedActions) { - addFailToSendNoConnectRule(node, new HashSet<>(Arrays.asList(blockedActions))); + public void addFailToSendNoConnectRule(TransportService transportService, final String... blockedActions) { + addFailToSendNoConnectRule(transportService, new HashSet<>(Arrays.asList(blockedActions))); } /** * Adds a rule that will cause matching operations to throw ConnectTransportExceptions */ - public void addFailToSendNoConnectRule(DiscoveryNode node, final Set blockedActions) { + public void addFailToSendNoConnectRule(TransportAddress transportAddress, final String... blockedActions) { + addFailToSendNoConnectRule(transportAddress, new HashSet<>(Arrays.asList(blockedActions))); + } - addDelegate(node, new DelegateTransport(original) { + /** + * Adds a rule that will cause matching operations to throw ConnectTransportExceptions + */ + public void addFailToSendNoConnectRule(TransportService transportService, final Set blockedActions) { + for (TransportAddress transportAddress : extractTransportAddresses(transportService)) { + addFailToSendNoConnectRule(transportAddress, blockedActions); + } + } + + /** + * Adds a rule that will cause matching operations to throw ConnectTransportExceptions + */ + public void addFailToSendNoConnectRule(TransportAddress transportAddress, final Set blockedActions) { + + addDelegate(transportAddress, new DelegateTransport(original) { @Override public void connectToNode(DiscoveryNode node) throws ConnectTransportException { original.connectToNode(node); @@ -167,8 +217,18 @@ public class MockTransportService extends TransportService { * Adds a rule that will cause ignores each send request, simulating an unresponsive node * and failing to connect once the rule was added. */ - public void addUnresponsiveRule(DiscoveryNode node) { - addDelegate(node, new DelegateTransport(original) { + public void addUnresponsiveRule(TransportService transportService) { + for (TransportAddress transportAddress : extractTransportAddresses(transportService)) { + addUnresponsiveRule(transportAddress); + } + } + + /** + * Adds a rule that will cause ignores each send request, simulating an unresponsive node + * and failing to connect once the rule was added. + */ + public void addUnresponsiveRule(TransportAddress transportAddress) { + addDelegate(transportAddress, new DelegateTransport(original) { @Override public void connectToNode(DiscoveryNode node) throws ConnectTransportException { throw new ConnectTransportException(node, "UNRESPONSIVE: simulated"); @@ -192,10 +252,22 @@ public class MockTransportService extends TransportService { * * @param duration the amount of time to delay sending and connecting. */ - public void addUnresponsiveRule(DiscoveryNode node, final TimeValue duration) { + public void addUnresponsiveRule(TransportService transportService, final TimeValue duration) { + for (TransportAddress transportAddress : extractTransportAddresses(transportService)) { + addUnresponsiveRule(transportAddress, duration); + } + } + + /** + * Adds a rule that will cause ignores each send request, simulating an unresponsive node + * and failing to connect once the rule was added. + * + * @param duration the amount of time to delay sending and connecting. + */ + public void addUnresponsiveRule(TransportAddress transportAddress, final TimeValue duration) { final long startTime = System.currentTimeMillis(); - addDelegate(node, new DelegateTransport(original) { + addDelegate(transportAddress, new DelegateTransport(original) { TimeValue getDelay() { return new TimeValue(duration.millis() - (System.currentTimeMillis() - startTime)); @@ -280,12 +352,25 @@ public class MockTransportService extends TransportService { } /** - * Adds a new delegate transport that is used for communication with the given node. + * Adds a new delegate transport that is used for communication with the given transport service. * - * @return true iff no other delegate was registered for this node before, otherwise false + * @return true iff no other delegate was registered for any of the addresses bound by transport service, otherwise false */ - public boolean addDelegate(DiscoveryNode node, DelegateTransport transport) { - return transport().transports.put(node.getAddress(), transport) == null; + public boolean addDelegate(TransportService transportService, DelegateTransport transport) { + boolean noRegistered = true; + for (TransportAddress transportAddress : extractTransportAddresses(transportService)) { + noRegistered &= addDelegate(transportAddress, transport); + } + return noRegistered; + } + + /** + * Adds a new delegate transport that is used for communication with the given transport address. + * + * @return true iff no other delegate was registered for this address before, otherwise false + */ + public boolean addDelegate(TransportAddress transportAddress, DelegateTransport transport) { + return transport().transports.put(transportAddress, transport) == null; } private LookupTestTransport transport() {