diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index bf6ed67f874..ebad1479ab2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -170,7 +170,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver); this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry, this::handlePublishRequest, this::handleApplyCommit); - this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure()); + this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure); this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); this.clusterApplier = clusterApplier; @@ -191,20 +191,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false).collect(Collectors.toList()), getCurrentTerm()); } - private Runnable getOnLeaderFailure() { - return new Runnable() { - @Override - public void run() { - synchronized (mutex) { - becomeCandidate("onLeaderFailure"); - } + private void onLeaderFailure(Exception e) { + synchronized (mutex) { + if (mode != Mode.CANDIDATE) { + assert lastKnownLeader.isPresent(); + logger.info(new ParameterizedMessage("master node [{}] failed, restarting discovery", lastKnownLeader.get()), e); } - - @Override - public String toString() { - return "notification of leader failure"; - } - }; + becomeCandidate("onLeaderFailure"); + } } private void removeNode(DiscoveryNode discoveryNode, String reason) { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index e1ba0365953..fe3009a2c7f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; @@ -35,6 +36,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.MasterFaultDetection; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -50,6 +52,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; /** * The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are @@ -83,13 +86,13 @@ public class LeaderChecker { private final TimeValue leaderCheckTimeout; private final int leaderCheckRetryCount; private final TransportService transportService; - private final Runnable onLeaderFailure; + private final Consumer onLeaderFailure; private AtomicReference currentChecker = new AtomicReference<>(); private volatile DiscoveryNodes discoveryNodes; - public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) { + public LeaderChecker(final Settings settings, final TransportService transportService, final Consumer onLeaderFailure) { this.settings = settings; leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings); leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings); @@ -260,16 +263,19 @@ public class LeaderChecker { } if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) { - logger.debug(new ParameterizedMessage("leader [{}] disconnected, failing immediately", leader), exp); - leaderFailed(); + logger.debug(new ParameterizedMessage( + "leader [{}] disconnected during check", leader), exp); + leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp)); return; } long failureCount = failureCountSinceLastSuccess.incrementAndGet(); if (failureCount >= leaderCheckRetryCount) { - logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {}) so leader [{}] has failed", - failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount, leader), exp); - leaderFailed(); + logger.debug(new ParameterizedMessage( + "leader [{}] has failed {} consecutive checks (limit [{}] is {}); last failure was:", + leader, failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount), exp); + leaderFailed(new ElasticsearchException( + "node [" + leader + "] failed [" + failureCount + "] consecutive checks", exp)); return; } @@ -285,9 +291,19 @@ public class LeaderChecker { }); } - void leaderFailed() { + void leaderFailed(Exception e) { if (isClosed.compareAndSet(false, true)) { - transportService.getThreadPool().generic().execute(onLeaderFailure); + transportService.getThreadPool().generic().execute(new Runnable() { + @Override + public void run() { + onLeaderFailure.accept(e); + } + + @Override + public String toString() { + return "notification of leader failure: " + e.getMessage(); + } + }); } else { logger.trace("already closed, not failing leader"); } @@ -295,7 +311,8 @@ public class LeaderChecker { void handleDisconnectedNode(DiscoveryNode discoveryNode) { if (discoveryNode.equals(leader)) { - leaderFailed(); + logger.debug("leader [{}] disconnected", leader); + leaderFailed(new NodeDisconnectedException(discoveryNode, "disconnected")); } } diff --git a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java index 7f2512f97f8..4e90ae02e12 100644 --- a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java +++ b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java @@ -21,6 +21,7 @@ package org.elasticsearch.discovery; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -89,6 +90,13 @@ public class HandshakingTransportAddressConnector implements TransportAddressCon remoteNode = transportService.handshake(connection, probeHandshakeTimeout.millis()); // success means (amongst other things) that the cluster names match logger.trace("[{}] handshake successful: {}", this, remoteNode); + } catch (Exception e) { + // we opened a connection and successfully performed a low-level handshake, so we were definitely talking to an + // Elasticsearch node, but the high-level handshake failed indicating some kind of mismatched configurations + // (e.g. cluster name) that the user should address + logger.warn(new ParameterizedMessage("handshake failed for [{}]", this), e); + listener.onFailure(e); + return; } finally { IOUtils.closeWhileHandlingException(connection); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index a806cb84a68..ce25d24bce6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -52,9 +52,12 @@ import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_ import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.matchesRegex; import static org.hamcrest.Matchers.nullValue; public class LeaderCheckerTests extends ESTestCase { @@ -146,7 +149,10 @@ public class LeaderCheckerTests extends ESTestCase { final AtomicBoolean leaderFailed = new AtomicBoolean(); final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, - () -> assertTrue(leaderFailed.compareAndSet(false, true))); + e -> { + assertThat(e.getMessage(), matchesRegex("node \\[.*\\] failed \\[[1-9][0-9]*\\] consecutive checks")); + assertTrue(leaderFailed.compareAndSet(false, true)); + }); logger.info("--> creating first checker"); leaderChecker.updateLeader(leader1); @@ -247,7 +253,10 @@ public class LeaderCheckerTests extends ESTestCase { final AtomicBoolean leaderFailed = new AtomicBoolean(); final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, - () -> assertTrue(leaderFailed.compareAndSet(false, true))); + e -> { + assertThat(e.getMessage(), anyOf(endsWith("disconnected"), endsWith("disconnected during check"))); + assertTrue(leaderFailed.compareAndSet(false, true)); + }); leaderChecker.updateLeader(leader); { @@ -316,7 +325,7 @@ public class LeaderCheckerTests extends ESTestCase { transportService.start(); transportService.acceptIncomingRequests(); - final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, () -> fail("shouldn't be checking anything")); + final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, e -> fail("shouldn't be checking anything")); final DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()).build();