From c3110624763bcdec584269473a5dda28aa18c45e Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 2 Feb 2019 07:54:56 +0000 Subject: [PATCH] Add CoordinatorTests for empty unicast hosts list (#38209) Today we have DiscoveryDisruptionIT tests for checking that discovery can still work once the cluster has formed, even if the cluster is misconfigured and only has a single master-eligible node in its unicast hosts list. In fact with Zen2 we can go one better: we do not need any nodes in the unicast hosts list, because nodes also use the contents of the last-committed cluster state for discovery. Additionally, the DiscoveryDisruptionIT tests were failing due to the overenthusiastic fault-detection timeouts. This commit replaces these tests with deterministic `CoordinatorTests` that verify the same behaviour. It also removes some duplication by extracting a test method called `testFollowerCheckerAfterMasterReelection()` Closes #37687 --- .../cluster/coordination/Coordinator.java | 4 +- .../transport/TransportService.java | 15 +++- .../coordination/CoordinatorTests.java | 63 ++++++++++++--- .../discovery/DiscoveryDisruptionIT.java | 76 +------------------ .../discovery/StableMasterDisruptionIT.java | 69 +++++++---------- .../MockSinglePrioritizingExecutor.java | 18 +++-- .../test/InternalTestCluster.java | 10 --- .../disruption/DisruptableMockTransport.java | 2 +- 8 files changed, 107 insertions(+), 150 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 231f5555e8f..f6e2e1e9588 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -341,10 +341,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery // Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that // since we check whether a term bump is needed at the end of the publication too. if (publicationInProgress()) { - logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", - maxTermSeen, currentTerm); + logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", maxTermSeen, currentTerm); } else { try { + logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, bumping term", maxTermSeen, currentTerm); ensureTermAtLeast(getLocalNode(), maxTermSeen); startElection(); } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 531354b068c..3ea15bba43a 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -953,9 +953,18 @@ public class TransportService extends AbstractLifecycleComponent implements Tran responseHandlers.prune(h -> h.connection().getCacheKey().equals(connection.getCacheKey())); // callback that an exception happened, but on a different thread since we don't // want handlers to worry about stack overflows - getExecutorService().execute(() -> { - for (Transport.ResponseContext holderToNotify : pruned) { - holderToNotify.handler().handleException(new NodeDisconnectedException(connection.getNode(), holderToNotify.action())); + getExecutorService().execute(new Runnable() { + @Override + public void run() { + for (Transport.ResponseContext holderToNotify : pruned) { + holderToNotify.handler().handleException( + new NodeDisconnectedException(connection.getNode(), holderToNotify.action())); + } + } + + @Override + public String toString() { + return "onConnectionClosed(" + connection.getNode() + ")"; } }); } catch (EsRejectedExecutionException ex) { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 93c89cfafab..333e6c5a3e7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode.Role; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -98,6 +99,7 @@ import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Collections.emptyList; import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX; import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState; @@ -1046,7 +1048,7 @@ public class CoordinatorTests extends ESTestCase { Loggers.removeAppender(joinLogger, mockAppender); mockAppender.stop(); } - assertTrue(newNode.getLastAppliedClusterState().version() == 0); + assertEquals(0, newNode.getLastAppliedClusterState().version()); final ClusterNode detachedNode = newNode.restartedNode( metaData -> DetachClusterCommand.updateMetaData(metaData), @@ -1055,6 +1057,27 @@ public class CoordinatorTests extends ESTestCase { cluster1.stabilise(); } + public void testDiscoveryUsesNodesFromLastClusterState() { + final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode partitionedNode = cluster.getAnyNode(); + if (randomBoolean()) { + logger.info("--> blackholing {}", partitionedNode); + partitionedNode.blackhole(); + } else { + logger.info("--> disconnecting {}", partitionedNode); + partitionedNode.disconnect(); + } + cluster.setEmptyUnicastHostsList(); + cluster.stabilise(); + + partitionedNode.heal(); + cluster.runRandomly(false); + cluster.stabilise(); + } + private static long defaultMillis(Setting setting) { return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; } @@ -1094,6 +1117,8 @@ public class CoordinatorTests extends ESTestCase { * defaultInt(LEADER_CHECK_RETRY_COUNT_SETTING) // then wait for a follower to be promoted to leader + DEFAULT_ELECTION_DELAY + // perhaps there is an election collision requiring another publication (which times out) and a term bump + + defaultMillis(PUBLISH_TIMEOUT_SETTING) + DEFAULT_ELECTION_DELAY // then wait for the new leader to notice that the old leader is unresponsive + (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)) * defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING) @@ -1110,14 +1135,17 @@ public class CoordinatorTests extends ESTestCase { // TODO does ThreadPool need a node name any more? Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build(), random()); private boolean disruptStorage; + private final VotingConfiguration initialConfiguration; private final Set disconnectedNodes = new HashSet<>(); private final Set blackholedNodes = new HashSet<>(); private final Map committedStatesByVersion = new HashMap<>(); - private final Function defaultPersistedStateSupplier = - localNode -> new MockPersistedState(localNode); + private final Function defaultPersistedStateSupplier = MockPersistedState::new; + + @Nullable // null means construct a list from all the current nodes + private List unicastHostsList; Cluster(int initialNodeCount) { this(initialNodeCount, true); @@ -1177,6 +1205,10 @@ public class CoordinatorTests extends ESTestCase { } void runRandomly() { + runRandomly(true); + } + + void runRandomly(boolean allowReboots) { // TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it assertThat("may reconnect disconnected nodes, probably unexpected", disconnectedNodes, empty()); @@ -1223,7 +1255,7 @@ public class CoordinatorTests extends ESTestCase { thisStep, autoShrinkVotingConfiguration, clusterNode.getId()); clusterNode.submitSetAutoShrinkVotingConfiguration(autoShrinkVotingConfiguration); }).run(); - } else if (rarely()) { + } else if (allowReboots && rarely()) { // reboot random node final ClusterNode clusterNode = getAnyNode(); logger.debug("----> [runRandomly {}] rebooting [{}]", thisStep, clusterNode.getId()); @@ -1504,6 +1536,10 @@ public class CoordinatorTests extends ESTestCase { return getAnyNode(); } + void setEmptyUnicastHostsList() { + unicastHostsList = emptyList(); + } + class MockPersistedState implements PersistedState { private final PersistedState delegate; private final NodeEnvironment nodeEnvironment; @@ -1678,13 +1714,15 @@ public class CoordinatorTests extends ESTestCase { } void close() { - logger.trace("taking down [{}]", localNode); - coordinator.stop(); - clusterService.stop(); - //transportService.stop(); // does blocking stuff :/ - clusterService.close(); - coordinator.close(); - //transportService.close(); // does blocking stuff :/ + onNode(() -> { + logger.trace("taking down [{}]", localNode); + coordinator.stop(); + clusterService.stop(); + //transportService.stop(); // does blocking stuff :/ + clusterService.close(); + coordinator.close(); + //transportService.close(); // does blocking stuff :/ + }); } ClusterNode restartedNode() { @@ -1866,7 +1904,8 @@ public class CoordinatorTests extends ESTestCase { } private List provideUnicastHosts(HostsResolver ignored) { - return clusterNodes.stream().map(ClusterNode::getLocalNode).map(DiscoveryNode::getAddress).collect(Collectors.toList()); + return unicastHostsList != null ? unicastHostsList + : clusterNodes.stream().map(ClusterNode::getLocalNode).map(DiscoveryNode::getAddress).collect(Collectors.toList()); } } diff --git a/server/src/test/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java index efe69ff99e2..4fabbe74848 100644 --- a/server/src/test/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/DiscoveryDisruptionIT.java @@ -32,7 +32,6 @@ import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.disruption.NetworkDisruption; import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect; -import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.disruption.SlowClusterStateProcessing; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -40,10 +39,8 @@ import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -58,77 +55,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase { - public void testIsolatedUnicastNodes() throws Exception { - internalCluster().setHostsListContainsOnlyFirstNode(true); - List nodes = startCluster(4); - // Figure out what is the elected master node - final String unicastTarget = nodes.get(0); - - Set unicastTargetSide = new HashSet<>(); - unicastTargetSide.add(unicastTarget); - - Set restOfClusterSide = new HashSet<>(); - restOfClusterSide.addAll(nodes); - restOfClusterSide.remove(unicastTarget); - - // Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list - // includes all the other nodes that have pinged it and the issue doesn't manifest - clearTemporalResponses(); - - // Simulate a network issue between the unicast target node and the rest of the cluster - NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(unicastTargetSide, restOfClusterSide), - new NetworkDisconnect()); - setDisruptionScheme(networkDisconnect); - networkDisconnect.startDisrupting(); - // Wait until elected master has removed that the unlucky node... - ensureStableCluster(3, nodes.get(1)); - - // The isolate master node must report no master, so it starts with pinging - assertNoMaster(unicastTarget); - networkDisconnect.stopDisrupting(); - // Wait until the master node sees all 3 nodes again. - ensureStableCluster(4); - } - - /** - * A 4 node cluster with m_m_n set to 3 and each node has one unicast endpoint. One node partitions from the master node. - * The temporal unicast responses is empty. When partition is solved the one ping response contains a master node. - * The rejoining node should take this master node and connect. - */ - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37687") - public void testUnicastSinglePingResponseContainsMaster() throws Exception { - internalCluster().setHostsListContainsOnlyFirstNode(true); - List nodes = startCluster(4); - // Figure out what is the elected master node - final String masterNode = internalCluster().getMasterName(); - logger.info("---> legit elected master node={}", masterNode); - List otherNodes = new ArrayList<>(nodes); - otherNodes.remove(masterNode); - otherNodes.remove(nodes.get(0)); // <-- Don't isolate the node that is in the unicast endpoint for all the other nodes. - final String isolatedNode = otherNodes.get(0); - - // Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list - // includes all the other nodes that have pinged it and the issue doesn't manifest - clearTemporalResponses(); - - // Simulate a network issue between the unlucky node and elected master node in both directions. - NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(masterNode, isolatedNode), - new NetworkDisconnect()); - setDisruptionScheme(networkDisconnect); - networkDisconnect.startDisrupting(); - // Wait until elected master has removed that the unlucky node... - ensureStableCluster(3, masterNode); - - // The isolate master node must report no master, so it starts with pinging - assertNoMaster(isolatedNode); - networkDisconnect.stopDisrupting(); - // Wait until the master node sees all 4 nodes again. - ensureStableCluster(4); - // The elected master shouldn't have changed, since the isolated node never could have elected himself as - // master since m_m_n of 3 could never be satisfied. - assertMaster(masterNode, nodes); - } - /** * Test cluster join with issues in cluster state publishing * */ @@ -187,7 +113,7 @@ public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase { internalCluster().stopRandomNonMasterNode(); } - public void testClusterFormingWithASlowNode() throws Exception { + public void testClusterFormingWithASlowNode() { SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000); diff --git a/server/src/test/java/org/elasticsearch/discovery/StableMasterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/StableMasterDisruptionIT.java index b5177b1ce3e..51fef980e37 100644 --- a/server/src/test/java/org/elasticsearch/discovery/StableMasterDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/StableMasterDisruptionIT.java @@ -28,7 +28,9 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.disruption.NetworkDisruption; import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect; +import org.elasticsearch.test.disruption.NetworkDisruption.NetworkLinkDisruptionType; import org.elasticsearch.test.disruption.NetworkDisruption.NetworkUnresponsive; +import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService.TestPlugin; @@ -39,7 +41,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; import static java.util.Collections.singleton; import static org.hamcrest.Matchers.equalTo; @@ -87,8 +88,7 @@ public class StableMasterDisruptionIT extends ESIntegTestCase { // The unlucky node must report *no* master node, since it can't connect to master and in fact it should // continuously ping until network failures have been resolved. However // It may a take a bit before the node detects it has been cut off from the elected master - assertBusy(() -> assertNull(client(unluckyNode).admin().cluster().state( - new ClusterStateRequest().local(true)).get().getState().nodes().getMasterNode())); + ensureNoMaster(unluckyNode); networkDisconnect.stopDisrupting(); @@ -99,48 +99,32 @@ public class StableMasterDisruptionIT extends ESIntegTestCase { assertThat(internalCluster().getMasterName(), equalTo(masterNode)); } - /** - * Verify that nodes fault detection works after master (re) election - */ - public void testFollowerCheckerDetectsUnresponsiveNodeAfterMasterReelection() throws Exception { - internalCluster().startNodes(4, - Settings.builder() - .put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "1s") - .put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), "10") - .put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "1s") - .put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1).build()); - ensureStableCluster(4); - - logger.info("--> stopping current master"); - internalCluster().stopCurrentMasterNode(); - - ensureStableCluster(3); - - final String master = internalCluster().getMasterName(); - final List nonMasters = Arrays.stream(internalCluster().getNodeNames()).filter(n -> master.equals(n) == false) - .collect(Collectors.toList()); - final String isolatedNode = randomFrom(nonMasters); - final String otherNode = nonMasters.get(nonMasters.get(0).equals(isolatedNode) ? 1 : 0); - - logger.info("--> isolating [{}]", isolatedNode); - - final NetworkDisruption networkDisruption = new NetworkDisruption(new NetworkDisruption.TwoPartitions( - singleton(isolatedNode), Sets.newHashSet(master, otherNode)), new NetworkUnresponsive()); - setDisruptionScheme(networkDisruption); - networkDisruption.startDisrupting(); - - logger.info("--> waiting for master to remove it"); - ensureStableCluster(2, master); - - networkDisruption.stopDisrupting(); - ensureStableCluster(3); + private void ensureNoMaster(String node) throws Exception { + assertBusy(() -> assertNull(client(node).admin().cluster().state( + new ClusterStateRequest().local(true)).get().getState().nodes().getMasterNode())); } /** - * Verify that nodes fault detection works after master (re) election + * Verify that nodes fault detection detects a disconnected node after master reelection */ public void testFollowerCheckerDetectsDisconnectedNodeAfterMasterReelection() throws Exception { - internalCluster().startNodes(4); + testFollowerCheckerAfterMasterReelection(new NetworkDisconnect(), Settings.EMPTY); + } + + /** + * Verify that nodes fault detection detects an unresponsive node after master reelection + */ + public void testFollowerCheckerDetectsUnresponsiveNodeAfterMasterReelection() throws Exception { + testFollowerCheckerAfterMasterReelection(new NetworkUnresponsive(), Settings.builder() + .put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "1s") + .put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), "4") + .put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "1s") + .put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1).build()); + } + + private void testFollowerCheckerAfterMasterReelection(NetworkLinkDisruptionType networkLinkDisruptionType, + Settings settings) throws Exception { + internalCluster().startNodes(4, settings); ensureStableCluster(4); logger.info("--> stopping current master"); @@ -156,13 +140,14 @@ public class StableMasterDisruptionIT extends ESIntegTestCase { logger.info("--> isolating [{}]", isolatedNode); - final NetworkDisruption networkDisruption = new NetworkDisruption(new NetworkDisruption.TwoPartitions( - singleton(isolatedNode), Stream.of(master, otherNode).collect(Collectors.toSet())), new NetworkDisconnect()); + final NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions( + singleton(isolatedNode), Sets.newHashSet(master, otherNode)), networkLinkDisruptionType); setDisruptionScheme(networkDisruption); networkDisruption.startDisrupting(); logger.info("--> waiting for master to remove it"); ensureStableCluster(2, master); + ensureNoMaster(isolatedNode); networkDisruption.stopDisrupting(); ensureStableCluster(3); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java index bcc10f1521b..add5014e555 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java @@ -34,11 +34,19 @@ public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecu r -> new Thread() { @Override public void start() { - deterministicTaskQueue.scheduleNow(() -> { - try { - r.run(); - } catch (KillWorkerError kwe) { - // hacks everywhere + deterministicTaskQueue.scheduleNow(new Runnable() { + @Override + public void run() { + try { + r.run(); + } catch (KillWorkerError kwe) { + // hacks everywhere + } + } + + @Override + public String toString() { + return r.toString(); } }); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 5e75a50bef4..73cb4caa63b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -241,9 +241,6 @@ public final class InternalTestCluster extends TestCluster { private ServiceDisruptionScheme activeDisruptionScheme; private Function clientWrapper; - // If set to true only the first node in the cluster will be made a unicast node - private boolean hostsListContainsOnlyFirstNode; - private int bootstrapMasterNodeIndex = -1; public InternalTestCluster( @@ -1667,9 +1664,6 @@ public final class InternalTestCluster extends TestCluster { synchronized (discoveryFileMutex) { try { Stream unicastHosts = Stream.concat(nodes.values().stream(), newNodes.stream()); - if (hostsListContainsOnlyFirstNode) { - unicastHosts = unicastHosts.limit(1L); - } List discoveryFileContents = unicastHosts.map( nac -> nac.node.injector().getInstance(TransportService.class) ).filter(Objects::nonNull) @@ -2207,10 +2201,6 @@ public final class InternalTestCluster extends TestCluster { return filterNodes(nodes, NodeAndClient::isMasterEligible).size(); } - public void setHostsListContainsOnlyFirstNode(boolean hostsListContainsOnlyFirstNode) { - this.hostsListContainsOnlyFirstNode = hostsListContainsOnlyFirstNode; - } - public void setDisruptionScheme(ServiceDisruptionScheme scheme) { assert activeDisruptionScheme == null : "there is already and active disruption [" + activeDisruptionScheme + "]. call clearDisruptionScheme first"; diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java index d750a8256b8..24cea25274f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java @@ -113,7 +113,7 @@ public abstract class DisruptableMockTransport extends MockTransport { assert destinationTransport.getLocalNode().equals(getLocalNode()) == false : "non-local message from " + getLocalNode() + " to itself"; - execute(action, new Runnable() { + destinationTransport.execute(action, new Runnable() { @Override public void run() { switch (getConnectionStatus(destinationTransport.getLocalNode())) {