diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 73feb21990e..b97a7a43147 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -229,6 +229,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery // where we would possibly have to remove the NO_MASTER_BLOCK from the applierState when turning a candidate back to follower. if (getLastAcceptedState().term() < getCurrentTerm()) { becomeFollower("onFollowerCheckRequest", followerCheckRequest.getSender()); + } else if (mode == Mode.FOLLOWER) { + logger.trace("onFollowerCheckRequest: responding successfully to {}", followerCheckRequest); + } else if (joinHelper.isJoinPending()) { + logger.trace("onFollowerCheckRequest: rejoining master, responding successfully to {}", followerCheckRequest); + } else { + logger.trace("onFollowerCheckRequest: received check from faulty master, rejecting {}", followerCheckRequest); + throw new CoordinationStateRejectedException( + "onFollowerCheckRequest: received check from faulty master, rejecting " + followerCheckRequest); } } } @@ -460,7 +468,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery // package private for tests void sendValidateJoinRequest(ClusterState stateForJoinValidation, JoinRequest joinRequest, - JoinHelper.JoinCallback joinCallback) { + JoinHelper.JoinCallback joinCallback) { // validate the join on the joining node, will throw a failure if it fails the validation joinHelper.sendValidateJoinRequest(joinRequest.getSourceNode(), stateForJoinValidation, new ActionListener() { @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index ad3ab9c4147..9705a444c03 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -189,6 +189,11 @@ public class JoinHelper { }; } + boolean isJoinPending() { + // cannot use pendingOutgoingJoins.isEmpty() because it's not properly synchronized. + return pendingOutgoingJoins.iterator().hasNext(); + } + public void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin) { sendJoinRequest(destination, optionalJoin, () -> { }); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index af8395d579a..9bb4e808d8b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -919,8 +919,9 @@ public class CoordinatorTests extends ESTestCase { nonLeader.coordinator.becomeCandidate("forced"); } logger.debug("simulate follower check coming through from {} to {}", leader.getId(), nonLeader.getId()); - nonLeader.coordinator.onFollowerCheckRequest(new FollowersChecker.FollowerCheckRequest(leader.coordinator.getCurrentTerm(), - leader.getLocalNode())); + expectThrows(CoordinationStateRejectedException.class, () -> nonLeader.coordinator.onFollowerCheckRequest( + new FollowersChecker.FollowerCheckRequest(leader.coordinator.getCurrentTerm(), leader.getLocalNode()))); + assertThat(nonLeader.coordinator.getMode(), equalTo(CANDIDATE)); }).run(); cluster.stabilise(); } @@ -1081,6 +1082,38 @@ public class CoordinatorTests extends ESTestCase { cluster.stabilise(); } + public void testFollowerRemovedIfUnableToSendRequestsToMaster() { + final Cluster cluster = new Cluster(3); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + final ClusterNode otherNode = cluster.getAnyNodeExcept(leader); + + cluster.blackholeConnectionsFrom(otherNode, leader); + + cluster.runFor( + (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)) + * defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING) + + (defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + DEFAULT_DELAY_VARIABILITY) + * defaultInt(LEADER_CHECK_RETRY_COUNT_SETTING) + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, + "awaiting removal of asymmetrically-partitioned node"); + + assertThat(leader.getLastAppliedClusterState().nodes().toString(), + leader.getLastAppliedClusterState().nodes().getSize(), equalTo(2)); + + cluster.clearBlackholedConnections(); + + cluster.stabilise( + // time for the disconnected node to find the master again + defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + // time for joining + + 4 * DEFAULT_DELAY_VARIABILITY + // Then a commit of the updated cluster state + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + } + private static long defaultMillis(Setting setting) { return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; } @@ -1143,6 +1176,7 @@ public class CoordinatorTests extends ESTestCase { private final Set disconnectedNodes = new HashSet<>(); private final Set blackholedNodes = new HashSet<>(); + private final Set> blackholedConnections = new HashSet<>(); private final Map committedStatesByVersion = new HashMap<>(); private final LinearizabilityChecker linearizabilityChecker = new LinearizabilityChecker(); private final History history = new History(); @@ -1510,6 +1544,8 @@ public class CoordinatorTests extends ESTestCase { connectionStatus = ConnectionStatus.BLACK_HOLE; } else if (disconnectedNodes.contains(sender.getId()) || disconnectedNodes.contains(destination.getId())) { connectionStatus = ConnectionStatus.DISCONNECTED; + } else if (blackholedConnections.contains(Tuple.tuple(sender.getId(), destination.getId()))) { + connectionStatus = ConnectionStatus.BLACK_HOLE_REQUESTS_ONLY; } else if (nodeExists(sender) && nodeExists(destination)) { connectionStatus = ConnectionStatus.CONNECTED; } else { @@ -1560,6 +1596,14 @@ public class CoordinatorTests extends ESTestCase { seedHostsList = emptyList(); } + void blackholeConnectionsFrom(ClusterNode sender, ClusterNode destination) { + blackholedConnections.add(Tuple.tuple(sender.getId(), destination.getId())); + } + + void clearBlackholedConnections() { + blackholedConnections.clear(); + } + class MockPersistedState implements PersistedState { private final PersistedState delegate; private final NodeEnvironment nodeEnvironment; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index 4361660876c..97777d16b4d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -51,6 +51,8 @@ public class JoinHelperTests extends ESTestCase { DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT); + assertFalse(joinHelper.isJoinPending()); + // check that sending a join to node1 works Optional optionalJoin1 = randomBoolean() ? Optional.empty() : Optional.of(new Join(localNode, node1, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); @@ -60,6 +62,8 @@ public class JoinHelperTests extends ESTestCase { CapturedRequest capturedRequest1 = capturedRequests1[0]; assertEquals(node1, capturedRequest1.node); + assertTrue(joinHelper.isJoinPending()); + // check that sending a join to node2 works Optional optionalJoin2 = randomBoolean() ? Optional.empty() : Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); @@ -95,5 +99,12 @@ public class JoinHelperTests extends ESTestCase { assertThat(capturedRequests2a.length, equalTo(1)); CapturedRequest capturedRequest2a = capturedRequests2a[0]; assertEquals(node2, capturedRequest2a.node); + + // complete all the joins and check that isJoinPending is updated + assertTrue(joinHelper.isJoinPending()); + capturingTransport.handleRemoteError(capturedRequest2.requestId, new CoordinationStateRejectedException("dummy")); + capturingTransport.handleRemoteError(capturedRequest1a.requestId, new CoordinationStateRejectedException("dummy")); + capturingTransport.handleRemoteError(capturedRequest2a.requestId, new CoordinationStateRejectedException("dummy")); + assertFalse(joinHelper.isJoinPending()); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java index 24cea25274f..c523aa15e58 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java @@ -116,8 +116,10 @@ public abstract class DisruptableMockTransport extends MockTransport { destinationTransport.execute(action, new Runnable() { @Override public void run() { - switch (getConnectionStatus(destinationTransport.getLocalNode())) { + final ConnectionStatus connectionStatus = getConnectionStatus(destinationTransport.getLocalNode()); + switch (connectionStatus) { case BLACK_HOLE: + case BLACK_HOLE_REQUESTS_ONLY: onBlackholedDuringSend(requestId, action, destinationTransport); break; @@ -128,6 +130,9 @@ public abstract class DisruptableMockTransport extends MockTransport { case CONNECTED: onConnectedDuringSend(requestId, action, request, destinationTransport); break; + + default: + throw new AssertionError("unexpected status: " + connectionStatus); } } @@ -197,11 +202,20 @@ public abstract class DisruptableMockTransport extends MockTransport { execute(action, new Runnable() { @Override public void run() { - if (destinationTransport.getConnectionStatus(getLocalNode()) != ConnectionStatus.CONNECTED) { - logger.trace("dropping response to {}: channel is not CONNECTED", - requestDescription); - } else { - handleResponse(requestId, response); + final ConnectionStatus connectionStatus = destinationTransport.getConnectionStatus(getLocalNode()); + switch (connectionStatus) { + case CONNECTED: + case BLACK_HOLE_REQUESTS_ONLY: + handleResponse(requestId, response); + break; + + case BLACK_HOLE: + case DISCONNECTED: + logger.trace("dropping response to {}: channel is {}", requestDescription, connectionStatus); + break; + + default: + throw new AssertionError("unexpected status: " + connectionStatus); } } @@ -217,11 +231,20 @@ public abstract class DisruptableMockTransport extends MockTransport { execute(action, new Runnable() { @Override public void run() { - if (destinationTransport.getConnectionStatus(getLocalNode()) != ConnectionStatus.CONNECTED) { - logger.trace("dropping response to {}: channel is not CONNECTED", - requestDescription); - } else { - handleRemoteError(requestId, exception); + final ConnectionStatus connectionStatus = destinationTransport.getConnectionStatus(getLocalNode()); + switch (connectionStatus) { + case CONNECTED: + case BLACK_HOLE_REQUESTS_ONLY: + handleRemoteError(requestId, exception); + break; + + case BLACK_HOLE: + case DISCONNECTED: + logger.trace("dropping exception response to {}: channel is {}", requestDescription, connectionStatus); + break; + + default: + throw new AssertionError("unexpected status: " + connectionStatus); } } @@ -251,9 +274,29 @@ public abstract class DisruptableMockTransport extends MockTransport { } } + /** + * Response type from {@link DisruptableMockTransport#getConnectionStatus(DiscoveryNode)} indicating whether, and how, messages should + * be disrupted on this transport. + */ public enum ConnectionStatus { + /** + * No disruption: deliver messages normally. + */ CONNECTED, - DISCONNECTED, // network requests to or from this node throw a ConnectTransportException - BLACK_HOLE // network traffic to or from the corresponding node is silently discarded + + /** + * Simulate disconnection: inbound and outbound messages throw a {@link ConnectTransportException}. + */ + DISCONNECTED, + + /** + * Simulate a blackhole partition: inbound and outbound messages are silently discarded. + */ + BLACK_HOLE, + + /** + * Simulate an asymmetric partition: outbound messages are silently discarded, but inbound messages are delivered normally. + */ + BLACK_HOLE_REQUESTS_ONLY } } diff --git a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java index 14aa79e8795..4060b7f5cd8 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java @@ -56,29 +56,32 @@ import static org.hamcrest.Matchers.containsString; public class DisruptableMockTransportTests extends ESTestCase { - DiscoveryNode node1; - DiscoveryNode node2; + private DiscoveryNode node1; + private DiscoveryNode node2; - DisruptableMockTransport transport1; - DisruptableMockTransport transport2; + private TransportService service1; + private TransportService service2; - TransportService service1; - TransportService service2; + private DeterministicTaskQueue deterministicTaskQueue; - DeterministicTaskQueue deterministicTaskQueue; + private Set> disconnectedLinks; + private Set> blackholedLinks; + private Set> blackholedRequestLinks; - Set> disconnectedLinks; - Set> blackholedLinks; - - ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) { + private ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) { Tuple link = Tuple.tuple(sender, destination); if (disconnectedLinks.contains(link)) { assert blackholedLinks.contains(link) == false; + assert blackholedRequestLinks.contains(link) == false; return ConnectionStatus.DISCONNECTED; } if (blackholedLinks.contains(link)) { + assert blackholedRequestLinks.contains(link) == false; return ConnectionStatus.BLACK_HOLE; } + if (blackholedRequestLinks.contains(link)) { + return ConnectionStatus.BLACK_HOLE_REQUESTS_ONLY; + } return ConnectionStatus.CONNECTED; } @@ -89,13 +92,14 @@ public class DisruptableMockTransportTests extends ESTestCase { disconnectedLinks = new HashSet<>(); blackholedLinks = new HashSet<>(); + blackholedRequestLinks = new HashSet<>(); List transports = new ArrayList<>(); deterministicTaskQueue = new DeterministicTaskQueue( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "dummy").build(), random()); - transport1 = new DisruptableMockTransport(node1, logger) { + final DisruptableMockTransport transport1 = new DisruptableMockTransport(node1, logger) { @Override protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) { return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination); @@ -112,7 +116,7 @@ public class DisruptableMockTransportTests extends ESTestCase { } }; - transport2 = new DisruptableMockTransport(node2, logger) { + final DisruptableMockTransport transport2 = new DisruptableMockTransport(node2, logger) { @Override protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) { return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination); @@ -144,7 +148,6 @@ public class DisruptableMockTransportTests extends ESTestCase { service2.connectToNode(node1); } - private TransportRequestHandler requestHandlerShouldNotBeCalled() { return (request, channel, task) -> { throw new AssertionError("should not be called"); @@ -293,15 +296,21 @@ public class DisruptableMockTransportTests extends ESTestCase { deterministicTaskQueue.runAllRunnableTasks(); } + public void testUnavailableOnRequestOnly() { + registerRequestHandler(service1, requestHandlerShouldNotBeCalled()); + registerRequestHandler(service2, requestHandlerShouldNotBeCalled()); + blackholedRequestLinks.add(Tuple.tuple(node1, node2)); + send(service1, node2, responseHandlerShouldNotBeCalled()); + deterministicTaskQueue.runAllRunnableTasks(); + } + public void testDisconnectedOnSuccessfulResponse() throws IOException { registerRequestHandler(service1, requestHandlerShouldNotBeCalled()); AtomicReference responseHandlerChannel = new AtomicReference<>(); registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set)); - AtomicReference responseHandlerException = new AtomicReference<>(); send(service1, node2, responseHandlerShouldNotBeCalled()); deterministicTaskQueue.runAllRunnableTasks(); - assertNull(responseHandlerException.get()); assertNotNull(responseHandlerChannel.get()); disconnectedLinks.add(Tuple.tuple(node2, node1)); @@ -314,10 +323,8 @@ public class DisruptableMockTransportTests extends ESTestCase { AtomicReference responseHandlerChannel = new AtomicReference<>(); registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set)); - AtomicReference responseHandlerException = new AtomicReference<>(); send(service1, node2, responseHandlerShouldNotBeCalled()); deterministicTaskQueue.runAllRunnableTasks(); - assertNull(responseHandlerException.get()); assertNotNull(responseHandlerChannel.get()); disconnectedLinks.add(Tuple.tuple(node2, node1)); @@ -330,10 +337,8 @@ public class DisruptableMockTransportTests extends ESTestCase { AtomicReference responseHandlerChannel = new AtomicReference<>(); registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set)); - AtomicReference responseHandlerException = new AtomicReference<>(); send(service1, node2, responseHandlerShouldNotBeCalled()); deterministicTaskQueue.runAllRunnableTasks(); - assertNull(responseHandlerException.get()); assertNotNull(responseHandlerChannel.get()); blackholedLinks.add(Tuple.tuple(node2, node1)); @@ -346,10 +351,8 @@ public class DisruptableMockTransportTests extends ESTestCase { AtomicReference responseHandlerChannel = new AtomicReference<>(); registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set)); - AtomicReference responseHandlerException = new AtomicReference<>(); send(service1, node2, responseHandlerShouldNotBeCalled()); deterministicTaskQueue.runAllRunnableTasks(); - assertNull(responseHandlerException.get()); assertNotNull(responseHandlerChannel.get()); blackholedLinks.add(Tuple.tuple(node2, node1)); @@ -357,4 +360,43 @@ public class DisruptableMockTransportTests extends ESTestCase { deterministicTaskQueue.runAllRunnableTasks(); } + public void testUnavailableOnRequestOnlyReceivesSuccessfulResponse() throws IOException { + registerRequestHandler(service1, requestHandlerShouldNotBeCalled()); + AtomicReference responseHandlerChannel = new AtomicReference<>(); + registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set)); + + AtomicBoolean responseHandlerCalled = new AtomicBoolean(); + send(service1, node2, responseHandlerShouldBeCalledNormally(() -> responseHandlerCalled.set(true))); + + deterministicTaskQueue.runAllTasks(); + assertNotNull(responseHandlerChannel.get()); + assertFalse(responseHandlerCalled.get()); + + blackholedRequestLinks.add(Tuple.tuple(node1, node2)); + blackholedRequestLinks.add(Tuple.tuple(node2, node1)); + responseHandlerChannel.get().sendResponse(TransportResponse.Empty.INSTANCE); + + deterministicTaskQueue.runAllRunnableTasks(); + assertTrue(responseHandlerCalled.get()); + } + + public void testUnavailableOnRequestOnlyReceivesExceptionalResponse() throws IOException { + registerRequestHandler(service1, requestHandlerShouldNotBeCalled()); + AtomicReference responseHandlerChannel = new AtomicReference<>(); + registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set)); + + AtomicBoolean responseHandlerCalled = new AtomicBoolean(); + send(service1, node2, responseHandlerShouldBeCalledExceptionally(e -> responseHandlerCalled.set(true))); + + deterministicTaskQueue.runAllTasks(); + assertNotNull(responseHandlerChannel.get()); + assertFalse(responseHandlerCalled.get()); + + blackholedRequestLinks.add(Tuple.tuple(node1, node2)); + blackholedRequestLinks.add(Tuple.tuple(node2, node1)); + responseHandlerChannel.get().sendResponse(new Exception()); + + deterministicTaskQueue.runAllRunnableTasks(); + assertTrue(responseHandlerCalled.get()); + } }