[TEST] Use TransportService/TransportAddress instead of DiscoveryNode for disruption rules

The disruption rules are changed to work on all transport addresses that are bound by a node (not only publish address).
This is important as UnicastZenPing creates fake DiscoveryNode instances which match one of the bound addresses and not necessarily the publish address.

Closes #14625
Closes #14653
This commit is contained in:
Yannick Welsch 2015-11-10 14:05:33 +01:00
parent fac472f90c
commit c32de1f72b
17 changed files with 146 additions and 85 deletions

View File

@ -47,7 +47,6 @@ public class IndexingMasterFailoverIT extends ESIntegTestCase {
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2)
.put("transport.host", "127.0.0.1") // only bind on one IF we use v4 here by default
.build();
internalCluster().startMasterOnlyNodesAsync(3, sharedSettings).get();

View File

@ -207,7 +207,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
final Set<String> blockedActions = newHashSet(NodesStatsAction.NAME, NodesStatsAction.NAME + "[n]", IndicesStatsAction.NAME, IndicesStatsAction.NAME + "[n]");
// drop all outgoing stats requests to force a timeout.
for (DiscoveryNode node : internalTestCluster.clusterService().state().getNodes()) {
mockTransportService.addDelegate(node, new MockTransportService.DelegateTransport(mockTransportService.original()) {
mockTransportService.addDelegate(internalTestCluster.getInstance(TransportService.class, node.getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {

View File

@ -163,9 +163,6 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put("http.enabled", false) // just to make test quicker
.put("transport.host", "127.0.0.1") // only bind on one IF we use v4 here by default
.put("transport.bind_host", "127.0.0.1")
.put("transport.publish_host", "127.0.0.1")
.put("gateway.local.list_timeout", "10s") // still long to induce failures but to long so test won't time out
.build();
@ -844,23 +841,26 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes();
TransportService masterTranspotService = internalCluster().getInstance(TransportService.class, discoveryNodes.masterNode().getName());
logger.info("blocking requests from non master [{}] to master [{}]", nonMasterNode, masterNode);
MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nonMasterNode);
nonMasterTransportService.addFailToSendNoConnectRule(discoveryNodes.masterNode());
nonMasterTransportService.addFailToSendNoConnectRule(masterTranspotService);
assertNoMaster(nonMasterNode);
logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode);
MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
TransportService localTransportService = internalCluster().getInstance(TransportService.class, discoveryNodes.localNode().getName());
if (randomBoolean()) {
masterTransportService.addFailToSendNoConnectRule(discoveryNodes.localNode(), PublishClusterStateAction.SEND_ACTION_NAME);
masterTransportService.addFailToSendNoConnectRule(localTransportService, PublishClusterStateAction.SEND_ACTION_NAME);
} else {
masterTransportService.addFailToSendNoConnectRule(discoveryNodes.localNode(), PublishClusterStateAction.COMMIT_ACTION_NAME);
masterTransportService.addFailToSendNoConnectRule(localTransportService, PublishClusterStateAction.COMMIT_ACTION_NAME);
}
logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode);
final CountDownLatch countDownLatch = new CountDownLatch(2);
nonMasterTransportService.addDelegate(discoveryNodes.masterNode(), new MockTransportService.DelegateTransport(nonMasterTransportService.original()) {
nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) {
@ -873,8 +873,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
countDownLatch.await();
logger.info("waiting for cluster to reform");
masterTransportService.clearRule(discoveryNodes.localNode());
nonMasterTransportService.clearRule(discoveryNodes.masterNode());
masterTransportService.clearRule(localTransportService);
nonMasterTransportService.clearRule(localTransportService);
ensureStableCluster(2);
@ -924,9 +924,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode);
MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
if (randomBoolean()) {
masterTransportService.addUnresponsiveRule(internalCluster().getInstance(ClusterService.class, nonMasterNode).localNode());
masterTransportService.addUnresponsiveRule(internalCluster().getInstance(TransportService.class, nonMasterNode));
} else {
masterTransportService.addFailToSendNoConnectRule(internalCluster().getInstance(ClusterService.class, nonMasterNode).localNode());
masterTransportService.addFailToSendNoConnectRule(internalCluster().getInstance(TransportService.class, nonMasterNode));
}
logger.info("waiting for [{}] to be removed from cluster", nonMasterNode);

View File

@ -476,7 +476,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
final AtomicBoolean keepFailing = new AtomicBoolean(true);
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, node1));
mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, node3).localNode(),
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, node3),
new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override

