diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index 0abc9f4848c..ac9b3897c88 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1046,6 +1046,11 @@ public class ElasticsearchException extends RuntimeException implements ToXConte org.elasticsearch.indices.recovery.PeerRecoveryNotFound.class, org.elasticsearch.indices.recovery.PeerRecoveryNotFound::new, 158, + Version.V_7_9_0), + NODE_HEALTH_CHECK_FAILURE_EXCEPTION( + org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException.class, + org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException::new, + 159, Version.V_7_9_0); final Class exceptionClass; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java index ce088a101d9..504dc9f32f5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.gateway.GatewayMetaState; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; @@ -44,6 +45,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; +import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; public class ClusterFormationFailureHelper { private static final Logger logger = LogManager.getLogger(ClusterFormationFailureHelper.class); @@ -124,18 +126,24 @@ public class ClusterFormationFailureHelper { private final List foundPeers; private final long currentTerm; private final ElectionStrategy electionStrategy; + private final StatusInfo statusInfo; ClusterFormationState(Settings settings, ClusterState clusterState, List resolvedAddresses, - List foundPeers, long currentTerm, ElectionStrategy electionStrategy) { + List foundPeers, long currentTerm, ElectionStrategy electionStrategy, + StatusInfo statusInfo) { this.settings = settings; this.clusterState = clusterState; this.resolvedAddresses = resolvedAddresses; this.foundPeers = foundPeers; this.currentTerm = currentTerm; this.electionStrategy = electionStrategy; + this.statusInfo = statusInfo; } String getDescription() { + if (statusInfo.getStatus() == UNHEALTHY) { + return String.format(Locale.ROOT, "this node is unhealthy: %s", statusInfo.getInfo()); + } final List clusterStateNodes = StreamSupport.stream(clusterState.nodes().getMasterNodes().values().spliterator(), false) .map(n -> n.value.toString()).collect(Collectors.toList()); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index fbb8b64d500..ce39afa960c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -70,6 +70,8 @@ import org.elasticsearch.discovery.PeerFinder; import org.elasticsearch.discovery.SeedHostsProvider; import org.elasticsearch.discovery.SeedHostsResolver; import org.elasticsearch.discovery.zen.PendingClusterStateStats; +import org.elasticsearch.monitor.NodeHealthService; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportResponse.Empty; @@ -94,6 +96,7 @@ import java.util.stream.StreamSupport; import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_ID; import static org.elasticsearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; +import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; public class Coordinator extends AbstractLifecycleComponent implements Discovery { @@ -153,6 +156,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private Optional lastJoin; private JoinHelper.JoinAccumulator joinAccumulator; private Optional currentPublication = Optional.empty(); + private final NodeHealthService nodeHealthService; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -162,7 +166,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService, Supplier persistedStateSupplier, SeedHostsProvider seedHostsProvider, ClusterApplier clusterApplier, Collection> onJoinValidators, Random random, - RerouteService rerouteService, ElectionStrategy electionStrategy) { + RerouteService rerouteService, ElectionStrategy electionStrategy, NodeHealthService nodeHealthService) { this.settings = settings; this.transportService = transportService; this.masterService = masterService; @@ -172,7 +176,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery this.electionStrategy = electionStrategy; this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators, - rerouteService); + rerouteService, nodeHealthService); this.persistedStateSupplier = persistedStateSupplier; this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings); this.lastKnownLeader = Optional.empty(); @@ -182,14 +186,16 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings); this.random = random; this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool()); - this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy); + this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy, + nodeHealthService); configuredHostsResolver = new SeedHostsResolver(nodeName, settings, transportService, seedHostsProvider); this.peerFinder = new CoordinatorPeerFinder(settings, transportService, new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver); this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry, this::handlePublishRequest, this::handleApplyCommit); - this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure); - this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode); + this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure, nodeHealthService); + this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode, + nodeHealthService); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); this.clusterApplier = clusterApplier; masterService.setClusterStateSupplier(this::getStateForMasterService); @@ -202,12 +208,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery transportService::getLocalNode); this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState, transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt); + this.nodeHealthService = nodeHealthService; } private ClusterFormationState getClusterFormationState() { return new ClusterFormationState(settings, getStateForMasterService(), peerFinder.getLastResolvedAddresses(), Stream.concat(Stream.of(getLocalNode()), StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false)) - .collect(Collectors.toList()), getCurrentTerm(), electionStrategy); + .collect(Collectors.toList()), getCurrentTerm(), electionStrategy, nodeHealthService.getHealth()); } private void onLeaderFailure(Exception e) { @@ -1230,6 +1237,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery return; } + final StatusInfo statusInfo = nodeHealthService.getHealth(); + if (statusInfo.getStatus() == UNHEALTHY) { + logger.debug("skip prevoting as local node is unhealthy: [{}]", statusInfo.getInfo()); + return; + } + if (prevotingRound != null) { prevotingRound.close(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java index e18283f8858..6f14d72c2f8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java @@ -34,6 +34,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.discovery.zen.NodesFaultDetection; +import org.elasticsearch.monitor.NodeHealthService; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.Transport; @@ -57,6 +59,7 @@ import java.util.function.Consumer; import java.util.function.Predicate; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; +import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; /** * The FollowersChecker is responsible for allowing a leader to check that its followers are still connected and healthy. On deciding that a @@ -97,16 +100,17 @@ public class FollowersChecker { private final Set faultyNodes = new HashSet<>(); private final TransportService transportService; - + private final NodeHealthService nodeHealthService; private volatile FastResponseState fastResponseState; public FollowersChecker(Settings settings, TransportService transportService, Consumer handleRequestAndUpdateState, - BiConsumer onNodeFailure) { + BiConsumer onNodeFailure, NodeHealthService nodeHealthService) { this.settings = settings; this.transportService = transportService; this.handleRequestAndUpdateState = handleRequestAndUpdateState; this.onNodeFailure = onNodeFailure; + this.nodeHealthService = nodeHealthService; followerCheckInterval = FOLLOWER_CHECK_INTERVAL_SETTING.get(settings); followerCheckTimeout = FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings); @@ -167,8 +171,15 @@ public class FollowersChecker { } private void handleFollowerCheck(FollowerCheckRequest request, TransportChannel transportChannel) throws IOException { - FastResponseState responder = this.fastResponseState; + final StatusInfo statusInfo = nodeHealthService.getHealth(); + if (statusInfo.getStatus() == UNHEALTHY) { + final String message + = "handleFollowerCheck: node is unhealthy [" + statusInfo.getInfo() + "], rejecting " + statusInfo.getInfo(); + logger.debug(message); + throw new NodeHealthCheckFailureException(message); + } + final FastResponseState responder = this.fastResponseState; if (responder.mode == Mode.FOLLOWER && responder.term == request.term) { logger.trace("responding to {} on fast path", request); transportChannel.sendResponse(Empty.INSTANCE); @@ -340,6 +351,9 @@ public class FollowersChecker { || exp.getCause() instanceof ConnectTransportException) { logger.debug(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp); reason = "disconnected"; + } else if (exp.getCause() instanceof NodeHealthCheckFailureException) { + logger.debug(() -> new ParameterizedMessage("{} health check failed", FollowerChecker.this), exp); + reason = "health check failed"; } else { logger.debug(() -> new ParameterizedMessage("{} failed, retrying", FollowerChecker.this), exp); scheduleNextWakeUp(); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 6c35afa9e03..686888c699f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -44,6 +44,8 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.MembershipAction; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.DiscoveryModule; +import org.elasticsearch.monitor.NodeHealthService; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportChannel; @@ -71,6 +73,8 @@ import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; + public class JoinHelper { private static final Logger logger = LogManager.getLogger(JoinHelper.class); @@ -90,6 +94,7 @@ public class JoinHelper { @Nullable // if using single-node discovery private final TimeValue joinTimeout; + private final NodeHealthService nodeHealthService; private final Set> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>()); @@ -98,9 +103,11 @@ public class JoinHelper { JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, BiConsumer joinHandler, Function joinLeaderInTerm, - Collection> joinValidators, RerouteService rerouteService) { + Collection> joinValidators, RerouteService rerouteService, + NodeHealthService nodeHealthService) { this.masterService = masterService; this.transportService = transportService; + this.nodeHealthService = nodeHealthService; this.joinTimeout = DiscoveryModule.isSingleNodeDiscovery(settings) ? null : JOIN_TIMEOUT_SETTING.get(settings); this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService) { @@ -268,6 +275,11 @@ public class JoinHelper { public void sendJoinRequest(DiscoveryNode destination, long term, Optional optionalJoin, Runnable onCompletion) { assert destination.isMasterNode() : "trying to join master-ineligible " + destination; + final StatusInfo statusInfo = nodeHealthService.getHealth(); + if (statusInfo.getStatus() == UNHEALTHY) { + logger.debug("dropping join request to [{}]: [{}]", destination, statusInfo.getInfo()); + return; + } final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin); final Tuple dedupKey = Tuple.tuple(destination, joinRequest); if (pendingOutgoingJoins.add(dedupKey)) { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index 62a90317176..29cc53a8353 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -34,6 +34,8 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.MasterFaultDetection; +import org.elasticsearch.monitor.NodeHealthService; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.NodeDisconnectedException; @@ -55,6 +57,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; + /** * The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are * fairly lenient, possibly allowing multiple checks to fail before considering the leader to be faulty, to allow for the leader to @@ -88,18 +92,21 @@ public class LeaderChecker { private final int leaderCheckRetryCount; private final TransportService transportService; private final Consumer onLeaderFailure; + private final NodeHealthService nodeHealthService; private AtomicReference currentChecker = new AtomicReference<>(); private volatile DiscoveryNodes discoveryNodes; - LeaderChecker(final Settings settings, final TransportService transportService, final Consumer onLeaderFailure) { + LeaderChecker(final Settings settings, final TransportService transportService, final Consumer onLeaderFailure, + NodeHealthService nodeHealthService) { this.settings = settings; leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings); leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings); leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(settings); this.transportService = transportService; this.onLeaderFailure = onLeaderFailure; + this.nodeHealthService = nodeHealthService; transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, false, false, LeaderCheckRequest::new, (request, channel, task) -> { @@ -169,8 +176,13 @@ public class LeaderChecker { private void handleLeaderCheck(LeaderCheckRequest request) { final DiscoveryNodes discoveryNodes = this.discoveryNodes; assert discoveryNodes != null; - - if (discoveryNodes.isLocalNodeElectedMaster() == false) { + final StatusInfo statusInfo = nodeHealthService.getHealth(); + if (statusInfo.getStatus() == UNHEALTHY) { + final String message = "rejecting leader check from [" + request.getSender() + "] " + + "since node is unhealthy [" + statusInfo.getInfo() + "]"; + logger.debug(message); + throw new NodeHealthCheckFailureException(message); + } else if (discoveryNodes.isLocalNodeElectedMaster() == false) { logger.debug("rejecting leader check on non-master {}", request); throw new CoordinationStateRejectedException( "rejecting leader check from [" + request.getSender() + "] sent to a node that is no longer the master"); @@ -266,8 +278,12 @@ public class LeaderChecker { "leader [{}] disconnected during check", leader), exp); leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp)); return; + } else if (exp.getCause() instanceof NodeHealthCheckFailureException) { + logger.debug(new ParameterizedMessage( + "leader [{}] health check failed", leader), exp); + leaderFailed(new NodeHealthCheckFailureException("node [" + leader + "] failed health checks", exp)); + return; } - long failureCount = failureCountSinceLastSuccess.incrementAndGet(); if (failureCount >= leaderCheckRetryCount) { logger.debug(new ParameterizedMessage( diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/NodeHealthCheckFailureException.java b/server/src/main/java/org/elasticsearch/cluster/coordination/NodeHealthCheckFailureException.java new file mode 100644 index 00000000000..e0b6e1e3e55 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/NodeHealthCheckFailureException.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +/** + * This exception is thrown if the File system is reported unhealthy by @{@link org.elasticsearch.monitor.fs.FsHealthService} + * and this nodes needs to be removed from the cluster + */ + +public class NodeHealthCheckFailureException extends ElasticsearchException { + + public NodeHealthCheckFailureException(String msg, Object... args) { + super(msg, args); + } + + public NodeHealthCheckFailureException(StreamInput in) throws IOException { + super(in); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java index 0e8823d9b58..7fb41f583e7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java @@ -29,6 +29,8 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.monitor.NodeHealthService; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponseHandler; @@ -40,6 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongConsumer; import java.util.stream.StreamSupport; +import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; + import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; public class PreVoteCollector { @@ -52,16 +56,18 @@ public class PreVoteCollector { private final Runnable startElection; private final LongConsumer updateMaxTermSeen; private final ElectionStrategy electionStrategy; + private NodeHealthService nodeHealthService; // Tuple for simple atomic updates. null until the first call to `update()`. private volatile Tuple state; // DiscoveryNode component is null if there is currently no known leader. PreVoteCollector(final TransportService transportService, final Runnable startElection, final LongConsumer updateMaxTermSeen, - final ElectionStrategy electionStrategy) { + final ElectionStrategy electionStrategy, NodeHealthService nodeHealthService) { this.transportService = transportService; this.startElection = startElection; this.updateMaxTermSeen = updateMaxTermSeen; this.electionStrategy = electionStrategy; + this.nodeHealthService = nodeHealthService; transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false, PreVoteRequest::new, @@ -106,6 +112,13 @@ public class PreVoteCollector { final DiscoveryNode leader = state.v1(); final PreVoteResponse response = state.v2(); + final StatusInfo statusInfo = nodeHealthService.getHealth(); + if (statusInfo.getStatus() == UNHEALTHY) { + String message = "rejecting " + request + " on unhealthy node: [" + statusInfo.getInfo() + "]"; + logger.debug(message); + throw new NodeHealthCheckFailureException(message); + } + if (leader == null) { return response; } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index a658f95d2cf..2eef552cfd4 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -97,6 +97,7 @@ import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; +import org.elasticsearch.monitor.fs.FsHealthService; import org.elasticsearch.monitor.fs.FsService; import org.elasticsearch.monitor.jvm.JvmGcMonitorService; import org.elasticsearch.monitor.jvm.JvmService; @@ -549,7 +550,10 @@ public final class ClusterSettings extends AbstractScopedSettings { HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING, HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING, DiscoveryUpgradeService.BWC_PING_TIMEOUT_SETTING, - DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING))); + DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING, + FsHealthService.ENABLED_SETTING, + FsHealthService.REFRESH_INTERVAL_SETTING, + FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING))); public static List> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList( SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER, diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index 091b37fe2a5..6cccf9aea3d 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.gateway.GatewayMetaState; +import org.elasticsearch.monitor.NodeHealthService; import org.elasticsearch.plugins.DiscoveryPlugin; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -91,7 +92,7 @@ public class DiscoveryModule { NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService, ClusterApplier clusterApplier, ClusterSettings clusterSettings, List plugins, AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState, - RerouteService rerouteService) { + RerouteService rerouteService, NodeHealthService nodeHealthService) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); hostProviders.put("settings", () -> new SettingsBasedSeedHostsProvider(settings, transportService)); @@ -153,7 +154,7 @@ public class DiscoveryModule { settings, clusterSettings, transportService, namedWriteableRegistry, allocationService, masterService, gatewayMetaState::getPersistedState, seedHostsProvider, clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService, - electionStrategy); + electionStrategy, nodeHealthService); } else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) { discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings, seedHostsProvider, allocationService, joinValidators, rerouteService); diff --git a/server/src/main/java/org/elasticsearch/monitor/NodeHealthService.java b/server/src/main/java/org/elasticsearch/monitor/NodeHealthService.java new file mode 100644 index 00000000000..3abb7b1c86b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/monitor/NodeHealthService.java @@ -0,0 +1,26 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.monitor; + +@FunctionalInterface +public interface NodeHealthService { + + StatusInfo getHealth(); +} diff --git a/server/src/main/java/org/elasticsearch/monitor/StatusInfo.java b/server/src/main/java/org/elasticsearch/monitor/StatusInfo.java new file mode 100644 index 00000000000..46d8cad08ae --- /dev/null +++ b/server/src/main/java/org/elasticsearch/monitor/StatusInfo.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.monitor; + +/** + * Class that represents the Health status for a node as determined by {@link NodeHealthService} and provides additional + * info explaining the reasons + */ +public class StatusInfo { + + public enum Status { HEALTHY, UNHEALTHY } + + private Status status; + private String info; + + public StatusInfo(Status status, String info) { + this.status = status; + this.info = info; + } + + public String getInfo() { + return info; + } + + public Status getStatus() { + return status; + } + + @Override + public String toString() { + return "status[" + status + "]" + ", info[" + info + "]"; + } +} diff --git a/server/src/main/java/org/elasticsearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/elasticsearch/monitor/fs/FsHealthService.java new file mode 100644 index 00000000000..ecfb71cd441 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/monitor/fs/FsHealthService.java @@ -0,0 +1,182 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.monitor.fs; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.monitor.NodeHealthService; +import org.elasticsearch.monitor.StatusInfo; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.HashSet; +import java.util.Set; +import java.util.function.LongSupplier; +import java.util.stream.Collectors; + +import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY; +import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; + +/** + * Runs periodically and attempts to create a temp file to see if the filesystem is writable. If not then it marks the + * path as unhealthy. + */ +public class FsHealthService extends AbstractLifecycleComponent implements NodeHealthService { + + private static final Logger logger = LogManager.getLogger(FsHealthService.class); + private final ThreadPool threadPool; + private volatile boolean enabled; + private final TimeValue refreshInterval; + private volatile TimeValue slowPathLoggingThreshold; + private final NodeEnvironment nodeEnv; + private final LongSupplier currentTimeMillisSupplier; + private volatile Scheduler.Cancellable scheduledFuture; + + @Nullable + private volatile Set unhealthyPaths; + + public static final Setting ENABLED_SETTING = + Setting.boolSetting("monitor.fs.health.enabled", true, Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting REFRESH_INTERVAL_SETTING = + Setting.timeSetting("monitor.fs.health.refresh_interval", TimeValue.timeValueSeconds(120), TimeValue.timeValueMillis(1), + Setting.Property.NodeScope); + public static final Setting SLOW_PATH_LOGGING_THRESHOLD_SETTING = + Setting.timeSetting("monitor.fs.health.slow_path_logging_threshold", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(1), + Setting.Property.NodeScope, Setting.Property.Dynamic); + + + public FsHealthService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, NodeEnvironment nodeEnv) { + this.threadPool = threadPool; + this.enabled = ENABLED_SETTING.get(settings); + this.refreshInterval = REFRESH_INTERVAL_SETTING.get(settings); + this.slowPathLoggingThreshold = SLOW_PATH_LOGGING_THRESHOLD_SETTING.get(settings); + this.currentTimeMillisSupplier = threadPool::relativeTimeInMillis; + this.nodeEnv = nodeEnv; + clusterSettings.addSettingsUpdateConsumer(SLOW_PATH_LOGGING_THRESHOLD_SETTING, this::setSlowPathLoggingThreshold); + clusterSettings.addSettingsUpdateConsumer(ENABLED_SETTING, this::setEnabled); + } + + @Override + protected void doStart() { + scheduledFuture = threadPool.scheduleWithFixedDelay(new FsHealthMonitor(), refreshInterval, + ThreadPool.Names.GENERIC); + } + + @Override + protected void doStop() { + scheduledFuture.cancel(); + } + + @Override + protected void doClose() { + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public void setSlowPathLoggingThreshold(TimeValue slowPathLoggingThreshold) { + this.slowPathLoggingThreshold = slowPathLoggingThreshold; + } + + @Override + public StatusInfo getHealth() { + StatusInfo statusInfo; + Set unhealthyPaths = this.unhealthyPaths; + if (enabled == false) { + statusInfo = new StatusInfo(HEALTHY, "health check disabled"); + } else if (unhealthyPaths == null) { + statusInfo = new StatusInfo(HEALTHY, "health check passed"); + } else { + String info = "health check failed on [" + unhealthyPaths.stream() + .map(k -> k.toString()).collect(Collectors.joining(",")) + "]"; + statusInfo = new StatusInfo(UNHEALTHY, info); + } + return statusInfo; + } + + class FsHealthMonitor implements Runnable { + + private static final String TEMP_FILE_NAME = ".es_temp_file"; + private byte[] byteToWrite; + + FsHealthMonitor(){ + this.byteToWrite = UUIDs.randomBase64UUID().getBytes(StandardCharsets.UTF_8); + } + + @Override + public void run() { + try { + if (enabled) { + monitorFSHealth(); + logger.debug("health check succeeded"); + } + } catch (Exception e) { + logger.error("health check failed", e); + } + } + + private void monitorFSHealth() { + Set currentUnhealthyPaths = null; + for (Path path : nodeEnv.nodeDataPaths()) { + long executionStartTime = currentTimeMillisSupplier.getAsLong(); + try { + if (Files.exists(path)) { + Path tempDataPath = path.resolve(TEMP_FILE_NAME); + Files.deleteIfExists(tempDataPath); + try (OutputStream os = Files.newOutputStream(tempDataPath, StandardOpenOption.CREATE_NEW)) { + os.write(byteToWrite); + IOUtils.fsync(tempDataPath, false); + } + Files.delete(tempDataPath); + final long elapsedTime = currentTimeMillisSupplier.getAsLong() - executionStartTime; + if (elapsedTime > slowPathLoggingThreshold.millis()) { + logger.warn("health check of [{}] took [{}ms] which is above the warn threshold of [{}]", + path, elapsedTime, slowPathLoggingThreshold); + } + } + } catch (Exception ex) { + logger.error(new ParameterizedMessage("health check of [{}] failed", path), ex); + if (currentUnhealthyPaths == null) { + currentUnhealthyPaths = new HashSet<>(1); + } + currentUnhealthyPaths.add(path); + } + } + unhealthyPaths = currentUnhealthyPaths; + } + } +} + diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 31dbbd957da..2f5ea4581b5 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -123,6 +123,7 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.monitor.MonitorService; +import org.elasticsearch.monitor.fs.FsHealthService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.PersistentTasksExecutor; @@ -406,6 +407,8 @@ public class Node implements Closeable { modules.add(pluginModule); } final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService); + final FsHealthService fsHealthService = new FsHealthService(settings, clusterService.getClusterSettings(), threadPool, + nodeEnvironment); ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService); modules.add(clusterModule); IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class)); @@ -570,7 +573,8 @@ public class Node implements Closeable { final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry, networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(), clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class), - clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService); + clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService, + fsHealthService); this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(), transportService, indicesService, pluginsService, circuitBreakerService, scriptService, httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService, @@ -653,6 +657,7 @@ public class Node implements Closeable { b.bind(RestoreService.class).toInstance(restoreService); b.bind(RerouteService.class).toInstance(rerouteService); b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator); + b.bind(FsHealthService.class).toInstance(fsHealthService); } ); injector = modules.createInjector(); @@ -747,6 +752,7 @@ public class Node implements Closeable { injector.getInstance(SnapshotShardsService.class).start(); injector.getInstance(RepositoriesService.class).start(); injector.getInstance(SearchService.class).start(); + injector.getInstance(FsHealthService.class).start(); nodeService.getMonitorService().start(); final ClusterService clusterService = injector.getInstance(ClusterService.class); @@ -883,6 +889,7 @@ public class Node implements Closeable { // we close indices first, so operations won't be allowed on it injector.getInstance(ClusterService.class).stop(); injector.getInstance(NodeConnectionsService.class).stop(); + injector.getInstance(FsHealthService.class).stop(); nodeService.getMonitorService().stop(); injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); @@ -939,6 +946,8 @@ public class Node implements Closeable { toClose.add(injector.getInstance(Discovery.class)); toClose.add(() -> stopWatch.stop().start("monitor")); toClose.add(nodeService.getMonitorService()); + toClose.add(() -> stopWatch.stop().start("fsHealth")); + toClose.add(injector.getInstance(FsHealthService.class)); toClose.add(() -> stopWatch.stop().start("gateway")); toClose.add(injector.getInstance(GatewayService.class)); toClose.add(() -> stopWatch.stop().start("search")); diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 39d04db52e1..cf8d04f8304 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -72,6 +72,7 @@ import org.elasticsearch.indices.InvalidIndexTemplateException; import org.elasticsearch.indices.recovery.PeerRecoveryNotFound; import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException; import org.elasticsearch.ingest.IngestProcessorException; +import org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException; @@ -828,6 +829,7 @@ public class ExceptionSerializationTests extends ESTestCase { ids.put(156, RetentionLeaseInvalidRetainingSeqNoException.class); ids.put(157, IngestProcessorException.class); ids.put(158, PeerRecoveryNotFound.class); + ids.put(159, NodeHealthCheckFailureException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java index e99feaf4fa9..bd1d7eaa5c5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.gateway.GatewayMetaState; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.test.ESTestCase; import java.util.Arrays; @@ -43,6 +44,8 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singletonList; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; +import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY; +import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -77,7 +80,8 @@ public class ClusterFormationFailureHelperTests extends ESTestCase { final ClusterFormationFailureHelper clusterFormationFailureHelper = new ClusterFormationFailureHelper(settingsBuilder.build(), () -> { warningCount.incrementAndGet(); - return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy); + return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")); }, deterministicTaskQueue.getThreadPool(), logLastFailedJoinAttemptWarningCount::incrementAndGet); @@ -147,19 +151,22 @@ public class ClusterFormationFailureHelperTests extends ESTestCase { .metadata(Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(4L).build())) .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build(); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 15L, electionStrategy) + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 15L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")) .getDescription(), is("master not discovered yet: have discovered []; discovery will continue using [] from hosts providers " + "and [] from last-known cluster state; node term 15, last-accepted version 12 in term 4")); final TransportAddress otherAddress = buildNewFakeTransportAddress(); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 16L, electionStrategy) + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 16L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")) .getDescription(), is("master not discovered yet: have discovered []; discovery will continue using [" + otherAddress + "] from hosts providers and [] from last-known cluster state; node term 16, last-accepted version 12 in term 4")); final DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 17L, electionStrategy) + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 17L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")) .getDescription(), is("master not discovered yet: have discovered [" + otherNode + "]; discovery will continue using [] from hosts providers " + "and [] from last-known cluster state; node term 17, last-accepted version 12 in term 4")); @@ -174,12 +181,33 @@ public class ClusterFormationFailureHelperTests extends ESTestCase { .build()) .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build(); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 15L, electionStrategy) - .getDescription(), + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 15L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered yet: have discovered []; discovery will continue using [] from hosts providers " + "and [] from last-known cluster state; node term 15, last-accepted version 42 in term 0")); } + public void testDescriptionOnUnhealthyNodes() { + final DiscoveryNode dataNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .version(12L).nodes(DiscoveryNodes.builder().add(dataNode).localNodeId(dataNode.getId())).build(); + + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 15L, electionStrategy, + new StatusInfo(UNHEALTHY, "unhealthy-info")) + .getDescription(), + is("this node is unhealthy: unhealthy-info")); + + final DiscoveryNode masterNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), + org.elasticsearch.common.collect.Set.of(DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT); + clusterState = ClusterState.builder(ClusterName.DEFAULT) + .version(12L).nodes(DiscoveryNodes.builder().add(masterNode).localNodeId(masterNode.getId())).build(); + + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 15L, electionStrategy, + new StatusInfo(UNHEALTHY, "unhealthy-info")) + .getDescription(), + is("this node is unhealthy: unhealthy-info")); + } + public void testDescriptionBeforeBootstrapping() { final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) @@ -187,14 +215,16 @@ public class ClusterFormationFailureHelperTests extends ESTestCase { .metadata(Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(4L).build())) .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build(); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 1L, electionStrategy).getDescription(), + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 1L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + "[cluster.initial_master_nodes] is empty on this node: have discovered []; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 1, last-accepted version 7 in term 4")); final TransportAddress otherAddress = buildNewFakeTransportAddress(); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 2L, electionStrategy) + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 2L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")) .getDescription(), is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + "[cluster.initial_master_nodes] is empty on this node: have discovered []; " + @@ -202,7 +232,8 @@ public class ClusterFormationFailureHelperTests extends ESTestCase { "] from last-known cluster state; node term 2, last-accepted version 7 in term 4")); final DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 3L, electionStrategy) + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 3L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")) .getDescription(), is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + "[cluster.initial_master_nodes] is empty on this node: have discovered [" + otherNode + "]; " + @@ -210,7 +241,8 @@ public class ClusterFormationFailureHelperTests extends ESTestCase { "] from last-known cluster state; node term 3, last-accepted version 7 in term 4")); assertThat(new ClusterFormationState(Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), "other").build(), - clusterState, emptyList(), emptyList(), 4L, electionStrategy).getDescription(), + clusterState, emptyList(), emptyList(), 4L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " + "this node must discover master-eligible nodes [other] to bootstrap a cluster: have discovered []; " + "discovery will continue using [] from hosts providers and [" + localNode + @@ -240,31 +272,32 @@ public class ClusterFormationFailureHelperTests extends ESTestCase { final ClusterState clusterState = state(localNode, VotingConfiguration.MUST_JOIN_ELECTED_MASTER.getNodeIds().toArray(new String[0])); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy).getDescription(), + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered yet and this node was detached from its previous cluster, " + "have discovered []; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); final TransportAddress otherAddress = buildNewFakeTransportAddress(); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 0L, electionStrategy) - .getDescription(), + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 0L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered yet and this node was detached from its previous cluster, " + "have discovered []; " + "discovery will continue using [" + otherAddress + "] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); final DiscoveryNode otherNode = new DiscoveryNode("otherNode", buildNewFakeTransportAddress(), Version.CURRENT); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 0L, electionStrategy) - .getDescription(), + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 0L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered yet and this node was detached from its previous cluster, " + "have discovered [" + otherNode + "]; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); final DiscoveryNode yetAnotherNode = new DiscoveryNode("yetAnotherNode", buildNewFakeTransportAddress(), Version.CURRENT); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(yetAnotherNode), 0L, electionStrategy) - .getDescription(), + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(yetAnotherNode), 0L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered yet and this node was detached from its previous cluster, " + "have discovered [" + yetAnotherNode + "]; " + "discovery will continue using [] from hosts providers and [" + localNode + @@ -277,109 +310,110 @@ public class ClusterFormationFailureHelperTests extends ESTestCase { final ClusterState clusterState = state(localNode, "otherNode"); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy).getDescription(), + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires a node with id [otherNode], " + "have discovered [] which is not a quorum; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); final TransportAddress otherAddress = buildNewFakeTransportAddress(); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 0L, electionStrategy) - .getDescription(), + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 0L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires a node with id [otherNode], " + "have discovered [] which is not a quorum; " + "discovery will continue using [" + otherAddress + "] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); final DiscoveryNode otherNode = new DiscoveryNode("otherNode", buildNewFakeTransportAddress(), Version.CURRENT); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 0L, electionStrategy) - .getDescription(), + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 0L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires a node with id [otherNode], " + "have discovered [" + otherNode + "] which is a quorum; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); final DiscoveryNode yetAnotherNode = new DiscoveryNode("yetAnotherNode", buildNewFakeTransportAddress(), Version.CURRENT); - assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(yetAnotherNode), 0L, electionStrategy) - .getDescription(), + assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(yetAnotherNode), 0L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires a node with id [otherNode], " + "have discovered [" + yetAnotherNode + "] which is not a quorum; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); - assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2"), emptyList(), emptyList(), 0L, electionStrategy) - .getDescription(), + assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2"), emptyList(), emptyList(), 0L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires two nodes with ids [n1, n2], " + "have discovered [] which is not a quorum; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3"), emptyList(), emptyList(), 0L, - electionStrategy).getDescription(), + electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires at least 2 nodes with ids from [n1, n2, n3], " + "have discovered [] which is not a quorum; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", BOOTSTRAP_PLACEHOLDER_PREFIX + "n3"), - emptyList(), emptyList(), 0L, electionStrategy).getDescription(), + emptyList(), emptyList(), 0L, electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires 2 nodes with ids [n1, n2], " + "have discovered [] which is not a quorum; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4"), emptyList(), emptyList(), 0L, - electionStrategy).getDescription(), + electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4], " + "have discovered [] which is not a quorum; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4", "n5"), emptyList(), emptyList(), 0L, - electionStrategy).getDescription(), + electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4, n5], " + "have discovered [] which is not a quorum; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4", BOOTSTRAP_PLACEHOLDER_PREFIX + "n5"), - emptyList(), emptyList(), 0L, electionStrategy).getDescription(), + emptyList(), emptyList(), 0L, electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4], " + "have discovered [] which is not a quorum; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", - BOOTSTRAP_PLACEHOLDER_PREFIX + "n4", BOOTSTRAP_PLACEHOLDER_PREFIX + "n5"), emptyList(), emptyList(), 0L, electionStrategy) - .getDescription(), + BOOTSTRAP_PLACEHOLDER_PREFIX + "n4", BOOTSTRAP_PLACEHOLDER_PREFIX + "n5"), emptyList(), emptyList(), 0L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires 3 nodes with ids [n1, n2, n3], " + "have discovered [] which is not a quorum; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n1"}), emptyList(), - emptyList(), 0L, electionStrategy).getDescription(), + emptyList(), 0L, electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires a node with id [n1], " + "have discovered [] which is not a quorum; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2"}), emptyList(), - emptyList(), 0L, electionStrategy).getDescription(), + emptyList(), 0L, electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires a node with id [n1] and a node with id [n2], " + "have discovered [] which is not a quorum; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2", "n3"}), emptyList(), - emptyList(), 0L, electionStrategy).getDescription(), + emptyList(), 0L, electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires a node with id [n1] and two nodes with ids [n2, n3], " + "have discovered [] which is not a quorum; " + "discovery will continue using [] from hosts providers and [" + localNode + "] from last-known cluster state; node term 0, last-accepted version 0 in term 0")); assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2", "n3", "n4"}), - emptyList(), emptyList(), 0L, electionStrategy).getDescription(), + emptyList(), emptyList(), 0L, electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires a node with id [n1] and " + "at least 2 nodes with ids from [n2, n3, n4], " + "have discovered [] which is not a quorum; " + @@ -400,7 +434,8 @@ public class ClusterFormationFailureHelperTests extends ESTestCase { .lastCommittedConfiguration(config(configNodeIds)).build())).build(); assertThat( - new ClusterFormationState(Settings.EMPTY, stateWithOtherNodes, emptyList(), emptyList(), 0L, electionStrategy).getDescription(), + new ClusterFormationState(Settings.EMPTY, stateWithOtherNodes, emptyList(), emptyList(), 0L, electionStrategy, + new StatusInfo(HEALTHY, "healthy-info")).getDescription(), // nodes from last-known cluster state could be in either order is(oneOf( @@ -415,7 +450,7 @@ public class ClusterFormationFailureHelperTests extends ESTestCase { "] from last-known cluster state; node term 0, last-accepted version 0 in term 0"))); assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, GatewayMetaState.STALE_STATE_CONFIG_NODE_ID), emptyList(), - emptyList(), 0L, electionStrategy).getDescription(), + emptyList(), 0L, electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(), is("master not discovered or elected yet, an election requires one or more nodes that have already participated as " + "master-eligible nodes in the cluster but this node was not master-eligible the last time it joined the cluster, " + "have discovered [] which is not a quorum; " + diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 2611b1b1fd4..8bf0872b94d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -45,6 +45,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.test.MockLogAppender; import java.io.IOException; @@ -54,6 +55,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -75,8 +77,11 @@ import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MAS import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_WRITES; import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION; import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING; +import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY; +import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; import static org.elasticsearch.test.NodeRoles.nonMasterNode; import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -160,6 +165,77 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase { } } + public void testUnhealthyNodesGetsRemoved() { + AtomicReference healthStatusInfo = new AtomicReference<>( + new StatusInfo(HEALTHY, "healthy-info")); + try (Cluster cluster = new Cluster(3)) { + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + logger.info("--> adding two new healthy nodes"); + ClusterNode newNode1 = cluster.new ClusterNode(nextNodeIndex.getAndIncrement(), true, leader.nodeSettings, + () -> healthStatusInfo.get()); + ClusterNode newNode2 = cluster.new ClusterNode(nextNodeIndex.getAndIncrement(), true, leader.nodeSettings, + () -> healthStatusInfo.get()); + cluster.clusterNodes.add(newNode1); + cluster.clusterNodes.add(newNode2); + cluster.stabilise( + // The first pinging discovers the master + defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) + // One message delay to send a join + + DEFAULT_DELAY_VARIABILITY + // Commit a new cluster state with the new node(s). Might be split into multiple commits, and each might need a + // followup reconfiguration + + 2 * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + + { + assertThat(leader.coordinator.getMode(), is(Mode.LEADER)); + final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(), + equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet()))); + } + + logger.info("setting auto-shrink reconfiguration to true"); + leader.submitSetAutoShrinkVotingConfiguration(true); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertTrue(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.get(leader.getLastAppliedClusterState().metadata().settings())); + + logger.info("--> changing health of newly added nodes to unhealthy"); + healthStatusInfo.getAndSet(new StatusInfo(UNHEALTHY, "unhealthy-info")); + + cluster.stabilise(Math.max( + // Each follower may have just sent a leader check, which receives no response + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING) + // then wait for the follower to check the leader + + defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + // then wait for the exception response + + DEFAULT_DELAY_VARIABILITY, + + // ALSO the leader may have just sent a follower check, which receives no response + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING) + // wait for the leader to check its followers + + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + // then wait for the exception response + + DEFAULT_DELAY_VARIABILITY) + + // FINALLY: + + // wait for the removal to be committed + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // then wait for the followup reconfiguration + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + { + final ClusterNode newLeader = cluster.getAnyLeader(); + final VotingConfiguration lastCommittedConfiguration + = newLeader.getLastAppliedClusterState().getLastCommittedConfiguration(); + assertThat(lastCommittedConfiguration + " should be 3 nodes", lastCommittedConfiguration.getNodeIds().size(), equalTo(3)); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(newNode1.getId())); + assertFalse(lastCommittedConfiguration.getNodeIds().contains(newNode2.getId())); + } + } + } + public void testNodesJoinAfterStableCluster() { try (Cluster cluster = new Cluster(randomIntBetween(1, 5))) { cluster.runRandomly(); @@ -489,6 +565,66 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase { } } + public void testUnHealthyLeaderRemoved() { + AtomicReference nodeHealthServiceStatus = new AtomicReference<>(new StatusInfo(HEALTHY, "healthy-info")); + try (Cluster cluster = new Cluster(randomIntBetween(1, 3), true, Settings.EMPTY, + () -> nodeHealthServiceStatus.get())) { + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + + logger.info("--> adding three new healthy nodes"); + ClusterNode newNode1 = cluster.new ClusterNode(nextNodeIndex.getAndIncrement(), true, leader.nodeSettings, + () -> new StatusInfo(HEALTHY, "healthy-info")); + ClusterNode newNode2 = cluster.new ClusterNode(nextNodeIndex.getAndIncrement(), true, leader.nodeSettings, + () -> new StatusInfo(HEALTHY, "healthy-info")); + ClusterNode newNode3 = cluster.new ClusterNode(nextNodeIndex.getAndIncrement(), true, leader.nodeSettings, + () -> new StatusInfo(HEALTHY, "healthy-info")); + cluster.clusterNodes.add(newNode1); + cluster.clusterNodes.add(newNode2); + cluster.clusterNodes.add(newNode3); + cluster.stabilise( + // The first pinging discovers the master + defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) + // One message delay to send a join + + DEFAULT_DELAY_VARIABILITY + // Commit a new cluster state with the new node(s). Might be split into multiple commits, and each might need a + // followup reconfiguration + + 3 * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + + logger.info("--> changing health status of leader {} to unhealthy", leader); + nodeHealthServiceStatus.getAndSet(new StatusInfo(UNHEALTHY, "unhealthy-info")); + + cluster.stabilise( + // first wait for all the followers to notice the leader has gone + (defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING)) + // then wait for a follower to be promoted to leader + + DEFAULT_ELECTION_DELAY + // and the first publication times out because of the unresponsive node + + defaultMillis(PUBLISH_TIMEOUT_SETTING) + // there might be a term bump causing another election + + DEFAULT_ELECTION_DELAY + + // then wait for both of: + + Math.max( + // 1. the term bumping publication to time out + defaultMillis(PUBLISH_TIMEOUT_SETTING), + // 2. the new leader to notice that the old leader is unresponsive + (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)) + ) + + // then wait for the new leader to commit a state without the old leader + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + // then wait for the followup reconfiguration + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY + ); + + assertThat(cluster.getAnyLeader().getId(), anyOf(equalTo(newNode1.getId()), equalTo(newNode2.getId()), + equalTo(newNode3.getId()))); + } + } + public void testFollowerDisconnectionDetectedQuickly() { try (Cluster cluster = new Cluster(randomIntBetween(3, 5))) { cluster.runRandomly(); @@ -1024,7 +1160,8 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase { final ClusterNode newNode = cluster1.new ClusterNode(nextNodeIndex.getAndIncrement(), nodeInOtherCluster.getLocalNode(), n -> cluster1.new MockPersistedState(n, nodeInOtherCluster.persistedState, - Function.identity(), Function.identity()), nodeInOtherCluster.nodeSettings); + Function.identity(), Function.identity()), nodeInOtherCluster.nodeSettings, + () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")); cluster1.clusterNodes.add(newNode); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java index dc763f403d2..b6e29e9550a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java @@ -29,6 +29,8 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings.Builder; +import org.elasticsearch.monitor.NodeHealthService; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EqualsHashCodeTestUtils; import org.elasticsearch.test.EqualsHashCodeTestUtils.CopyFunction; @@ -63,6 +65,8 @@ import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_C import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING; import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING; import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING; +import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY; +import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.hamcrest.Matchers.contains; @@ -109,7 +113,7 @@ public class FollowersCheckerTests extends ESTestCase { assert false : fcr; }, (node, reason) -> { assert false : node; - }); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")); followersChecker.setCurrentNodes(discoveryNodesHolder[0]); deterministicTaskQueue.runAllTasks(); @@ -179,7 +183,8 @@ public class FollowersCheckerTests extends ESTestCase { testBehaviourOfFailingNode(settings, () -> null, "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis() - + FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings).millis()); + + FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings).millis(), + () -> new StatusInfo(HEALTHY, "healthy-info")); } public void testFailsNodeThatRejectsCheck() { @@ -196,7 +201,8 @@ public class FollowersCheckerTests extends ESTestCase { throw new ElasticsearchException("simulated exception"); }, "followers check retry count exceeded", - (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis()); + (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis(), + () -> new StatusInfo(HEALTHY, "healthy-info")); } public void testFailureCounterResetsOnSuccess() { @@ -229,13 +235,13 @@ public class FollowersCheckerTests extends ESTestCase { }, "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * (maxRecoveries + 1) - 1) - * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis()); + * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis(), () -> new StatusInfo(HEALTHY, "healthy-info")); } public void testFailsNodeThatIsDisconnected() { testBehaviourOfFailingNode(Settings.EMPTY, () -> { throw new ConnectTransportException(null, "simulated exception"); - }, "disconnected", 0); + }, "disconnected", 0, () -> new StatusInfo(HEALTHY, "healthy-info")); } public void testFailsNodeThatDisconnects() { @@ -278,7 +284,7 @@ public class FollowersCheckerTests extends ESTestCase { }, (node, reason) -> { assertTrue(nodeFailed.compareAndSet(false, true)); assertThat(reason, equalTo("disconnected")); - }); + }, () -> new StatusInfo(HEALTHY, "healthy-info")); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); followersChecker.setCurrentNodes(discoveryNodes); @@ -290,8 +296,23 @@ public class FollowersCheckerTests extends ESTestCase { assertThat(followersChecker.getFaultyNodes(), contains(otherNode)); } + public void testFailsNodeThatIsUnhealthy() { + final Builder settingsBuilder = Settings.builder(); + if (randomBoolean()) { + settingsBuilder.put(FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), randomIntBetween(1, 10)); + } + if (randomBoolean()) { + settingsBuilder.put(FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), randomIntBetween(100, 100000) + "ms"); + } + final Settings settings = settingsBuilder.build(); + + testBehaviourOfFailingNode(settings, () -> { + throw new NodeHealthCheckFailureException("non writable exception"); + }, "health check failed", 0, () -> new StatusInfo(HEALTHY, "healthy-info")); + } + private void testBehaviourOfFailingNode(Settings testSettings, Supplier responder, String failureReason, - long expectedFailureTime) { + long expectedFailureTime, NodeHealthService nodeHealthService) { final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).put(testSettings).build(); @@ -339,7 +360,7 @@ public class FollowersCheckerTests extends ESTestCase { }, (node, reason) -> { assertTrue(nodeFailed.compareAndSet(false, true)); assertThat(reason, equalTo(failureReason)); - }); + }, nodeHealthService); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); followersChecker.setCurrentNodes(discoveryNodes); @@ -401,6 +422,71 @@ public class FollowersCheckerTests extends ESTestCase { }); } + public void testUnhealthyNodeRejectsImmediately(){ + + final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNode follower = new DiscoveryNode("follower", buildNewFakeTransportAddress(), Version.CURRENT); + final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), follower.getName()).build(); + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); + + final MockTransport mockTransport = new MockTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + throw new AssertionError("no requests expected"); + } + }; + + final TransportService transportService = mockTransport.createTransportService(settings, deterministicTaskQueue.getThreadPool(), + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> follower, null, emptySet()); + transportService.start(); + transportService.acceptIncomingRequests(); + + final AtomicBoolean calledCoordinator = new AtomicBoolean(); + final AtomicReference coordinatorException = new AtomicReference<>(); + + final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, + fcr -> { + assertTrue(calledCoordinator.compareAndSet(false, true)); + final RuntimeException exception = coordinatorException.get(); + if (exception != null) { + throw exception; + } + }, (node, reason) -> { + assert false : node; + }, () -> new StatusInfo(UNHEALTHY, "unhealthy-info")); + + final long leaderTerm = randomLongBetween(2, Long.MAX_VALUE); + final long followerTerm = randomLongBetween(1, leaderTerm - 1); + followersChecker.updateFastResponseState(followerTerm, Mode.FOLLOWER); + final AtomicReference receivedException = new AtomicReference<>(); + transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(leaderTerm, leader), + new TransportResponseHandler() { + @Override + public TransportResponse.Empty read(StreamInput in) { + return TransportResponse.Empty.INSTANCE; + } + + @Override + public void handleResponse(TransportResponse.Empty response) { + fail("unexpected success"); + } + + @Override + public void handleException(TransportException exp) { + assertThat(exp, not(nullValue())); + assertTrue(receivedException.compareAndSet(null, exp)); + } + + @Override + public String executor() { + return Names.SAME; + } + }); + deterministicTaskQueue.runAllTasks(); + assertFalse(calledCoordinator.get()); + assertThat(receivedException.get(), not(nullValue())); + } + public void testResponder() { final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode follower = new DiscoveryNode("follower", buildNewFakeTransportAddress(), Version.CURRENT); @@ -431,7 +517,7 @@ public class FollowersCheckerTests extends ESTestCase { } }, (node, reason) -> { assert false : node; - }); + }, () -> new StatusInfo(HEALTHY, "healthy-info")); { // Does not call into the coordinator in the normal case @@ -560,7 +646,7 @@ public class FollowersCheckerTests extends ESTestCase { assert false : fcr; }, (node, reason) -> { assert false : node; - }); + },() -> new StatusInfo(HEALTHY, "healthy-info")); followersChecker.setCurrentNodes(discoveryNodes); List followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear()) .map(cr -> cr.node).collect(Collectors.toList()); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index 934f56156d4..eada91912c2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.zen.MembershipAction; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest; @@ -40,7 +41,10 @@ import org.elasticsearch.transport.TransportService; import java.util.Collections; import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY; +import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -58,7 +62,8 @@ public class JoinHelperTests extends ESTestCase { x -> localNode, null, Collections.emptySet()); JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null, (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }, - Collections.emptyList(), (s, p, r) -> {}); + Collections.emptyList(), (s, p, r) -> {}, + () -> new StatusInfo(HEALTHY, "info")); transportService.start(); DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); @@ -164,7 +169,7 @@ public class JoinHelperTests extends ESTestCase { x -> localNode, null, Collections.emptySet()); new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState, (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }, - Collections.emptyList(), (s, p, r) -> {}); // registers request handler + Collections.emptyList(), (s, p, r) -> {}, null); // registers request handler transportService.start(); transportService.acceptIncomingRequests(); @@ -183,4 +188,54 @@ public class JoinHelperTests extends ESTestCase { assertThat(coordinationStateRejectedException.getMessage(), containsString(localClusterState.metadata().clusterUUID())); assertThat(coordinationStateRejectedException.getMessage(), containsString(otherClusterState.metadata().clusterUUID())); } + + public void testJoinFailureOnUnhealthyNodes() { + DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( + Settings.builder().put(NODE_NAME_SETTING.getKey(), "node0").build(), random()); + CapturingTransport capturingTransport = new CapturingTransport(); + DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); + TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY, + deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, + x -> localNode, null, Collections.emptySet()); + AtomicReference nodeHealthServiceStatus = new AtomicReference<> + (new StatusInfo(UNHEALTHY, "unhealthy-info")); + JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null, + (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }, + Collections.emptyList(), (s, p, r) -> {}, () -> nodeHealthServiceStatus.get()); + transportService.start(); + + DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT); + + assertFalse(joinHelper.isJoinPending()); + + // check that sending a join to node1 doesn't work + Optional optionalJoin1 = randomBoolean() ? Optional.empty() : + Optional.of(new Join(localNode, node1, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); + joinHelper.sendJoinRequest(node1, randomNonNegativeLong(), optionalJoin1); + CapturedRequest[] capturedRequests1 = capturingTransport.getCapturedRequestsAndClear(); + assertThat(capturedRequests1.length, equalTo(0)); + + assertFalse(joinHelper.isJoinPending()); + + // check that sending a join to node2 doesn't work + Optional optionalJoin2 = randomBoolean() ? Optional.empty() : + Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); + + transportService.start(); + joinHelper.sendJoinRequest(node2, randomNonNegativeLong(), optionalJoin2); + + CapturedRequest[] capturedRequests2 = capturingTransport.getCapturedRequestsAndClear(); + assertThat(capturedRequests2.length, equalTo(0)); + + assertFalse(joinHelper.isJoinPending()); + + nodeHealthServiceStatus.getAndSet(new StatusInfo(HEALTHY, "healthy-info")); + // check that sending another join to node1 now works again + joinHelper.sendJoinRequest(node1, 0L, optionalJoin1); + CapturedRequest[] capturedRequests1a = capturingTransport.getCapturedRequestsAndClear(); + assertThat(capturedRequests1a.length, equalTo(1)); + CapturedRequest capturedRequest1a = capturedRequests1a[0]; + assertEquals(node1, capturedRequest1a.node); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index b5681d4d486..9a99631046f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EqualsHashCodeTestUtils; import org.elasticsearch.test.EqualsHashCodeTestUtils.CopyFunction; @@ -43,12 +44,15 @@ import org.elasticsearch.transport.TransportService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_ACTION_NAME; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING; +import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY; +import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; @@ -152,7 +156,7 @@ public class LeaderCheckerTests extends ESTestCase { e -> { assertThat(e.getMessage(), matchesRegex("node \\[.*\\] failed \\[[1-9][0-9]*\\] consecutive checks")); assertTrue(leaderFailed.compareAndSet(false, true)); - }); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")); logger.info("--> creating first checker"); leaderChecker.updateLeader(leader1); @@ -257,7 +261,7 @@ public class LeaderCheckerTests extends ESTestCase { e -> { assertThat(e.getMessage(), anyOf(endsWith("disconnected"), endsWith("disconnected during check"))); assertTrue(leaderFailed.compareAndSet(false, true)); - }); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")); leaderChecker.updateLeader(leader); { @@ -314,23 +318,112 @@ public class LeaderCheckerTests extends ESTestCase { } } + public void testFollowerFailsImmediatelyOnHealthCheckFailure() { + final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT); + + final Response[] responseHolder = new Response[]{Response.SUCCESS}; + + final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build(); + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); + final MockTransport mockTransport = new MockTransport() { + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + if (action.equals(HANDSHAKE_ACTION_NAME)) { + handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT)); + return; + } + assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME)); + assertEquals(node, leader); + final Response response = responseHolder[0]; + + deterministicTaskQueue.scheduleNow(new Runnable() { + @Override + public void run() { + switch (response) { + case SUCCESS: + handleResponse(requestId, Empty.INSTANCE); + break; + case REMOTE_ERROR: + handleRemoteError(requestId, new NodeHealthCheckFailureException("simulated error")); + break; + } + } + + @Override + public String toString() { + return response + " response to request " + requestId; + } + }); + } + }; + + final TransportService transportService = mockTransport.createTransportService(settings, + deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); + transportService.start(); + transportService.acceptIncomingRequests(); + + final AtomicBoolean leaderFailed = new AtomicBoolean(); + final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, + e -> { + assertThat(e.getMessage(), endsWith("failed health checks")); + assertTrue(leaderFailed.compareAndSet(false, true)); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")); + + leaderChecker.updateLeader(leader); + + { + while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) { + deterministicTaskQueue.runAllRunnableTasks(); + deterministicTaskQueue.advanceTime(); + } + + deterministicTaskQueue.runAllRunnableTasks(); + assertFalse(leaderFailed.get()); + + responseHolder[0] = Response.REMOTE_ERROR; + + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + + assertTrue(leaderFailed.get()); + } + } + public void testLeaderBehaviour() { final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build(); final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); final CapturingTransport capturingTransport = new CapturingTransport(); + AtomicReference nodeHealthServiceStatus = new AtomicReference<>(new StatusInfo(UNHEALTHY, "unhealthy-info")); final TransportService transportService = capturingTransport.createTransportService(settings, deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); transportService.start(); transportService.acceptIncomingRequests(); - final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, e -> fail("shouldn't be checking anything")); + final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, e -> fail("shouldn't be checking anything"), + () -> nodeHealthServiceStatus.get()); final DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()).build(); + { + leaderChecker.setCurrentNodes(discoveryNodes); + + final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler(); + transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler); + deterministicTaskQueue.runAllTasks(); + + assertFalse(handler.successfulResponseReceived); + assertThat(handler.transportException.getRootCause(), instanceOf(NodeHealthCheckFailureException.class)); + NodeHealthCheckFailureException cause = (NodeHealthCheckFailureException) handler.transportException.getRootCause(); + assertThat(cause.getMessage(), equalTo("rejecting leader check from [" + otherNode + + "] since node is unhealthy [unhealthy-info]")); + } + + nodeHealthServiceStatus.getAndSet(new StatusInfo(HEALTHY, "healthy-info")); { leaderChecker.setCurrentNodes(discoveryNodes); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index b64f3046199..2b448c4369e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -38,6 +38,8 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.monitor.NodeHealthService; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.node.Node; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; @@ -73,6 +75,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; +import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -121,13 +124,13 @@ public class NodeJoinTests extends ESTestCase { .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build(); } - private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) { + private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState, NodeHealthService nodeHealthService) { deterministicTaskQueue = new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random()); final ThreadPool fakeThreadPool = deterministicTaskQueue.getThreadPool(); FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test_node","test", fakeThreadPool, deterministicTaskQueue::scheduleNow); - setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get()); + setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get(), nodeHealthService); fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> { coordinator.handlePublishRequest(new PublishRequest(event.state())); publishListener.onResponse(null); @@ -143,13 +146,14 @@ public class NodeJoinTests extends ESTestCase { clusterStateRef.set(event.state()); publishListener.onResponse(null); }); - setupMasterServiceAndCoordinator(term, initialState, masterService, threadPool, new Random(Randomness.get().nextLong())); + setupMasterServiceAndCoordinator(term, initialState, masterService, threadPool, new Random(Randomness.get().nextLong()), + () -> new StatusInfo(HEALTHY, "healthy-info")); masterService.setClusterStateSupplier(clusterStateRef::get); masterService.start(); } private void setupMasterServiceAndCoordinator(long term, ClusterState initialState, MasterService masterService, - ThreadPool threadPool, Random random) { + ThreadPool threadPool, Random random, NodeHealthService nodeHealthService) { if (this.masterService != null || coordinator != null) { throw new IllegalStateException("method setupMasterServiceAndCoordinator can only be called once"); } @@ -179,7 +183,7 @@ public class NodeJoinTests extends ESTestCase { () -> new InMemoryPersistedState(term, initialState), r -> emptyList(), new NoOpClusterApplier(), Collections.emptyList(), - random, (s, p, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE); + random, (s, p, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE, nodeHealthService); transportService.start(); transportService.acceptIncomingRequests(); transport = capturingTransport; @@ -270,7 +274,8 @@ public class NodeJoinTests extends ESTestCase { long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, - VotingConfiguration.of(randomFrom(node0, node1)))); + VotingConfiguration.of(randomFrom(node0, node1))), + () -> new StatusInfo(HEALTHY, "healthy-info")); assertFalse(isLocalNodeElectedMaster()); assertNull(coordinator.getStateForMasterService().nodes().getMasterNodeId()); long newTerm = initialTerm + randomLongBetween(1, 10); @@ -290,7 +295,7 @@ public class NodeJoinTests extends ESTestCase { long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, - VotingConfiguration.of(node1))); + VotingConfiguration.of(node1)), () -> new StatusInfo(HEALTHY, "healthy-info")); assertFalse(isLocalNodeElectedMaster()); long newTerm = initialTerm + randomLongBetween(1, 10); long higherVersion = initialVersion + randomLongBetween(1, 10); @@ -306,7 +311,7 @@ public class NodeJoinTests extends ESTestCase { long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, - VotingConfiguration.of(node0))); + VotingConfiguration.of(node0)), () -> new StatusInfo(HEALTHY, "healthy-info")); assertFalse(isLocalNodeElectedMaster()); long newTerm = initialTerm + randomLongBetween(1, 10); long higherVersion = initialVersion + randomLongBetween(1, 10); @@ -320,7 +325,7 @@ public class NodeJoinTests extends ESTestCase { long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, - VotingConfiguration.of(node0))); + VotingConfiguration.of(node0)), () -> new StatusInfo(HEALTHY, "healthy-info")); assertFalse(isLocalNodeElectedMaster()); long newTerm = initialTerm + randomLongBetween(1, 10); joinNodeAndRun(new JoinRequest(node0, newTerm, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); @@ -337,7 +342,7 @@ public class NodeJoinTests extends ESTestCase { long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, - VotingConfiguration.of(node0))); + VotingConfiguration.of(node0)), () -> new StatusInfo(HEALTHY, "healthy-info")); long newTerm = initialTerm + randomLongBetween(1, 10); joinNodeAndRun(new JoinRequest(node0, newTerm, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); @@ -356,7 +361,7 @@ public class NodeJoinTests extends ESTestCase { long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, - VotingConfiguration.of(node2))); + VotingConfiguration.of(node2)), () -> new StatusInfo(HEALTHY, "healthy-info")); assertFalse(isLocalNodeElectedMaster()); long newTerm = initialTerm + randomLongBetween(1, 10); SimpleFuture futNode0 = joinNodeAsync(new JoinRequest(node0, newTerm, Optional.of( @@ -383,7 +388,7 @@ public class NodeJoinTests extends ESTestCase { long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, - VotingConfiguration.of(node0))); + VotingConfiguration.of(node0)), () -> new StatusInfo(HEALTHY, "healthy-info")); long newTerm = initialTerm + randomLongBetween(1, 10); handleStartJoinFrom(node1, newTerm); handleFollowerCheckFrom(node1, newTerm); @@ -402,7 +407,8 @@ public class NodeJoinTests extends ESTestCase { CoordinationMetadata.VotingConfigExclusion.MISSING_VALUE_MARKER, "knownNodeName"); setupFakeMasterServiceAndCoordinator(initialTerm, buildStateWithVotingConfigExclusion(initialNode, initialTerm, - initialVersion, votingConfigExclusion)); + initialVersion, votingConfigExclusion), + () -> new StatusInfo(HEALTHY, "healthy-info")); DiscoveryNode knownJoiningNode = new DiscoveryNode("knownNodeName", "newNodeId", buildNewFakeTransportAddress(), emptyMap(), singleton(DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT); @@ -478,7 +484,7 @@ public class NodeJoinTests extends ESTestCase { long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, - VotingConfiguration.of(node0))); + VotingConfiguration.of(node0)), () -> new StatusInfo(HEALTHY, "healthy-info")); long newTerm = initialTerm + randomLongBetween(1, 10); handleStartJoinFrom(node1, newTerm); handleFollowerCheckFrom(node1, newTerm); @@ -494,7 +500,7 @@ public class NodeJoinTests extends ESTestCase { long initialTerm = randomLongBetween(1, 10); long initialVersion = randomLongBetween(1, 10); setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, - VotingConfiguration.of(node1))); + VotingConfiguration.of(node1)), () -> new StatusInfo(HEALTHY, "healthy-info")); long newTerm = initialTerm + randomLongBetween(1, 10); SimpleFuture fut = joinNodeAsync(new JoinRequest(node0, newTerm, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PreVoteCollectorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PreVoteCollectorTests.java index b5c415b2f73..8026dcc1477 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PreVoteCollectorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PreVoteCollectorTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransport; import org.elasticsearch.transport.ConnectTransportException; @@ -46,6 +47,8 @@ import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.coordination.PreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME; +import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY; +import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.threadpool.ThreadPool.Names.SAME; import static org.hamcrest.Matchers.equalTo; @@ -63,6 +66,7 @@ public class PreVoteCollectorTests extends ESTestCase { private Map responsesByNode = new HashMap<>(); private long currentTerm, lastAcceptedTerm, lastAcceptedVersion; private TransportService transportService; + private StatusInfo healthStatus; @Before public void createObjects() { @@ -95,6 +99,11 @@ public class PreVoteCollectorTests extends ESTestCase { } }); } + + @Override + public void handleRemoteError(long requestId, Throwable t) { + logger.warn("Remote error", t); + } }; lastAcceptedTerm = randomNonNegativeLong(); currentTerm = randomLongBetween(lastAcceptedTerm, Long.MAX_VALUE); @@ -102,6 +111,7 @@ public class PreVoteCollectorTests extends ESTestCase { localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); responsesByNode.put(localNode, new PreVoteResponse(currentTerm, lastAcceptedTerm, lastAcceptedVersion)); + healthStatus = new StatusInfo(HEALTHY, "healthy-info"); transportService = mockTransport.createTransportService(settings, deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); @@ -112,7 +122,7 @@ public class PreVoteCollectorTests extends ESTestCase { assert electionOccurred == false; electionOccurred = true; }, l -> { - }, ElectionStrategy.DEFAULT_INSTANCE); + }, ElectionStrategy.DEFAULT_INSTANCE, () -> healthStatus); preVoteCollector.update(getLocalPreVoteResponse(), null); } @@ -147,6 +157,13 @@ public class PreVoteCollectorTests extends ESTestCase { assertTrue(electionOccurred); } + public void testNoElectionStartIfLocalNodeIsOnlyNodeAndUnhealthy() { + healthStatus = new StatusInfo(UNHEALTHY, "unhealthy-info"); + preVoteCollector.update(getLocalPreVoteResponse(), null); + startAndRunCollector(localNode); + assertFalse(electionOccurred); + } + public void testStartsElectionIfLocalNodeIsQuorum() { final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); responsesByNode.put(otherNode, getLocalPreVoteResponse()); @@ -169,6 +186,15 @@ public class PreVoteCollectorTests extends ESTestCase { assertFalse(electionOccurred); } + public void testUnhealthyNodeDoesNotOfferPreVote() { + final long term = randomNonNegativeLong(); + healthStatus = new StatusInfo(UNHEALTHY, "unhealthy-info"); + final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); + RemoteTransportException remoteTransportException = expectThrows(RemoteTransportException.class, () -> + handlePreVoteRequestViaTransportService(new PreVoteRequest(otherNode, term))); + assertThat(remoteTransportException.getCause(), instanceOf(NodeHealthCheckFailureException.class)); + } + public void testDoesNotStartElectionIfStopped() { final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); responsesByNode.put(otherNode, getLocalPreVoteResponse()); diff --git a/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java index b705c53d5ab..ea7324f03e2 100644 --- a/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java @@ -92,7 +92,7 @@ public class DiscoveryModuleTests extends ESTestCase { private DiscoveryModule newModule(Settings settings, List plugins) { return new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry, null, masterService, clusterApplier, clusterSettings, plugins, null, createTempDir().toAbsolutePath(), gatewayMetaState, - mock(RerouteService.class)); + mock(RerouteService.class), null); } public void testDefaults() { diff --git a/server/src/test/java/org/elasticsearch/monitor/fs/FsHealthServiceTests.java b/server/src/test/java/org/elasticsearch/monitor/fs/FsHealthServiceTests.java new file mode 100644 index 00000000000..75c33c9cfb7 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/monitor/fs/FsHealthServiceTests.java @@ -0,0 +1,345 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.elasticsearch.monitor.fs; + + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.mockfile.FilterFileChannel; +import org.apache.lucene.mockfile.FilterFileSystemProvider; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.io.PathUtilsForTesting; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Before; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.channels.FileChannel; +import java.nio.file.FileSystem; +import java.nio.file.OpenOption; +import java.nio.file.Path; +import java.nio.file.attribute.FileAttribute; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY; +import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.is; + + +public class FsHealthServiceTests extends ESTestCase { + + private DeterministicTaskQueue deterministicTaskQueue; + + @Before + public void createObjects() { + Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(); + deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); + } + + public void testSchedulesHealthCheckAtRefreshIntervals() throws Exception { + long refreshInterval = randomLongBetween(1000, 12000); + final Settings settings = Settings.builder().put(FsHealthService.REFRESH_INTERVAL_SETTING.getKey(), refreshInterval + "ms").build(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + try (NodeEnvironment env = newNodeEnvironment()) { + FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, deterministicTaskQueue.getThreadPool(), env); + final long startTimeMillis = deterministicTaskQueue.getCurrentTimeMillis(); + fsHealthService.doStart(); + assertFalse(deterministicTaskQueue.hasRunnableTasks()); + assertTrue(deterministicTaskQueue.hasDeferredTasks()); + int rescheduledCount = 0; + for (int i = 1; i <= randomIntBetween(5, 10); i++) { + if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } else { + assertThat(deterministicTaskQueue.getLatestDeferredExecutionTime(), is(refreshInterval * (rescheduledCount + 1))); + deterministicTaskQueue.advanceTime(); + rescheduledCount++; + } + assertThat(deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis, is(refreshInterval * rescheduledCount)); + } + + fsHealthService.doStop(); + deterministicTaskQueue.runAllTasksInTimeOrder(); + + assertFalse(deterministicTaskQueue.hasRunnableTasks()); + assertFalse(deterministicTaskQueue.hasDeferredTasks()); + } + } + + public void testFailsHealthOnIOException() throws IOException { + FileSystem fileSystem = PathUtils.getDefaultFileSystem(); + FileSystemIOExceptionProvider disruptFileSystemProvider = new FileSystemIOExceptionProvider(fileSystem); + fileSystem = disruptFileSystemProvider.getFileSystem(null); + PathUtilsForTesting.installMock(fileSystem); + final Settings settings = Settings.EMPTY; + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); + try (NodeEnvironment env = newNodeEnvironment()) { + FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + fsHealthService.new FsHealthMonitor().run(); + assertEquals(HEALTHY, fsHealthService.getHealth().getStatus()); + assertEquals("health check passed", fsHealthService.getHealth().getInfo()); + + //disrupt file system + disruptFileSystemProvider.injectIOException.set(true); + fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + fsHealthService.new FsHealthMonitor().run(); + assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus()); + for (Path path : env.nodeDataPaths()) { + assertTrue(fsHealthService.getHealth().getInfo().contains(path.toString())); + } + assertEquals(env.nodeDataPaths().length, disruptFileSystemProvider.getInjectedPathCount()); + } finally { + disruptFileSystemProvider.injectIOException.set(false); + PathUtilsForTesting.teardown(); + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } + + @TestLogging(value = "org.elasticsearch.monitor.fs:WARN", reason = "to ensure that we log on hung IO at WARN level") + public void testLoggingOnHungIO() throws Exception { + long slowLogThreshold = randomLongBetween(100, 200); + final Settings settings = Settings.builder().put(FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING.getKey(), + slowLogThreshold + "ms").build(); + FileSystem fileSystem = PathUtils.getDefaultFileSystem(); + TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); + FileSystemFsyncHungProvider disruptFileSystemProvider = new FileSystemFsyncHungProvider(fileSystem, + randomLongBetween(slowLogThreshold + 1 , 400), testThreadPool); + fileSystem = disruptFileSystemProvider.getFileSystem(null); + PathUtilsForTesting.installMock(fileSystem); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + + Logger logger = LogManager.getLogger(FsHealthService.class); + Loggers.addAppender(logger, mockAppender); + try (NodeEnvironment env = newNodeEnvironment()) { + FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + int counter = 0; + for(Path path : env.nodeDataPaths()){ + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test" + ++counter, + FsHealthService.class.getCanonicalName(), + Level.WARN, + "health check of [" + path + "] took [*ms] which is above the warn threshold*")); + } + + //disrupt file system + disruptFileSystemProvider.injectIOException.set(true); + fsHealthService.new FsHealthMonitor().run(); + assertEquals(env.nodeDataPaths().length, disruptFileSystemProvider.getInjectedPathCount()); + assertBusy(mockAppender::assertAllExpectationsMatched); + } finally { + Loggers.removeAppender(logger, mockAppender); + mockAppender.stop(); + PathUtilsForTesting.teardown(); + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } + + public void testFailsHealthOnSinglePathFsyncFailure() throws IOException { + FileSystem fileSystem = PathUtils.getDefaultFileSystem(); + FileSystemFsyncIOExceptionProvider disruptFsyncFileSystemProvider = new FileSystemFsyncIOExceptionProvider(fileSystem); + fileSystem = disruptFsyncFileSystemProvider.getFileSystem(null); + PathUtilsForTesting.installMock(fileSystem); + final Settings settings = Settings.EMPTY; + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); + try (NodeEnvironment env = newNodeEnvironment()) { + Path[] paths = env.nodeDataPaths(); + FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + fsHealthService.new FsHealthMonitor().run(); + assertEquals(HEALTHY, fsHealthService.getHealth().getStatus()); + assertEquals("health check passed", fsHealthService.getHealth().getInfo()); + + //disrupt file system fsync on single path + disruptFsyncFileSystemProvider.injectIOException.set(true); + String disruptedPath = randomFrom(paths).toString(); + disruptFsyncFileSystemProvider.restrictPathPrefix(disruptedPath); + fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + fsHealthService.new FsHealthMonitor().run(); + assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus()); + assertThat(fsHealthService.getHealth().getInfo(), is("health check failed on [" + disruptedPath + "]")); + assertEquals(1, disruptFsyncFileSystemProvider.getInjectedPathCount()); + } finally { + disruptFsyncFileSystemProvider.injectIOException.set(false); + PathUtilsForTesting.teardown(); + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } + + public void testFailsHealthOnSinglePathWriteFailure() throws IOException { + FileSystem fileSystem = PathUtils.getDefaultFileSystem(); + FileSystemIOExceptionProvider disruptWritesFileSystemProvider = new FileSystemIOExceptionProvider(fileSystem); + fileSystem = disruptWritesFileSystemProvider.getFileSystem(null); + PathUtilsForTesting.installMock(fileSystem); + final Settings settings = Settings.EMPTY; + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); + try (NodeEnvironment env = newNodeEnvironment()) { + Path[] paths = env.nodeDataPaths(); + FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + fsHealthService.new FsHealthMonitor().run(); + assertEquals(HEALTHY, fsHealthService.getHealth().getStatus()); + assertEquals("health check passed", fsHealthService.getHealth().getInfo()); + + //disrupt file system writes on single path + disruptWritesFileSystemProvider.injectIOException.set(true); + String disruptedPath = randomFrom(paths).toString(); + disruptWritesFileSystemProvider.restrictPathPrefix(disruptedPath); + fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + fsHealthService.new FsHealthMonitor().run(); + assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus()); + assertThat(fsHealthService.getHealth().getInfo(), is("health check failed on [" + disruptedPath + "]")); + assertEquals(1, disruptWritesFileSystemProvider.getInjectedPathCount()); + } finally { + disruptWritesFileSystemProvider.injectIOException.set(false); + PathUtilsForTesting.teardown(); + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } + + private static class FileSystemIOExceptionProvider extends FilterFileSystemProvider { + + AtomicBoolean injectIOException = new AtomicBoolean(); + AtomicInteger injectedPaths = new AtomicInteger(); + + private String pathPrefix = "/"; + + FileSystemIOExceptionProvider(FileSystem inner) { + super("disrupt_fs_health://", inner); + } + + public void restrictPathPrefix(String pathPrefix){ + this.pathPrefix = pathPrefix; + } + + public int getInjectedPathCount(){ + return injectedPaths.get(); + } + + @Override + public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException { + if (injectIOException.get()){ + if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) { + injectedPaths.incrementAndGet(); + throw new IOException("fake IOException"); + } + } + return super.newOutputStream(path, options); + } + } + + private static class FileSystemFsyncIOExceptionProvider extends FilterFileSystemProvider { + + AtomicBoolean injectIOException = new AtomicBoolean(); + AtomicInteger injectedPaths = new AtomicInteger(); + + private String pathPrefix = "/"; + + FileSystemFsyncIOExceptionProvider(FileSystem inner) { + super("disrupt_fs_health://", inner); + } + + public void restrictPathPrefix(String pathPrefix){ + this.pathPrefix = pathPrefix; + } + + public int getInjectedPathCount(){ + return injectedPaths.get(); + } + + @Override + public FileChannel newFileChannel(Path path, Set options, FileAttribute... attrs) throws IOException { + return new FilterFileChannel(super.newFileChannel(path, options, attrs)) { + @Override + public void force(boolean metaData) throws IOException { + if (injectIOException.get()) { + if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) { + injectedPaths.incrementAndGet(); + throw new IOException("fake IOException"); + } + } + super.force(metaData); + } + }; + } + } + + private static class FileSystemFsyncHungProvider extends FilterFileSystemProvider { + + AtomicBoolean injectIOException = new AtomicBoolean(); + AtomicInteger injectedPaths = new AtomicInteger(); + + private String pathPrefix = "/"; + private long delay; + private final ThreadPool threadPool; + + FileSystemFsyncHungProvider(FileSystem inner, long delay, ThreadPool threadPool) { + super("disrupt_fs_health://", inner); + this.delay = delay; + this.threadPool = threadPool; + } + + public int getInjectedPathCount(){ + return injectedPaths.get(); + } + + @Override + public FileChannel newFileChannel(Path path, Set options, FileAttribute... attrs) throws IOException { + return new FilterFileChannel(super.newFileChannel(path, options, attrs)) { + @Override + public void force(boolean metaData) throws IOException { + if (injectIOException.get()) { + if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) { + injectedPaths.incrementAndGet(); + final long startTimeMillis = threadPool.relativeTimeInMillis(); + do { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } while (threadPool.relativeTimeInMillis() <= startTimeMillis + delay); + } + } + super.force(metaData); + } + }; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index cdbc15b4448..5ca8fec8451 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -168,6 +168,7 @@ import org.elasticsearch.indices.recovery.PeerRecoverySourceService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.RepositoriesService; @@ -217,6 +218,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener; import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; +import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -1649,7 +1651,8 @@ public class SnapshotResiliencyTests extends ESTestCase { hostsResolver -> nodes.values().stream().filter(n -> n.node.isMasterNode()) .map(n -> n.node.getAddress()).collect(Collectors.toList()), clusterService.getClusterApplierService(), Collections.emptyList(), random(), - new BatchedRerouteService(clusterService, allocationService::reroute), ElectionStrategy.DEFAULT_INSTANCE); + new BatchedRerouteService(clusterService, allocationService::reroute), ElectionStrategy.DEFAULT_INSTANCE, + () -> new StatusInfo(HEALTHY, "healthy-info")); masterService.setClusterStatePublisher(coordinator); coordinator.start(); clusterService.getClusterApplierService().setNodeConnectionsService(nodeConnectionsService); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 3ccb43de640..5cac78b3dd8 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -65,6 +65,8 @@ import org.elasticsearch.gateway.ClusterStateUpdaters; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.MockGatewayMetaState; import org.elasticsearch.gateway.PersistedClusterStateService; +import org.elasticsearch.monitor.NodeHealthService; +import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.disruption.DisruptableMockTransport; import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus; @@ -125,6 +127,7 @@ import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MAS import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION; import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; +import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.elasticsearch.transport.TransportSettings.CONNECT_TIMEOUT; @@ -254,6 +257,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase { private final Map committedStatesByVersion = new HashMap<>(); private final LinearizabilityChecker linearizabilityChecker = new LinearizabilityChecker(); private final History history = new History(); + private NodeHealthService nodeHealthService; private final Function defaultPersistedStateSupplier = MockPersistedState::new; @@ -265,6 +269,11 @@ public class AbstractCoordinatorTestCase extends ESTestCase { } Cluster(int initialNodeCount, boolean allNodesMasterEligible, Settings nodeSettings) { + this(initialNodeCount, allNodesMasterEligible, nodeSettings, () -> new StatusInfo(HEALTHY, "healthy-info")); + } + + Cluster(int initialNodeCount, boolean allNodesMasterEligible, Settings nodeSettings, NodeHealthService nodeHealthService) { + this.nodeHealthService = nodeHealthService; deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); assertThat(initialNodeCount, greaterThan(0)); @@ -273,7 +282,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase { clusterNodes = new ArrayList<>(initialNodeCount); for (int i = 0; i < initialNodeCount; i++) { final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(), - allNodesMasterEligible || i == 0 || randomBoolean(), nodeSettings); + allNodesMasterEligible || i == 0 || randomBoolean(), nodeSettings, nodeHealthService); clusterNodes.add(clusterNode); if (clusterNode.getLocalNode().isMasterNode()) { masterEligibleNodeIds.add(clusterNode.getId()); @@ -309,7 +318,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase { final List addedNodes = new ArrayList<>(); for (int i = 0; i < newNodesCount; i++) { - final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(), true, Settings.EMPTY); + final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(), true, Settings.EMPTY, + nodeHealthService); addedNodes.add(clusterNode); } clusterNodes.addAll(addedNodes); @@ -635,7 +645,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase { private boolean isConnectedPair(ClusterNode n1, ClusterNode n2) { return n1 == n2 || (getConnectionStatus(n1.getLocalNode(), n2.getLocalNode()) == ConnectionStatus.CONNECTED - && getConnectionStatus(n2.getLocalNode(), n1.getLocalNode()) == ConnectionStatus.CONNECTED); + && getConnectionStatus(n2.getLocalNode(), n1.getLocalNode()) == ConnectionStatus.CONNECTED) && + (n1.nodeHealthService.getHealth().getStatus() == HEALTHY && n2.nodeHealthService.getHealth().getStatus() == HEALTHY); } ClusterNode getAnyLeader() { @@ -881,14 +892,18 @@ public class AbstractCoordinatorTestCase extends ESTestCase { private ClusterService clusterService; TransportService transportService; private DisruptableMockTransport mockTransport; + private NodeHealthService nodeHealthService; List> extraJoinValidators = new ArrayList<>(); - ClusterNode(int nodeIndex, boolean masterEligible, Settings nodeSettings) { - this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier, nodeSettings); + + ClusterNode(int nodeIndex, boolean masterEligible, Settings nodeSettings, NodeHealthService nodeHealthService) { + this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier, nodeSettings, + nodeHealthService); } ClusterNode(int nodeIndex, DiscoveryNode localNode, Function persistedStateSupplier, - Settings nodeSettings) { + Settings nodeSettings, NodeHealthService nodeHealthService) { + this.nodeHealthService = nodeHealthService; this.nodeIndex = nodeIndex; this.localNode = localNode; this.nodeSettings = nodeSettings; @@ -944,7 +959,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase { coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(), allocationService, masterService, this::getPersistedState, Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, p, r) -> {}, - getElectionStrategy()); + getElectionStrategy(), nodeHealthService); masterService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService, threadPool, null, coordinator); @@ -984,7 +999,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase { localNode.isMasterNode() && DiscoveryNode.isMasterNode(nodeSettings) ? DiscoveryNodeRole.BUILT_IN_ROLES : emptySet(), Version.CURRENT); return new ClusterNode(nodeIndex, newLocalNode, - node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetadata, adaptCurrentTerm), nodeSettings); + node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetadata, adaptCurrentTerm), nodeSettings, + nodeHealthService); } private CoordinationState.PersistedState getPersistedState() {