Log leader and handshake failures by default (#42342)

Today the `LeaderChecker` and `HandshakingTransportAddressConnector` do not log
anything above `DEBUG` level. However, there are some situations where it is
appropriate for them to log at a higher level:

- if the low-level handshake succeeds but the high-level one fails, then this
  indicates a config error that the user should resolve, and the exception
  will help them to do so.

- if leader checks fail repeatedly then we restart discovery, and the exception
  will help to determine what went wrong (see the sketch below).

Resolves #42153
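
For orientation, here is a minimal, self-contained sketch of the shape of the change. The class and method names below are illustrative stand-ins, not the actual Elasticsearch classes: the leader-failure callback becomes a Consumer<Exception> so the checker can pass along the exception that triggered the failure, and the caller can log it at a visible level before restarting discovery.

    import java.util.function.Consumer;

    // Hypothetical stand-ins for the Coordinator/LeaderChecker wiring changed in the diff below.
    class LeaderFailureSketch {

        // The checker used to take a plain Runnable; it now takes a Consumer<Exception>, so the
        // reason for the failure travels with the notification.
        static class SketchLeaderChecker {
            private final Consumer<Exception> onLeaderFailure;

            SketchLeaderChecker(Consumer<Exception> onLeaderFailure) {
                this.onLeaderFailure = onLeaderFailure;
            }

            void leaderFailed(Exception cause) {
                onLeaderFailure.accept(cause); // hand the cause back to the coordinator side
            }
        }

        public static void main(String[] args) {
            SketchLeaderChecker checker = new SketchLeaderChecker(
                // coordinator side: surface the cause at a visible level, then restart discovery
                e -> System.out.println("master node failed, restarting discovery: " + e.getMessage()));

            checker.leaderFailed(new RuntimeException("node [master-1] failed [3] consecutive checks"));
        }
    }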
Author: David Turner
Date:   2019-05-30 08:02:13 +01:00
Commit: 86b1a07887 (parent 24a794fd6b)

4 changed files with 55 additions and 27 deletions

Coordinator.java

@@ -170,7 +170,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
             new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
         this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry,
             this::handlePublishRequest, this::handleApplyCommit);
-        this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());
+        this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure);
         this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode);
         this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
         this.clusterApplier = clusterApplier;
@@ -191,20 +191,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
             StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false).collect(Collectors.toList()), getCurrentTerm());
     }

-    private Runnable getOnLeaderFailure() {
-        return new Runnable() {
-            @Override
-            public void run() {
-                synchronized (mutex) {
-                    becomeCandidate("onLeaderFailure");
-                }
+    private void onLeaderFailure(Exception e) {
+        synchronized (mutex) {
+            if (mode != Mode.CANDIDATE) {
+                assert lastKnownLeader.isPresent();
+                logger.info(new ParameterizedMessage("master node [{}] failed, restarting discovery", lastKnownLeader.get()), e);
             }
-
-            @Override
-            public String toString() {
-                return "notification of leader failure";
-            }
-        };
+            becomeCandidate("onLeaderFailure");
+        }
     }

     private void removeNode(DiscoveryNode discoveryNode, String reason) {

LeaderChecker.java

@@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.Nullable;
@@ -35,6 +36,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.discovery.zen.MasterFaultDetection;
 import org.elasticsearch.threadpool.ThreadPool.Names;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.NodeDisconnectedException;
 import org.elasticsearch.transport.TransportConnectionListener;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequest;
@@ -50,6 +52,7 @@ import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;

 /**
  * The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are
@@ -83,13 +86,13 @@ public class LeaderChecker {
     private final TimeValue leaderCheckTimeout;
     private final int leaderCheckRetryCount;
     private final TransportService transportService;
-    private final Runnable onLeaderFailure;
+    private final Consumer<Exception> onLeaderFailure;

     private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();

     private volatile DiscoveryNodes discoveryNodes;

-    public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) {
+    public LeaderChecker(final Settings settings, final TransportService transportService, final Consumer<Exception> onLeaderFailure) {
         this.settings = settings;
         leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
         leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings);
@@ -260,16 +263,19 @@
                     }

                     if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
-                        logger.debug(new ParameterizedMessage("leader [{}] disconnected, failing immediately", leader), exp);
-                        leaderFailed();
+                        logger.debug(new ParameterizedMessage(
+                            "leader [{}] disconnected during check", leader), exp);
+                        leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
                         return;
                     }

                     long failureCount = failureCountSinceLastSuccess.incrementAndGet();
                     if (failureCount >= leaderCheckRetryCount) {
-                        logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {}) so leader [{}] has failed",
-                            failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount, leader), exp);
-                        leaderFailed();
+                        logger.debug(new ParameterizedMessage(
+                            "leader [{}] has failed {} consecutive checks (limit [{}] is {}); last failure was:",
+                            leader, failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount), exp);
+                        leaderFailed(new ElasticsearchException(
+                            "node [" + leader + "] failed [" + failureCount + "] consecutive checks", exp));
                         return;
                     }
@@ -285,9 +291,19 @@
                 });
         }

-        void leaderFailed() {
+        void leaderFailed(Exception e) {
             if (isClosed.compareAndSet(false, true)) {
-                transportService.getThreadPool().generic().execute(onLeaderFailure);
+                transportService.getThreadPool().generic().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        onLeaderFailure.accept(e);
+                    }
+
+                    @Override
+                    public String toString() {
+                        return "notification of leader failure: " + e.getMessage();
+                    }
+                });
             } else {
                 logger.trace("already closed, not failing leader");
             }
@@ -295,7 +311,8 @@

         void handleDisconnectedNode(DiscoveryNode discoveryNode) {
             if (discoveryNode.equals(leader)) {
-                leaderFailed();
+                logger.debug("leader [{}] disconnected", leader);
+                leaderFailed(new NodeDisconnectedException(discoveryNode, "disconnected"));
             }
         }

HandshakingTransportAddressConnector.java

@@ -21,6 +21,7 @@ package org.elasticsearch.discovery;

 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -89,6 +90,13 @@ public class HandshakingTransportAddressConnector implements TransportAddressCon
                     remoteNode = transportService.handshake(connection, probeHandshakeTimeout.millis());
                     // success means (amongst other things) that the cluster names match
                     logger.trace("[{}] handshake successful: {}", this, remoteNode);
+                } catch (Exception e) {
+                    // we opened a connection and successfully performed a low-level handshake, so we were definitely talking to an
+                    // Elasticsearch node, but the high-level handshake failed indicating some kind of mismatched configurations
+                    // (e.g. cluster name) that the user should address
+                    logger.warn(new ParameterizedMessage("handshake failed for [{}]", this), e);
+                    listener.onFailure(e);
+                    return;
                 } finally {
                     IOUtils.closeWhileHandlingException(connection);
                 }

LeaderCheckerTests.java

@@ -52,9 +52,12 @@ import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_
 import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
 import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
 import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.endsWith;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.matchesRegex;
 import static org.hamcrest.Matchers.nullValue;

 public class LeaderCheckerTests extends ESTestCase {
@@ -146,7 +149,10 @@

         final AtomicBoolean leaderFailed = new AtomicBoolean();
         final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService,
-            () -> assertTrue(leaderFailed.compareAndSet(false, true)));
+            e -> {
+                assertThat(e.getMessage(), matchesRegex("node \\[.*\\] failed \\[[1-9][0-9]*\\] consecutive checks"));
+                assertTrue(leaderFailed.compareAndSet(false, true));
+            });

         logger.info("--> creating first checker");
         leaderChecker.updateLeader(leader1);
@@ -247,7 +253,10 @@

         final AtomicBoolean leaderFailed = new AtomicBoolean();
         final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService,
-            () -> assertTrue(leaderFailed.compareAndSet(false, true)));
+            e -> {
+                assertThat(e.getMessage(), anyOf(endsWith("disconnected"), endsWith("disconnected during check")));
+                assertTrue(leaderFailed.compareAndSet(false, true));
+            });

         leaderChecker.updateLeader(leader);
         {
@@ -316,7 +325,7 @@
         transportService.start();
         transportService.acceptIncomingRequests();

-        final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, () -> fail("shouldn't be checking anything"));
+        final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, e -> fail("shouldn't be checking anything"));

         final DiscoveryNodes discoveryNodes
             = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()).build();