View File

@ -115,12 +115,12 @@ public class TransportIndexFailuresIT extends ESIntegTestCase {
logger.info("--> preventing index/replica operations");
TransportService mockTransportService = internalCluster().getInstance(TransportService.class, primaryNode);
((MockTransportService) mockTransportService).addFailToSendNoConnectRule(
internalCluster().getInstance(Discovery.class, replicaNode).localNode(),
internalCluster().getInstance(TransportService.class, replicaNode),
singleton(IndexAction.NAME + "[r]")
);
mockTransportService = internalCluster().getInstance(TransportService.class, replicaNode);
((MockTransportService) mockTransportService).addFailToSendNoConnectRule(
internalCluster().getInstance(Discovery.class, primaryNode).localNode(),
internalCluster().getInstance(TransportService.class, primaryNode),
singleton(IndexAction.NAME + "[r]")
);

View File

@ -335,7 +335,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
final CountDownLatch hasCorrupted = new CountDownLatch(1);
for (NodeStats dataNode : dataNodeStats) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name()));
mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) {
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().name()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
@ -407,7 +407,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
final boolean truncate = randomBoolean();
for (NodeStats dataNode : dataNodeStats) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name()));
mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) {
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().name()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {

View File

@ -83,7 +83,7 @@ public class ExceptionRetryIT extends ESIntegTestCase {
//create a transport service that throws a ConnectTransportException for one bulk request and therefore triggers a retry.
for (NodeStats dataNode : nodeStats.getNodes()) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name()));
mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) {
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().name()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {

View File

@ -580,12 +580,12 @@ public class IndexRecoveryIT extends ESIntegTestCase {
MockTransportService blueMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName);
MockTransportService redMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName);
DiscoveryNode redDiscoNode = internalCluster().getInstance(ClusterService.class, redNodeName).localNode();
DiscoveryNode blueDiscoNode = internalCluster().getInstance(ClusterService.class, blueNodeName).localNode();
TransportService redTransportService = internalCluster().getInstance(TransportService.class, redNodeName);
TransportService blueTransportService = internalCluster().getInstance(TransportService.class, blueNodeName);
final CountDownLatch requestBlocked = new CountDownLatch(1);
blueMockTransportService.addDelegate(redDiscoNode, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, blueMockTransportService.original(), requestBlocked));
redMockTransportService.addDelegate(blueDiscoNode, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, redMockTransportService.original(), requestBlocked));
blueMockTransportService.addDelegate(redTransportService, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, blueMockTransportService.original(), requestBlocked));
redMockTransportService.addDelegate(blueTransportService, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, redMockTransportService.original(), requestBlocked));
logger.info("--> starting recovery from blue to red");
client().admin().indices().prepareUpdateSettings(indexName).setSettings(

View File

@ -195,10 +195,9 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
// add a transport delegate that will prevent the shard active request to succeed the first time after relocation has finished.
// node_1 will then wait for the next cluster state change before it tries a next attempt to delet the shard.
MockTransportService transportServiceNode_1 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_1);
String node_2_id = internalCluster().getInstance(DiscoveryService.class, node_2).localNode().id();
DiscoveryNode node_2_disco = internalCluster().clusterService().state().getNodes().dataNodes().get(node_2_id);
TransportService transportServiceNode_2 = internalCluster().getInstance(TransportService.class, node_2);
final CountDownLatch shardActiveRequestSent = new CountDownLatch(1);
transportServiceNode_1.addDelegate(node_2_disco, new MockTransportService.DelegateTransport(transportServiceNode_1.original()) {
transportServiceNode_1.addDelegate(transportServiceNode_2, new MockTransportService.DelegateTransport(transportServiceNode_1.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
if (action.equals("internal:index/shard/exists") && shardActiveRequestSent.getCount() > 0) {

View File

@ -377,7 +377,7 @@ public class RelocationIT extends ESIntegTestCase {
MockTransportService mockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, p_node);
for (DiscoveryNode node : clusterService.state().nodes()) {
if (!node.equals(clusterService.localNode())) {
mockTransportService.addDelegate(node, new RecoveryCorruption(mockTransportService.original(), corruptionCount));
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, node.getName()), new RecoveryCorruption(mockTransportService.original(), corruptionCount));
}
}

View File

@ -121,7 +121,7 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
final AtomicBoolean truncate = new AtomicBoolean(true);
for (NodeStats dataNode : dataNodeStats) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name()));
mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) {
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().name()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {

View File

@ -1046,7 +1046,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
});
serviceB.addFailToSendNoConnectRule(nodeA);
serviceB.addFailToSendNoConnectRule(serviceA);
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
new StringMessageRequest("moshe"), new BaseTransportResponseHandler<StringMessageResponse>() {
@ -1104,7 +1104,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
});
serviceB.addUnresponsiveRule(nodeA);
serviceB.addUnresponsiveRule(serviceA);
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
new StringMessageRequest("moshe"), TransportRequestOptions.options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {

View File

@ -18,7 +18,6 @@
*/
package org.elasticsearch.test.disruption;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.transport.MockTransportService;
@ -78,10 +77,9 @@ public class NetworkDelaysPartition extends NetworkPartition {
}
@Override
void applyDisruption(DiscoveryNode node1, MockTransportService transportService1,
DiscoveryNode node2, MockTransportService transportService2) {
transportService1.addUnresponsiveRule(node1, duration);
transportService1.addUnresponsiveRule(node2, duration);
void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) {
transportService1.addUnresponsiveRule(transportService1, duration);
transportService1.addUnresponsiveRule(transportService2, duration);
}
@Override

View File

@ -18,7 +18,6 @@
*/
package org.elasticsearch.test.disruption;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.transport.MockTransportService;
@ -46,10 +45,9 @@ public class NetworkDisconnectPartition extends NetworkPartition {
}
@Override
void applyDisruption(DiscoveryNode node1, MockTransportService transportService1,
DiscoveryNode node2, MockTransportService transportService2) {
transportService1.addFailToSendNoConnectRule(node2);
transportService2.addFailToSendNoConnectRule(node1);
void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) {
transportService1.addFailToSendNoConnectRule(transportService2);
transportService2.addFailToSendNoConnectRule(transportService1);
}
@Override

View File

@ -18,10 +18,8 @@
*/
package org.elasticsearch.test.disruption;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
@ -29,7 +27,6 @@ import org.elasticsearch.transport.TransportService;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
@ -140,7 +137,6 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme {
@Override
public synchronized void removeFromNode(String node, InternalTestCluster cluster) {
MockTransportService transportService = (MockTransportService) cluster.getInstance(TransportService.class, node);
DiscoveryNode discoveryNode = discoveryNode(node);
Set<String> otherSideNodes;
if (nodesSideOne.contains(node)) {
otherSideNodes = nodesSideTwo;
@ -153,8 +149,7 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme {
}
for (String node2 : otherSideNodes) {
MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
DiscoveryNode discoveryNode2 = discoveryNode(node2);
removeDisruption(discoveryNode, transportService, discoveryNode2, transportService2);
removeDisruption(transportService, transportService2);
}
}
@ -165,11 +160,6 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme {
protected abstract String getPartitionDescription();
protected DiscoveryNode discoveryNode(String node) {
return cluster.getInstance(Discovery.class, node).localNode();
}
@Override
public synchronized void startDisrupting() {
if (nodesSideOne.size() == 0 || nodesSideTwo.size() == 0) {
@ -179,11 +169,9 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme {
activeDisruption = true;
for (String node1 : nodesSideOne) {
MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1);
DiscoveryNode discoveryNode1 = discoveryNode(node1);
for (String node2 : nodesSideTwo) {
DiscoveryNode discoveryNode2 = discoveryNode(node2);
MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
applyDisruption(discoveryNode1, transportService1, discoveryNode2, transportService2);
applyDisruption(transportService1, transportService2);
}
}
}
@ -197,24 +185,20 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme {
logger.info("restoring partition between nodes {} & nodes {}", nodesSideOne, nodesSideTwo);
for (String node1 : nodesSideOne) {
MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1);
DiscoveryNode discoveryNode1 = discoveryNode(node1);
for (String node2 : nodesSideTwo) {
DiscoveryNode discoveryNode2 = discoveryNode(node2);
MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
removeDisruption(discoveryNode1, transportService1, discoveryNode2, transportService2);
removeDisruption(transportService1, transportService2);
}
}
activeDisruption = false;
}
abstract void applyDisruption(DiscoveryNode node1, MockTransportService transportService1,
DiscoveryNode node2, MockTransportService transportService2);
abstract void applyDisruption(MockTransportService transportService1, MockTransportService transportService2);
protected void removeDisruption(DiscoveryNode node1, MockTransportService transportService1,
DiscoveryNode node2, MockTransportService transportService2) {
transportService1.clearRule(node2);
transportService2.clearRule(node1);
protected void removeDisruption(MockTransportService transportService1, MockTransportService transportService2) {
transportService1.clearRule(transportService2);
transportService2.clearRule(transportService1);
}
}

View File

@ -18,7 +18,6 @@
*/
package org.elasticsearch.test.disruption;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.transport.MockTransportService;
@ -45,10 +44,9 @@ public class NetworkUnresponsivePartition extends NetworkPartition {
}
@Override
void applyDisruption(DiscoveryNode node1, MockTransportService transportService1,
DiscoveryNode node2, MockTransportService transportService2) {
transportService1.addUnresponsiveRule(node2);
transportService2.addUnresponsiveRule(node1);
void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) {
transportService1.addUnresponsiveRule(transportService2);
transportService2.addUnresponsiveRule(transportService1);
}
@Override

View File

@ -55,6 +55,14 @@ import java.util.concurrent.CopyOnWriteArrayList;
/**
* A mock transport service that allows to simulate different network topology failures.
* Internally it maps TransportAddress objects to rules that inject failures.
* Adding rules for a node is done by adding rules for all bound addresses of a node
* (and the publish address, if different).
* Matching requests to rules is based on the transport address associated with the
* discovery node of the request, namely by DiscoveryNode.getAddress().
* This address is usually the publish address of the node but can also be a different one
* (for example, @see org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing, which constructs
* fake DiscoveryNode instances where the publish address is one of the bound addresses).
*/
public class MockTransportService extends TransportService {
@ -82,7 +90,14 @@ public class MockTransportService extends TransportService {
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, new LookupTestTransport(transport), threadPool);
this.original = transport;
}
public static TransportAddress[] extractTransportAddresses(TransportService transportService) {
HashSet<TransportAddress> transportAddresses = new HashSet<>();
BoundTransportAddress boundTransportAddress = transportService.boundAddress();
transportAddresses.addAll(Arrays.asList(boundTransportAddress.boundAddresses()));
transportAddresses.add(boundTransportAddress.publishAddress());
return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]);
}
/**
@ -93,10 +108,19 @@ public class MockTransportService extends TransportService {
}
/**
* Clears the rule associated with the provided node.
* Clears the rule associated with the provided transport service.
*/
public void clearRule(DiscoveryNode node) {
transport().transports.remove(node.getAddress());
public void clearRule(TransportService transportService) {
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
clearRule(transportAddress);
}
}
/**
* Clears the rule associated with the provided transport address.
*/
public void clearRule(TransportAddress transportAddress) {
transport().transports.remove(transportAddress);
}
/**
@ -110,8 +134,18 @@ public class MockTransportService extends TransportService {
* Adds a rule that will cause every send request to fail, and each new connect since the rule
* is added to fail as well.
*/
public void addFailToSendNoConnectRule(DiscoveryNode node) {
addDelegate(node, new DelegateTransport(original) {
public void addFailToSendNoConnectRule(TransportService transportService) {
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
addFailToSendNoConnectRule(transportAddress);
}
}
/**
* Adds a rule that will cause every send request to fail, and each new connect since the rule
* is added to fail as well.
*/
public void addFailToSendNoConnectRule(TransportAddress transportAddress) {
addDelegate(transportAddress, new DelegateTransport(original) {
@Override
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
throw new ConnectTransportException(node, "DISCONNECT: simulated");
@ -132,16 +166,32 @@ public class MockTransportService extends TransportService {
/**
* Adds a rule that will cause matching operations to throw ConnectTransportExceptions
*/
public void addFailToSendNoConnectRule(DiscoveryNode node, final String... blockedActions) {
addFailToSendNoConnectRule(node, new HashSet<>(Arrays.asList(blockedActions)));
public void addFailToSendNoConnectRule(TransportService transportService, final String... blockedActions) {
addFailToSendNoConnectRule(transportService, new HashSet<>(Arrays.asList(blockedActions)));
}
/**
* Adds a rule that will cause matching operations to throw ConnectTransportExceptions
*/
public void addFailToSendNoConnectRule(DiscoveryNode node, final Set<String> blockedActions) {
public void addFailToSendNoConnectRule(TransportAddress transportAddress, final String... blockedActions) {
addFailToSendNoConnectRule(transportAddress, new HashSet<>(Arrays.asList(blockedActions)));
}
addDelegate(node, new DelegateTransport(original) {
/**
* Adds a rule that will cause matching operations to throw ConnectTransportExceptions
*/
public void addFailToSendNoConnectRule(TransportService transportService, final Set<String> blockedActions) {
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
addFailToSendNoConnectRule(transportAddress, blockedActions);
}
}
/**
* Adds a rule that will cause matching operations to throw ConnectTransportExceptions
*/
public void addFailToSendNoConnectRule(TransportAddress transportAddress, final Set<String> blockedActions) {
addDelegate(transportAddress, new DelegateTransport(original) {
@Override
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
original.connectToNode(node);
@ -167,8 +217,18 @@ public class MockTransportService extends TransportService {
* Adds a rule that will cause ignores each send request, simulating an unresponsive node
* and failing to connect once the rule was added.
*/
public void addUnresponsiveRule(DiscoveryNode node) {
addDelegate(node, new DelegateTransport(original) {
public void addUnresponsiveRule(TransportService transportService) {
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
addUnresponsiveRule(transportAddress);
}
}
/**
* Adds a rule that will cause ignores each send request, simulating an unresponsive node
* and failing to connect once the rule was added.
*/
public void addUnresponsiveRule(TransportAddress transportAddress) {
addDelegate(transportAddress, new DelegateTransport(original) {
@Override
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
@ -192,10 +252,22 @@ public class MockTransportService extends TransportService {
*
* @param duration the amount of time to delay sending and connecting.
*/
public void addUnresponsiveRule(DiscoveryNode node, final TimeValue duration) {
public void addUnresponsiveRule(TransportService transportService, final TimeValue duration) {
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
addUnresponsiveRule(transportAddress, duration);
}
}
/**
* Adds a rule that will cause ignores each send request, simulating an unresponsive node
* and failing to connect once the rule was added.
*
* @param duration the amount of time to delay sending and connecting.
*/
public void addUnresponsiveRule(TransportAddress transportAddress, final TimeValue duration) {
final long startTime = System.currentTimeMillis();
addDelegate(node, new DelegateTransport(original) {
addDelegate(transportAddress, new DelegateTransport(original) {
TimeValue getDelay() {
return new TimeValue(duration.millis() - (System.currentTimeMillis() - startTime));
@ -280,12 +352,25 @@ public class MockTransportService extends TransportService {
}
/**
* Adds a new delegate transport that is used for communication with the given node.
* Adds a new delegate transport that is used for communication with the given transport service.
*
* @return <tt>true</tt> iff no other delegate was registered for this node before, otherwise <tt>false</tt>
* @return <tt>true</tt> iff no other delegate was registered for any of the addresses bound by transport service, otherwise <tt>false</tt>
*/
public boolean addDelegate(DiscoveryNode node, DelegateTransport transport) {
return transport().transports.put(node.getAddress(), transport) == null;
public boolean addDelegate(TransportService transportService, DelegateTransport transport) {
boolean noRegistered = true;
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
noRegistered &= addDelegate(transportAddress, transport);
}
return noRegistered;
}
/**
* Adds a new delegate transport that is used for communication with the given transport address.
*
* @return <tt>true</tt> iff no other delegate was registered for this address before, otherwise <tt>false</tt>
*/
public boolean addDelegate(TransportAddress transportAddress, DelegateTransport transport) {
return transport().transports.put(transportAddress, transport) == null;
}
private LookupTestTransport transport() {