Remove nodes with read-only filesystems (#52680) (#59138)

Today we do not allow a node to start if its filesystem is readonly, but
it is possible for a filesystem to become readonly while the node is
running. We don't currently have any infrastructure in place to make
sure that Elasticsearch behaves well if this happens. A node that cannot
write to disk may be poisonous to the rest of the cluster.

With this commit we periodically verify that nodes' filesystems are
writable. If a node fails these writability checks then it is removed
from the cluster and prevented from re-joining until the checks start
passing again.

Closes #45286

Co-authored-by: Bukhtawar Khan <bukhtawar7152@gmail.com>
This commit is contained in:
David Turner 2020-07-07 14:00:02 +01:00 committed by GitHub
parent a8220ad51e
commit 46c8d00852
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1299 additions and 101 deletions

View File

@ -1046,6 +1046,11 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.indices.recovery.PeerRecoveryNotFound.class,
org.elasticsearch.indices.recovery.PeerRecoveryNotFound::new,
158,
Version.V_7_9_0),
NODE_HEALTH_CHECK_FAILURE_EXCEPTION(
org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException.class,
org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException::new,
159,
Version.V_7_9_0);
final Class<? extends ElasticsearchException> exceptionClass;

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
@ -44,6 +45,7 @@ import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
public class ClusterFormationFailureHelper {
private static final Logger logger = LogManager.getLogger(ClusterFormationFailureHelper.class);
@ -124,18 +126,24 @@ public class ClusterFormationFailureHelper {
private final List<DiscoveryNode> foundPeers;
private final long currentTerm;
private final ElectionStrategy electionStrategy;
private final StatusInfo statusInfo;
ClusterFormationState(Settings settings, ClusterState clusterState, List<TransportAddress> resolvedAddresses,
List<DiscoveryNode> foundPeers, long currentTerm, ElectionStrategy electionStrategy) {
List<DiscoveryNode> foundPeers, long currentTerm, ElectionStrategy electionStrategy,
StatusInfo statusInfo) {
this.settings = settings;
this.clusterState = clusterState;
this.resolvedAddresses = resolvedAddresses;
this.foundPeers = foundPeers;
this.currentTerm = currentTerm;
this.electionStrategy = electionStrategy;
this.statusInfo = statusInfo;
}
String getDescription() {
if (statusInfo.getStatus() == UNHEALTHY) {
return String.format(Locale.ROOT, "this node is unhealthy: %s", statusInfo.getInfo());
}
final List<String> clusterStateNodes = StreamSupport.stream(clusterState.nodes().getMasterNodes().values().spliterator(), false)
.map(n -> n.value.toString()).collect(Collectors.toList());

View File

@ -70,6 +70,8 @@ import org.elasticsearch.discovery.PeerFinder;
import org.elasticsearch.discovery.SeedHostsProvider;
import org.elasticsearch.discovery.SeedHostsResolver;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportResponse.Empty;
@ -94,6 +96,7 @@ import java.util.stream.StreamSupport;
import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_ID;
import static org.elasticsearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
public class Coordinator extends AbstractLifecycleComponent implements Discovery {
@ -153,6 +156,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Optional<Join> lastJoin;
private JoinHelper.JoinAccumulator joinAccumulator;
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final NodeHealthService nodeHealthService;
/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
@ -162,7 +166,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
RerouteService rerouteService, ElectionStrategy electionStrategy) {
RerouteService rerouteService, ElectionStrategy electionStrategy, NodeHealthService nodeHealthService) {
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;
@ -172,7 +176,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
this.electionStrategy = electionStrategy;
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
rerouteService);
rerouteService, nodeHealthService);
this.persistedStateSupplier = persistedStateSupplier;
this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);
this.lastKnownLeader = Optional.empty();
@ -182,14 +186,16 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings);
this.random = random;
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy);
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy,
nodeHealthService);
configuredHostsResolver = new SeedHostsResolver(nodeName, settings, transportService, seedHostsProvider);
this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry,
this::handlePublishRequest, this::handleApplyCommit);
this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure);
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode);
this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure, nodeHealthService);
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode,
nodeHealthService);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForMasterService);
@ -202,12 +208,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
transportService::getLocalNode);
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt);
this.nodeHealthService = nodeHealthService;
}
private ClusterFormationState getClusterFormationState() {
return new ClusterFormationState(settings, getStateForMasterService(), peerFinder.getLastResolvedAddresses(),
Stream.concat(Stream.of(getLocalNode()), StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false))
.collect(Collectors.toList()), getCurrentTerm(), electionStrategy);
.collect(Collectors.toList()), getCurrentTerm(), electionStrategy, nodeHealthService.getHealth());
}
private void onLeaderFailure(Exception e) {
@ -1230,6 +1237,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
return;
}
final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
logger.debug("skip prevoting as local node is unhealthy: [{}]", statusInfo.getInfo());
return;
}
if (prevotingRound != null) {
prevotingRound.close();
}

View File

@ -34,6 +34,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.zen.NodesFaultDetection;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
@ -57,6 +59,7 @@ import java.util.function.Consumer;
import java.util.function.Predicate;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
/**
* The FollowersChecker is responsible for allowing a leader to check that its followers are still connected and healthy. On deciding that a
@ -97,16 +100,17 @@ public class FollowersChecker {
private final Set<DiscoveryNode> faultyNodes = new HashSet<>();
private final TransportService transportService;
private final NodeHealthService nodeHealthService;
private volatile FastResponseState fastResponseState;
public FollowersChecker(Settings settings, TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
BiConsumer<DiscoveryNode, String> onNodeFailure) {
BiConsumer<DiscoveryNode, String> onNodeFailure, NodeHealthService nodeHealthService) {
this.settings = settings;
this.transportService = transportService;
this.handleRequestAndUpdateState = handleRequestAndUpdateState;
this.onNodeFailure = onNodeFailure;
this.nodeHealthService = nodeHealthService;
followerCheckInterval = FOLLOWER_CHECK_INTERVAL_SETTING.get(settings);
followerCheckTimeout = FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings);
@ -167,8 +171,15 @@ public class FollowersChecker {
}
private void handleFollowerCheck(FollowerCheckRequest request, TransportChannel transportChannel) throws IOException {
FastResponseState responder = this.fastResponseState;
final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
final String message
= "handleFollowerCheck: node is unhealthy [" + statusInfo.getInfo() + "], rejecting " + statusInfo.getInfo();
logger.debug(message);
throw new NodeHealthCheckFailureException(message);
}
final FastResponseState responder = this.fastResponseState;
if (responder.mode == Mode.FOLLOWER && responder.term == request.term) {
logger.trace("responding to {} on fast path", request);
transportChannel.sendResponse(Empty.INSTANCE);
@ -340,6 +351,9 @@ public class FollowersChecker {
|| exp.getCause() instanceof ConnectTransportException) {
logger.debug(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp);
reason = "disconnected";
} else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
logger.debug(() -> new ParameterizedMessage("{} health check failed", FollowerChecker.this), exp);
reason = "health check failed";
} else {
logger.debug(() -> new ParameterizedMessage("{} failed, retrying", FollowerChecker.this), exp);
scheduleNextWakeUp();

View File

@ -44,6 +44,8 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.MembershipAction;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportChannel;
@ -71,6 +73,8 @@ import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
public class JoinHelper {
private static final Logger logger = LogManager.getLogger(JoinHelper.class);
@ -90,6 +94,7 @@ public class JoinHelper {
@Nullable // if using single-node discovery
private final TimeValue joinTimeout;
private final NodeHealthService nodeHealthService;
private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());
@ -98,9 +103,11 @@ public class JoinHelper {
JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService) {
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService,
NodeHealthService nodeHealthService) {
this.masterService = masterService;
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = DiscoveryModule.isSingleNodeDiscovery(settings) ? null : JOIN_TIMEOUT_SETTING.get(settings);
this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {
@ -268,6 +275,11 @@ public class JoinHelper {
public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join> optionalJoin, Runnable onCompletion) {
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
logger.debug("dropping join request to [{}]: [{}]", destination, statusInfo.getInfo());
return;
}
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin);
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
if (pendingOutgoingJoins.add(dedupKey)) {

View File

@ -34,6 +34,8 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.MasterFaultDetection;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeDisconnectedException;
@ -55,6 +57,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
/**
* The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are
* fairly lenient, possibly allowing multiple checks to fail before considering the leader to be faulty, to allow for the leader to
@ -88,18 +92,21 @@ public class LeaderChecker {
private final int leaderCheckRetryCount;
private final TransportService transportService;
private final Consumer<Exception> onLeaderFailure;
private final NodeHealthService nodeHealthService;
private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();
private volatile DiscoveryNodes discoveryNodes;
LeaderChecker(final Settings settings, final TransportService transportService, final Consumer<Exception> onLeaderFailure) {
LeaderChecker(final Settings settings, final TransportService transportService, final Consumer<Exception> onLeaderFailure,
NodeHealthService nodeHealthService) {
this.settings = settings;
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings);
leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(settings);
this.transportService = transportService;
this.onLeaderFailure = onLeaderFailure;
this.nodeHealthService = nodeHealthService;
transportService.registerRequestHandler(LEADER_CHECK_ACTION_NAME, Names.SAME, false, false, LeaderCheckRequest::new,
(request, channel, task) -> {
@ -169,8 +176,13 @@ public class LeaderChecker {
private void handleLeaderCheck(LeaderCheckRequest request) {
final DiscoveryNodes discoveryNodes = this.discoveryNodes;
assert discoveryNodes != null;
if (discoveryNodes.isLocalNodeElectedMaster() == false) {
final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
final String message = "rejecting leader check from [" + request.getSender() + "] " +
"since node is unhealthy [" + statusInfo.getInfo() + "]";
logger.debug(message);
throw new NodeHealthCheckFailureException(message);
} else if (discoveryNodes.isLocalNodeElectedMaster() == false) {
logger.debug("rejecting leader check on non-master {}", request);
throw new CoordinationStateRejectedException(
"rejecting leader check from [" + request.getSender() + "] sent to a node that is no longer the master");
@ -266,8 +278,12 @@ public class LeaderChecker {
"leader [{}] disconnected during check", leader), exp);
leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
return;
} else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
logger.debug(new ParameterizedMessage(
"leader [{}] health check failed", leader), exp);
leaderFailed(new NodeHealthCheckFailureException("node [" + leader + "] failed health checks", exp));
return;
}
long failureCount = failureCountSinceLastSuccess.incrementAndGet();
if (failureCount >= leaderCheckRetryCount) {
logger.debug(new ParameterizedMessage(

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
/**
* This exception is thrown if the File system is reported unhealthy by @{@link org.elasticsearch.monitor.fs.FsHealthService}
* and this nodes needs to be removed from the cluster
*/
public class NodeHealthCheckFailureException extends ElasticsearchException {
public NodeHealthCheckFailureException(String msg, Object... args) {
super(msg, args);
}
public NodeHealthCheckFailureException(StreamInput in) throws IOException {
super(in);
}
}

View File

@ -29,6 +29,8 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
@ -40,6 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
import java.util.stream.StreamSupport;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
public class PreVoteCollector {
@ -52,16 +56,18 @@ public class PreVoteCollector {
private final Runnable startElection;
private final LongConsumer updateMaxTermSeen;
private final ElectionStrategy electionStrategy;
private NodeHealthService nodeHealthService;
// Tuple for simple atomic updates. null until the first call to `update()`.
private volatile Tuple<DiscoveryNode, PreVoteResponse> state; // DiscoveryNode component is null if there is currently no known leader.
PreVoteCollector(final TransportService transportService, final Runnable startElection, final LongConsumer updateMaxTermSeen,
final ElectionStrategy electionStrategy) {
final ElectionStrategy electionStrategy, NodeHealthService nodeHealthService) {
this.transportService = transportService;
this.startElection = startElection;
this.updateMaxTermSeen = updateMaxTermSeen;
this.electionStrategy = electionStrategy;
this.nodeHealthService = nodeHealthService;
transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false,
PreVoteRequest::new,
@ -106,6 +112,13 @@ public class PreVoteCollector {
final DiscoveryNode leader = state.v1();
final PreVoteResponse response = state.v2();
final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
String message = "rejecting " + request + " on unhealthy node: [" + statusInfo.getInfo() + "]";
logger.debug(message);
throw new NodeHealthCheckFailureException(message);
}
if (leader == null) {
return response;
}

View File

@ -97,6 +97,7 @@ import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.monitor.fs.FsHealthService;
import org.elasticsearch.monitor.fs.FsService;
import org.elasticsearch.monitor.jvm.JvmGcMonitorService;
import org.elasticsearch.monitor.jvm.JvmService;
@ -549,7 +550,10 @@ public final class ClusterSettings extends AbstractScopedSettings {
HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING,
HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING,
DiscoveryUpgradeService.BWC_PING_TIMEOUT_SETTING,
DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING)));
DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING,
FsHealthService.ENABLED_SETTING,
FsHealthService.REFRESH_INTERVAL_SETTING,
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING)));
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,

View File

@ -39,6 +39,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -91,7 +92,7 @@ public class DiscoveryModule {
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState,
RerouteService rerouteService) {
RerouteService rerouteService, NodeHealthService nodeHealthService) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
hostProviders.put("settings", () -> new SettingsBasedSeedHostsProvider(settings, transportService));
@ -153,7 +154,7 @@ public class DiscoveryModule {
settings, clusterSettings,
transportService, namedWriteableRegistry, allocationService, masterService, gatewayMetaState::getPersistedState,
seedHostsProvider, clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService,
electionStrategy);
electionStrategy, nodeHealthService);
} else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) {
discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, seedHostsProvider, allocationService, joinValidators, rerouteService);

View File

@ -0,0 +1,26 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.monitor;
@FunctionalInterface
public interface NodeHealthService {
StatusInfo getHealth();
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.monitor;
/**
* Class that represents the Health status for a node as determined by {@link NodeHealthService} and provides additional
* info explaining the reasons
*/
public class StatusInfo {
public enum Status { HEALTHY, UNHEALTHY }
private Status status;
private String info;
public StatusInfo(Status status, String info) {
this.status = status;
this.info = info;
}
public String getInfo() {
return info;
}
public Status getStatus() {
return status;
}
@Override
public String toString() {
return "status[" + status + "]" + ", info[" + info + "]";
}
}

View File

@ -0,0 +1,182 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.monitor.fs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
/**
* Runs periodically and attempts to create a temp file to see if the filesystem is writable. If not then it marks the
* path as unhealthy.
*/
public class FsHealthService extends AbstractLifecycleComponent implements NodeHealthService {
private static final Logger logger = LogManager.getLogger(FsHealthService.class);
private final ThreadPool threadPool;
private volatile boolean enabled;
private final TimeValue refreshInterval;
private volatile TimeValue slowPathLoggingThreshold;
private final NodeEnvironment nodeEnv;
private final LongSupplier currentTimeMillisSupplier;
private volatile Scheduler.Cancellable scheduledFuture;
@Nullable
private volatile Set<Path> unhealthyPaths;
public static final Setting<Boolean> ENABLED_SETTING =
Setting.boolSetting("monitor.fs.health.enabled", true, Setting.Property.NodeScope, Setting.Property.Dynamic);
public static final Setting<TimeValue> REFRESH_INTERVAL_SETTING =
Setting.timeSetting("monitor.fs.health.refresh_interval", TimeValue.timeValueSeconds(120), TimeValue.timeValueMillis(1),
Setting.Property.NodeScope);
public static final Setting<TimeValue> SLOW_PATH_LOGGING_THRESHOLD_SETTING =
Setting.timeSetting("monitor.fs.health.slow_path_logging_threshold", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(1),
Setting.Property.NodeScope, Setting.Property.Dynamic);
public FsHealthService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, NodeEnvironment nodeEnv) {
this.threadPool = threadPool;
this.enabled = ENABLED_SETTING.get(settings);
this.refreshInterval = REFRESH_INTERVAL_SETTING.get(settings);
this.slowPathLoggingThreshold = SLOW_PATH_LOGGING_THRESHOLD_SETTING.get(settings);
this.currentTimeMillisSupplier = threadPool::relativeTimeInMillis;
this.nodeEnv = nodeEnv;
clusterSettings.addSettingsUpdateConsumer(SLOW_PATH_LOGGING_THRESHOLD_SETTING, this::setSlowPathLoggingThreshold);
clusterSettings.addSettingsUpdateConsumer(ENABLED_SETTING, this::setEnabled);
}
@Override
protected void doStart() {
scheduledFuture = threadPool.scheduleWithFixedDelay(new FsHealthMonitor(), refreshInterval,
ThreadPool.Names.GENERIC);
}
@Override
protected void doStop() {
scheduledFuture.cancel();
}
@Override
protected void doClose() {
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public void setSlowPathLoggingThreshold(TimeValue slowPathLoggingThreshold) {
this.slowPathLoggingThreshold = slowPathLoggingThreshold;
}
@Override
public StatusInfo getHealth() {
StatusInfo statusInfo;
Set<Path> unhealthyPaths = this.unhealthyPaths;
if (enabled == false) {
statusInfo = new StatusInfo(HEALTHY, "health check disabled");
} else if (unhealthyPaths == null) {
statusInfo = new StatusInfo(HEALTHY, "health check passed");
} else {
String info = "health check failed on [" + unhealthyPaths.stream()
.map(k -> k.toString()).collect(Collectors.joining(",")) + "]";
statusInfo = new StatusInfo(UNHEALTHY, info);
}
return statusInfo;
}
class FsHealthMonitor implements Runnable {
private static final String TEMP_FILE_NAME = ".es_temp_file";
private byte[] byteToWrite;
FsHealthMonitor(){
this.byteToWrite = UUIDs.randomBase64UUID().getBytes(StandardCharsets.UTF_8);
}
@Override
public void run() {
try {
if (enabled) {
monitorFSHealth();
logger.debug("health check succeeded");
}
} catch (Exception e) {
logger.error("health check failed", e);
}
}
private void monitorFSHealth() {
Set<Path> currentUnhealthyPaths = null;
for (Path path : nodeEnv.nodeDataPaths()) {
long executionStartTime = currentTimeMillisSupplier.getAsLong();
try {
if (Files.exists(path)) {
Path tempDataPath = path.resolve(TEMP_FILE_NAME);
Files.deleteIfExists(tempDataPath);
try (OutputStream os = Files.newOutputStream(tempDataPath, StandardOpenOption.CREATE_NEW)) {
os.write(byteToWrite);
IOUtils.fsync(tempDataPath, false);
}
Files.delete(tempDataPath);
final long elapsedTime = currentTimeMillisSupplier.getAsLong() - executionStartTime;
if (elapsedTime > slowPathLoggingThreshold.millis()) {
logger.warn("health check of [{}] took [{}ms] which is above the warn threshold of [{}]",
path, elapsedTime, slowPathLoggingThreshold);
}
}
} catch (Exception ex) {
logger.error(new ParameterizedMessage("health check of [{}] failed", path), ex);
if (currentUnhealthyPaths == null) {
currentUnhealthyPaths = new HashSet<>(1);
}
currentUnhealthyPaths.add(path);
}
}
unhealthyPaths = currentUnhealthyPaths;
}
}
}

View File

@ -123,6 +123,7 @@ import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.monitor.fs.FsHealthService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.persistent.PersistentTasksClusterService;
import org.elasticsearch.persistent.PersistentTasksExecutor;
@ -406,6 +407,8 @@ public class Node implements Closeable {
modules.add(pluginModule);
}
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
final FsHealthService fsHealthService = new FsHealthService(settings, clusterService.getClusterSettings(), threadPool,
nodeEnvironment);
ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
modules.add(clusterModule);
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
@ -570,7 +573,8 @@ public class Node implements Closeable {
final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry,
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService);
clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService,
fsHealthService);
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
transportService, indicesService, pluginsService, circuitBreakerService, scriptService,
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
@ -653,6 +657,7 @@ public class Node implements Closeable {
b.bind(RestoreService.class).toInstance(restoreService);
b.bind(RerouteService.class).toInstance(rerouteService);
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
b.bind(FsHealthService.class).toInstance(fsHealthService);
}
);
injector = modules.createInjector();
@ -747,6 +752,7 @@ public class Node implements Closeable {
injector.getInstance(SnapshotShardsService.class).start();
injector.getInstance(RepositoriesService.class).start();
injector.getInstance(SearchService.class).start();
injector.getInstance(FsHealthService.class).start();
nodeService.getMonitorService().start();
final ClusterService clusterService = injector.getInstance(ClusterService.class);
@ -883,6 +889,7 @@ public class Node implements Closeable {
// we close indices first, so operations won't be allowed on it
injector.getInstance(ClusterService.class).stop();
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(FsHealthService.class).stop();
nodeService.getMonitorService().stop();
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
@ -939,6 +946,8 @@ public class Node implements Closeable {
toClose.add(injector.getInstance(Discovery.class));
toClose.add(() -> stopWatch.stop().start("monitor"));
toClose.add(nodeService.getMonitorService());
toClose.add(() -> stopWatch.stop().start("fsHealth"));
toClose.add(injector.getInstance(FsHealthService.class));
toClose.add(() -> stopWatch.stop().start("gateway"));
toClose.add(injector.getInstance(GatewayService.class));
toClose.add(() -> stopWatch.stop().start("search"));

View File

@ -72,6 +72,7 @@ import org.elasticsearch.indices.InvalidIndexTemplateException;
import org.elasticsearch.indices.recovery.PeerRecoveryNotFound;
import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
import org.elasticsearch.ingest.IngestProcessorException;
import org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
@ -828,6 +829,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(156, RetentionLeaseInvalidRetainingSeqNoException.class);
ids.put(157, IngestProcessorException.class);
ids.put(158, PeerRecoveryNotFound.class);
ids.put(159, NodeHealthCheckFailureException.class);
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
@ -43,6 +44,8 @@ import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ -77,7 +80,8 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
final ClusterFormationFailureHelper clusterFormationFailureHelper = new ClusterFormationFailureHelper(settingsBuilder.build(),
() -> {
warningCount.incrementAndGet();
return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy);
return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info"));
},
deterministicTaskQueue.getThreadPool(), logLastFailedJoinAttemptWarningCount::incrementAndGet);
@ -147,19 +151,22 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
.metadata(Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(4L).build()))
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 15L, electionStrategy)
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 15L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info"))
.getDescription(),
is("master not discovered yet: have discovered []; discovery will continue using [] from hosts providers " +
"and [] from last-known cluster state; node term 15, last-accepted version 12 in term 4"));
final TransportAddress otherAddress = buildNewFakeTransportAddress();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 16L, electionStrategy)
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 16L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info"))
.getDescription(),
is("master not discovered yet: have discovered []; discovery will continue using [" + otherAddress +
"] from hosts providers and [] from last-known cluster state; node term 16, last-accepted version 12 in term 4"));
final DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT);
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 17L, electionStrategy)
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 17L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info"))
.getDescription(),
is("master not discovered yet: have discovered [" + otherNode + "]; discovery will continue using [] from hosts providers " +
"and [] from last-known cluster state; node term 17, last-accepted version 12 in term 4"));
@ -174,12 +181,33 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
.build())
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 15L, electionStrategy)
.getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 15L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered yet: have discovered []; discovery will continue using [] from hosts providers " +
"and [] from last-known cluster state; node term 15, last-accepted version 42 in term 0"));
}
public void testDescriptionOnUnhealthyNodes() {
final DiscoveryNode dataNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.version(12L).nodes(DiscoveryNodes.builder().add(dataNode).localNodeId(dataNode.getId())).build();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 15L, electionStrategy,
new StatusInfo(UNHEALTHY, "unhealthy-info"))
.getDescription(),
is("this node is unhealthy: unhealthy-info"));
final DiscoveryNode masterNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(),
org.elasticsearch.common.collect.Set.of(DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT);
clusterState = ClusterState.builder(ClusterName.DEFAULT)
.version(12L).nodes(DiscoveryNodes.builder().add(masterNode).localNodeId(masterNode.getId())).build();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 15L, electionStrategy,
new StatusInfo(UNHEALTHY, "unhealthy-info"))
.getDescription(),
is("this node is unhealthy: unhealthy-info"));
}
public void testDescriptionBeforeBootstrapping() {
final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
@ -187,14 +215,16 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
.metadata(Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(4L).build()))
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 1L, electionStrategy).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 1L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " +
"[cluster.initial_master_nodes] is empty on this node: have discovered []; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 1, last-accepted version 7 in term 4"));
final TransportAddress otherAddress = buildNewFakeTransportAddress();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 2L, electionStrategy)
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 2L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info"))
.getDescription(),
is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " +
"[cluster.initial_master_nodes] is empty on this node: have discovered []; " +
@ -202,7 +232,8 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
"] from last-known cluster state; node term 2, last-accepted version 7 in term 4"));
final DiscoveryNode otherNode = new DiscoveryNode("other", buildNewFakeTransportAddress(), Version.CURRENT);
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 3L, electionStrategy)
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 3L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info"))
.getDescription(),
is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " +
"[cluster.initial_master_nodes] is empty on this node: have discovered [" + otherNode + "]; " +
@ -210,7 +241,8 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
"] from last-known cluster state; node term 3, last-accepted version 7 in term 4"));
assertThat(new ClusterFormationState(Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(), "other").build(),
clusterState, emptyList(), emptyList(), 4L, electionStrategy).getDescription(),
clusterState, emptyList(), emptyList(), 4L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered yet, this node has not previously joined a bootstrapped (v7+) cluster, and " +
"this node must discover master-eligible nodes [other] to bootstrap a cluster: have discovered []; " +
"discovery will continue using [] from hosts providers and [" + localNode +
@ -240,31 +272,32 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
final ClusterState clusterState = state(localNode,
VotingConfiguration.MUST_JOIN_ELECTED_MASTER.getNodeIds().toArray(new String[0]));
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered yet and this node was detached from its previous cluster, " +
"have discovered []; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
final TransportAddress otherAddress = buildNewFakeTransportAddress();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 0L, electionStrategy)
.getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 0L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered yet and this node was detached from its previous cluster, " +
"have discovered []; " +
"discovery will continue using [" + otherAddress + "] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
final DiscoveryNode otherNode = new DiscoveryNode("otherNode", buildNewFakeTransportAddress(), Version.CURRENT);
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 0L, electionStrategy)
.getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 0L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered yet and this node was detached from its previous cluster, " +
"have discovered [" + otherNode + "]; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
final DiscoveryNode yetAnotherNode = new DiscoveryNode("yetAnotherNode", buildNewFakeTransportAddress(), Version.CURRENT);
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(yetAnotherNode), 0L, electionStrategy)
.getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(yetAnotherNode), 0L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered yet and this node was detached from its previous cluster, " +
"have discovered [" + yetAnotherNode + "]; " +
"discovery will continue using [] from hosts providers and [" + localNode +
@ -277,109 +310,110 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
final ClusterState clusterState = state(localNode, "otherNode");
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy).getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires a node with id [otherNode], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
final TransportAddress otherAddress = buildNewFakeTransportAddress();
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 0L, electionStrategy)
.getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, singletonList(otherAddress), emptyList(), 0L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires a node with id [otherNode], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [" + otherAddress + "] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
final DiscoveryNode otherNode = new DiscoveryNode("otherNode", buildNewFakeTransportAddress(), Version.CURRENT);
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 0L, electionStrategy)
.getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(otherNode), 0L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires a node with id [otherNode], " +
"have discovered [" + otherNode + "] which is a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
final DiscoveryNode yetAnotherNode = new DiscoveryNode("yetAnotherNode", buildNewFakeTransportAddress(), Version.CURRENT);
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(yetAnotherNode), 0L, electionStrategy)
.getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), singletonList(yetAnotherNode), 0L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires a node with id [otherNode], " +
"have discovered [" + yetAnotherNode + "] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2"), emptyList(), emptyList(), 0L, electionStrategy)
.getDescription(),
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2"), emptyList(), emptyList(), 0L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires two nodes with ids [n1, n2], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3"), emptyList(), emptyList(), 0L,
electionStrategy).getDescription(),
electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires at least 2 nodes with ids from [n1, n2, n3], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", BOOTSTRAP_PLACEHOLDER_PREFIX + "n3"),
emptyList(), emptyList(), 0L, electionStrategy).getDescription(),
emptyList(), emptyList(), 0L, electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires 2 nodes with ids [n1, n2], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4"), emptyList(), emptyList(), 0L,
electionStrategy).getDescription(),
electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4", "n5"), emptyList(), emptyList(), 0L,
electionStrategy).getDescription(),
electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4, n5], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4", BOOTSTRAP_PLACEHOLDER_PREFIX + "n5"),
emptyList(), emptyList(), 0L, electionStrategy).getDescription(),
emptyList(), emptyList(), 0L, electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3",
BOOTSTRAP_PLACEHOLDER_PREFIX + "n4", BOOTSTRAP_PLACEHOLDER_PREFIX + "n5"), emptyList(), emptyList(), 0L, electionStrategy)
.getDescription(),
BOOTSTRAP_PLACEHOLDER_PREFIX + "n4", BOOTSTRAP_PLACEHOLDER_PREFIX + "n5"), emptyList(), emptyList(), 0L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires 3 nodes with ids [n1, n2, n3], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n1"}), emptyList(),
emptyList(), 0L, electionStrategy).getDescription(),
emptyList(), 0L, electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires a node with id [n1], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2"}), emptyList(),
emptyList(), 0L, electionStrategy).getDescription(),
emptyList(), 0L, electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires a node with id [n1] and a node with id [n2], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2", "n3"}), emptyList(),
emptyList(), 0L, electionStrategy).getDescription(),
emptyList(), 0L, electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires a node with id [n1] and two nodes with ids [n2, n3], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n2", "n3", "n4"}),
emptyList(), emptyList(), 0L, electionStrategy).getDescription(),
emptyList(), emptyList(), 0L, electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires a node with id [n1] and " +
"at least 2 nodes with ids from [n2, n3, n4], " +
"have discovered [] which is not a quorum; " +
@ -400,7 +434,8 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
.lastCommittedConfiguration(config(configNodeIds)).build())).build();
assertThat(
new ClusterFormationState(Settings.EMPTY, stateWithOtherNodes, emptyList(), emptyList(), 0L, electionStrategy).getDescription(),
new ClusterFormationState(Settings.EMPTY, stateWithOtherNodes, emptyList(), emptyList(), 0L, electionStrategy,
new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
// nodes from last-known cluster state could be in either order
is(oneOf(
@ -415,7 +450,7 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0")));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, GatewayMetaState.STALE_STATE_CONFIG_NODE_ID), emptyList(),
emptyList(), 0L, electionStrategy).getDescription(),
emptyList(), 0L, electionStrategy, new StatusInfo(HEALTHY, "healthy-info")).getDescription(),
is("master not discovered or elected yet, an election requires one or more nodes that have already participated as " +
"master-eligible nodes in the cluster but this node was not master-eligible the last time it joined the cluster, " +
"have discovered [] which is not a quorum; " +

View File

@ -45,6 +45,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.test.MockLogAppender;
import java.io.IOException;
@ -54,6 +55,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -75,8 +77,11 @@ import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MAS
import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_WRITES;
import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION;
import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
import static org.elasticsearch.test.NodeRoles.nonMasterNode;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -160,6 +165,77 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
}
}
public void testUnhealthyNodesGetsRemoved() {
AtomicReference<StatusInfo> healthStatusInfo = new AtomicReference<>(
new StatusInfo(HEALTHY, "healthy-info"));
try (Cluster cluster = new Cluster(3)) {
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
logger.info("--> adding two new healthy nodes");
ClusterNode newNode1 = cluster.new ClusterNode(nextNodeIndex.getAndIncrement(), true, leader.nodeSettings,
() -> healthStatusInfo.get());
ClusterNode newNode2 = cluster.new ClusterNode(nextNodeIndex.getAndIncrement(), true, leader.nodeSettings,
() -> healthStatusInfo.get());
cluster.clusterNodes.add(newNode1);
cluster.clusterNodes.add(newNode2);
cluster.stabilise(
// The first pinging discovers the master
defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING)
// One message delay to send a join
+ DEFAULT_DELAY_VARIABILITY
// Commit a new cluster state with the new node(s). Might be split into multiple commits, and each might need a
// followup reconfiguration
+ 2 * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
{
assertThat(leader.coordinator.getMode(), is(Mode.LEADER));
final VotingConfiguration lastCommittedConfiguration = leader.getLastAppliedClusterState().getLastCommittedConfiguration();
assertThat(lastCommittedConfiguration + " should be all nodes", lastCommittedConfiguration.getNodeIds(),
equalTo(cluster.clusterNodes.stream().map(ClusterNode::getId).collect(Collectors.toSet())));
}
logger.info("setting auto-shrink reconfiguration to true");
leader.submitSetAutoShrinkVotingConfiguration(true);
cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
assertTrue(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.get(leader.getLastAppliedClusterState().metadata().settings()));
logger.info("--> changing health of newly added nodes to unhealthy");
healthStatusInfo.getAndSet(new StatusInfo(UNHEALTHY, "unhealthy-info"));
cluster.stabilise(Math.max(
// Each follower may have just sent a leader check, which receives no response
defaultMillis(LEADER_CHECK_TIMEOUT_SETTING)
// then wait for the follower to check the leader
+ defaultMillis(LEADER_CHECK_INTERVAL_SETTING)
// then wait for the exception response
+ DEFAULT_DELAY_VARIABILITY,
// ALSO the leader may have just sent a follower check, which receives no response
defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)
// wait for the leader to check its followers
+ defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING)
// then wait for the exception response
+ DEFAULT_DELAY_VARIABILITY)
// FINALLY:
// wait for the removal to be committed
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY
// then wait for the followup reconfiguration
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
{
final ClusterNode newLeader = cluster.getAnyLeader();
final VotingConfiguration lastCommittedConfiguration
= newLeader.getLastAppliedClusterState().getLastCommittedConfiguration();
assertThat(lastCommittedConfiguration + " should be 3 nodes", lastCommittedConfiguration.getNodeIds().size(), equalTo(3));
assertFalse(lastCommittedConfiguration.getNodeIds().contains(newNode1.getId()));
assertFalse(lastCommittedConfiguration.getNodeIds().contains(newNode2.getId()));
}
}
}
public void testNodesJoinAfterStableCluster() {
try (Cluster cluster = new Cluster(randomIntBetween(1, 5))) {
cluster.runRandomly();
@ -489,6 +565,66 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
}
}
public void testUnHealthyLeaderRemoved() {
AtomicReference<StatusInfo> nodeHealthServiceStatus = new AtomicReference<>(new StatusInfo(HEALTHY, "healthy-info"));
try (Cluster cluster = new Cluster(randomIntBetween(1, 3), true, Settings.EMPTY,
() -> nodeHealthServiceStatus.get())) {
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
logger.info("--> adding three new healthy nodes");
ClusterNode newNode1 = cluster.new ClusterNode(nextNodeIndex.getAndIncrement(), true, leader.nodeSettings,
() -> new StatusInfo(HEALTHY, "healthy-info"));
ClusterNode newNode2 = cluster.new ClusterNode(nextNodeIndex.getAndIncrement(), true, leader.nodeSettings,
() -> new StatusInfo(HEALTHY, "healthy-info"));
ClusterNode newNode3 = cluster.new ClusterNode(nextNodeIndex.getAndIncrement(), true, leader.nodeSettings,
() -> new StatusInfo(HEALTHY, "healthy-info"));
cluster.clusterNodes.add(newNode1);
cluster.clusterNodes.add(newNode2);
cluster.clusterNodes.add(newNode3);
cluster.stabilise(
// The first pinging discovers the master
defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING)
// One message delay to send a join
+ DEFAULT_DELAY_VARIABILITY
// Commit a new cluster state with the new node(s). Might be split into multiple commits, and each might need a
// followup reconfiguration
+ 3 * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
logger.info("--> changing health status of leader {} to unhealthy", leader);
nodeHealthServiceStatus.getAndSet(new StatusInfo(UNHEALTHY, "unhealthy-info"));
cluster.stabilise(
// first wait for all the followers to notice the leader has gone
(defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING))
// then wait for a follower to be promoted to leader
+ DEFAULT_ELECTION_DELAY
// and the first publication times out because of the unresponsive node
+ defaultMillis(PUBLISH_TIMEOUT_SETTING)
// there might be a term bump causing another election
+ DEFAULT_ELECTION_DELAY
// then wait for both of:
+ Math.max(
// 1. the term bumping publication to time out
defaultMillis(PUBLISH_TIMEOUT_SETTING),
// 2. the new leader to notice that the old leader is unresponsive
(defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING))
)
// then wait for the new leader to commit a state without the old leader
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY
// then wait for the followup reconfiguration
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY
);
assertThat(cluster.getAnyLeader().getId(), anyOf(equalTo(newNode1.getId()), equalTo(newNode2.getId()),
equalTo(newNode3.getId())));
}
}
public void testFollowerDisconnectionDetectedQuickly() {
try (Cluster cluster = new Cluster(randomIntBetween(3, 5))) {
cluster.runRandomly();
@ -1024,7 +1160,8 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
final ClusterNode newNode = cluster1.new ClusterNode(nextNodeIndex.getAndIncrement(),
nodeInOtherCluster.getLocalNode(), n -> cluster1.new MockPersistedState(n, nodeInOtherCluster.persistedState,
Function.identity(), Function.identity()), nodeInOtherCluster.nodeSettings);
Function.identity(), Function.identity()), nodeInOtherCluster.nodeSettings,
() -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"));
cluster1.clusterNodes.add(newNode);

View File

@ -29,6 +29,8 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.test.EqualsHashCodeTestUtils.CopyFunction;
@ -63,6 +65,8 @@ import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_C
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING;
import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.hamcrest.Matchers.contains;
@ -109,7 +113,7 @@ public class FollowersCheckerTests extends ESTestCase {
assert false : fcr;
}, (node, reason) -> {
assert false : node;
});
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"));
followersChecker.setCurrentNodes(discoveryNodesHolder[0]);
deterministicTaskQueue.runAllTasks();
@ -179,7 +183,8 @@ public class FollowersCheckerTests extends ESTestCase {
testBehaviourOfFailingNode(settings, () -> null,
"followers check retry count exceeded",
(FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis()
+ FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings).millis());
+ FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings).millis(),
() -> new StatusInfo(HEALTHY, "healthy-info"));
}
public void testFailsNodeThatRejectsCheck() {
@ -196,7 +201,8 @@ public class FollowersCheckerTests extends ESTestCase {
throw new ElasticsearchException("simulated exception");
},
"followers check retry count exceeded",
(FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis());
(FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis(),
() -> new StatusInfo(HEALTHY, "healthy-info"));
}
public void testFailureCounterResetsOnSuccess() {
@ -229,13 +235,13 @@ public class FollowersCheckerTests extends ESTestCase {
},
"followers check retry count exceeded",
(FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * (maxRecoveries + 1) - 1)
* FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis());
* FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis(), () -> new StatusInfo(HEALTHY, "healthy-info"));
}
public void testFailsNodeThatIsDisconnected() {
testBehaviourOfFailingNode(Settings.EMPTY, () -> {
throw new ConnectTransportException(null, "simulated exception");
}, "disconnected", 0);
}, "disconnected", 0, () -> new StatusInfo(HEALTHY, "healthy-info"));
}
public void testFailsNodeThatDisconnects() {
@ -278,7 +284,7 @@ public class FollowersCheckerTests extends ESTestCase {
}, (node, reason) -> {
assertTrue(nodeFailed.compareAndSet(false, true));
assertThat(reason, equalTo("disconnected"));
});
}, () -> new StatusInfo(HEALTHY, "healthy-info"));
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build();
followersChecker.setCurrentNodes(discoveryNodes);
@ -290,8 +296,23 @@ public class FollowersCheckerTests extends ESTestCase {
assertThat(followersChecker.getFaultyNodes(), contains(otherNode));
}
public void testFailsNodeThatIsUnhealthy() {
final Builder settingsBuilder = Settings.builder();
if (randomBoolean()) {
settingsBuilder.put(FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), randomIntBetween(1, 10));
}
if (randomBoolean()) {
settingsBuilder.put(FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), randomIntBetween(100, 100000) + "ms");
}
final Settings settings = settingsBuilder.build();
testBehaviourOfFailingNode(settings, () -> {
throw new NodeHealthCheckFailureException("non writable exception");
}, "health check failed", 0, () -> new StatusInfo(HEALTHY, "healthy-info"));
}
private void testBehaviourOfFailingNode(Settings testSettings, Supplier<TransportResponse.Empty> responder, String failureReason,
long expectedFailureTime) {
long expectedFailureTime, NodeHealthService nodeHealthService) {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).put(testSettings).build();
@ -339,7 +360,7 @@ public class FollowersCheckerTests extends ESTestCase {
}, (node, reason) -> {
assertTrue(nodeFailed.compareAndSet(false, true));
assertThat(reason, equalTo(failureReason));
});
}, nodeHealthService);
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build();
followersChecker.setCurrentNodes(discoveryNodes);
@ -401,6 +422,71 @@ public class FollowersCheckerTests extends ESTestCase {
});
}
public void testUnhealthyNodeRejectsImmediately(){
final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode follower = new DiscoveryNode("follower", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), follower.getName()).build();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
final MockTransport mockTransport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
throw new AssertionError("no requests expected");
}
};
final TransportService transportService = mockTransport.createTransportService(settings, deterministicTaskQueue.getThreadPool(),
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> follower, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();
final AtomicBoolean calledCoordinator = new AtomicBoolean();
final AtomicReference<RuntimeException> coordinatorException = new AtomicReference<>();
final FollowersChecker followersChecker = new FollowersChecker(settings, transportService,
fcr -> {
assertTrue(calledCoordinator.compareAndSet(false, true));
final RuntimeException exception = coordinatorException.get();
if (exception != null) {
throw exception;
}
}, (node, reason) -> {
assert false : node;
}, () -> new StatusInfo(UNHEALTHY, "unhealthy-info"));
final long leaderTerm = randomLongBetween(2, Long.MAX_VALUE);
final long followerTerm = randomLongBetween(1, leaderTerm - 1);
followersChecker.updateFastResponseState(followerTerm, Mode.FOLLOWER);
final AtomicReference<TransportException> receivedException = new AtomicReference<>();
transportService.sendRequest(follower, FOLLOWER_CHECK_ACTION_NAME, new FollowerCheckRequest(leaderTerm, leader),
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
public TransportResponse.Empty read(StreamInput in) {
return TransportResponse.Empty.INSTANCE;
}
@Override
public void handleResponse(TransportResponse.Empty response) {
fail("unexpected success");
}
@Override
public void handleException(TransportException exp) {
assertThat(exp, not(nullValue()));
assertTrue(receivedException.compareAndSet(null, exp));
}
@Override
public String executor() {
return Names.SAME;
}
});
deterministicTaskQueue.runAllTasks();
assertFalse(calledCoordinator.get());
assertThat(receivedException.get(), not(nullValue()));
}
public void testResponder() {
final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode follower = new DiscoveryNode("follower", buildNewFakeTransportAddress(), Version.CURRENT);
@ -431,7 +517,7 @@ public class FollowersCheckerTests extends ESTestCase {
}
}, (node, reason) -> {
assert false : node;
});
}, () -> new StatusInfo(HEALTHY, "healthy-info"));
{
// Does not call into the coordinator in the normal case
@ -560,7 +646,7 @@ public class FollowersCheckerTests extends ESTestCase {
assert false : fcr;
}, (node, reason) -> {
assert false : node;
});
},() -> new StatusInfo(HEALTHY, "healthy-info"));
followersChecker.setCurrentNodes(discoveryNodes);
List<DiscoveryNode> followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear())
.map(cr -> cr.node).collect(Collectors.toList());

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.MembershipAction;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest;
@ -40,7 +41,10 @@ import org.elasticsearch.transport.TransportService;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -58,7 +62,8 @@ public class JoinHelperTests extends ESTestCase {
x -> localNode, null, Collections.emptySet());
JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(), (s, p, r) -> {});
Collections.emptyList(), (s, p, r) -> {},
() -> new StatusInfo(HEALTHY, "info"));
transportService.start();
DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
@ -164,7 +169,7 @@ public class JoinHelperTests extends ESTestCase {
x -> localNode, null, Collections.emptySet());
new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(), (s, p, r) -> {}); // registers request handler
Collections.emptyList(), (s, p, r) -> {}, null); // registers request handler
transportService.start();
transportService.acceptIncomingRequests();
@ -183,4 +188,54 @@ public class JoinHelperTests extends ESTestCase {
assertThat(coordinationStateRejectedException.getMessage(), containsString(localClusterState.metadata().clusterUUID()));
assertThat(coordinationStateRejectedException.getMessage(), containsString(otherClusterState.metadata().clusterUUID()));
}
public void testJoinFailureOnUnhealthyNodes() {
DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
Settings.builder().put(NODE_NAME_SETTING.getKey(), "node0").build(), random());
CapturingTransport capturingTransport = new CapturingTransport();
DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT);
TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY,
deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> localNode, null, Collections.emptySet());
AtomicReference<StatusInfo> nodeHealthServiceStatus = new AtomicReference<>
(new StatusInfo(UNHEALTHY, "unhealthy-info"));
JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(), (s, p, r) -> {}, () -> nodeHealthServiceStatus.get());
transportService.start();
DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT);
assertFalse(joinHelper.isJoinPending());
// check that sending a join to node1 doesn't work
Optional<Join> optionalJoin1 = randomBoolean() ? Optional.empty() :
Optional.of(new Join(localNode, node1, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
joinHelper.sendJoinRequest(node1, randomNonNegativeLong(), optionalJoin1);
CapturedRequest[] capturedRequests1 = capturingTransport.getCapturedRequestsAndClear();
assertThat(capturedRequests1.length, equalTo(0));
assertFalse(joinHelper.isJoinPending());
// check that sending a join to node2 doesn't work
Optional<Join> optionalJoin2 = randomBoolean() ? Optional.empty() :
Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
transportService.start();
joinHelper.sendJoinRequest(node2, randomNonNegativeLong(), optionalJoin2);
CapturedRequest[] capturedRequests2 = capturingTransport.getCapturedRequestsAndClear();
assertThat(capturedRequests2.length, equalTo(0));
assertFalse(joinHelper.isJoinPending());
nodeHealthServiceStatus.getAndSet(new StatusInfo(HEALTHY, "healthy-info"));
// check that sending another join to node1 now works again
joinHelper.sendJoinRequest(node1, 0L, optionalJoin1);
CapturedRequest[] capturedRequests1a = capturingTransport.getCapturedRequestsAndClear();
assertThat(capturedRequests1a.length, equalTo(1));
CapturedRequest capturedRequest1a = capturedRequests1a[0];
assertEquals(node1, capturedRequest1a.node);
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.test.EqualsHashCodeTestUtils.CopyFunction;
@ -43,12 +44,15 @@ import org.elasticsearch.transport.TransportService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_ACTION_NAME;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
@ -152,7 +156,7 @@ public class LeaderCheckerTests extends ESTestCase {
e -> {
assertThat(e.getMessage(), matchesRegex("node \\[.*\\] failed \\[[1-9][0-9]*\\] consecutive checks"));
assertTrue(leaderFailed.compareAndSet(false, true));
});
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"));
logger.info("--> creating first checker");
leaderChecker.updateLeader(leader1);
@ -257,7 +261,7 @@ public class LeaderCheckerTests extends ESTestCase {
e -> {
assertThat(e.getMessage(), anyOf(endsWith("disconnected"), endsWith("disconnected during check")));
assertTrue(leaderFailed.compareAndSet(false, true));
});
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"));
leaderChecker.updateLeader(leader);
{
@ -314,23 +318,112 @@ public class LeaderCheckerTests extends ESTestCase {
}
}
public void testFollowerFailsImmediatelyOnHealthCheckFailure() {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT);
final Response[] responseHolder = new Response[]{Response.SUCCESS};
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
final MockTransport mockTransport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT));
return;
}
assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME));
assertEquals(node, leader);
final Response response = responseHolder[0];
deterministicTaskQueue.scheduleNow(new Runnable() {
@Override
public void run() {
switch (response) {
case SUCCESS:
handleResponse(requestId, Empty.INSTANCE);
break;
case REMOTE_ERROR:
handleRemoteError(requestId, new NodeHealthCheckFailureException("simulated error"));
break;
}
}
@Override
public String toString() {
return response + " response to request " + requestId;
}
});
}
};
final TransportService transportService = mockTransport.createTransportService(settings,
deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();
final AtomicBoolean leaderFailed = new AtomicBoolean();
final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService,
e -> {
assertThat(e.getMessage(), endsWith("failed health checks"));
assertTrue(leaderFailed.compareAndSet(false, true));
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"));
leaderChecker.updateLeader(leader);
{
while (deterministicTaskQueue.getCurrentTimeMillis() < 10 * LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis()) {
deterministicTaskQueue.runAllRunnableTasks();
deterministicTaskQueue.advanceTime();
}
deterministicTaskQueue.runAllRunnableTasks();
assertFalse(leaderFailed.get());
responseHolder[0] = Response.REMOTE_ERROR;
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(leaderFailed.get());
}
}
public void testLeaderBehaviour() {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getId()).build();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
final CapturingTransport capturingTransport = new CapturingTransport();
AtomicReference<StatusInfo> nodeHealthServiceStatus = new AtomicReference<>(new StatusInfo(UNHEALTHY, "unhealthy-info"));
final TransportService transportService = capturingTransport.createTransportService(settings,
deterministicTaskQueue.getThreadPool(), NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();
final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, e -> fail("shouldn't be checking anything"));
final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, e -> fail("shouldn't be checking anything"),
() -> nodeHealthServiceStatus.get());
final DiscoveryNodes discoveryNodes
= DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()).build();
{
leaderChecker.setCurrentNodes(discoveryNodes);
final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
deterministicTaskQueue.runAllTasks();
assertFalse(handler.successfulResponseReceived);
assertThat(handler.transportException.getRootCause(), instanceOf(NodeHealthCheckFailureException.class));
NodeHealthCheckFailureException cause = (NodeHealthCheckFailureException) handler.transportException.getRootCause();
assertThat(cause.getMessage(), equalTo("rejecting leader check from [" + otherNode
+ "] since node is unhealthy [unhealthy-info]"));
}
nodeHealthServiceStatus.getAndSet(new StatusInfo(HEALTHY, "healthy-info"));
{
leaderChecker.setCurrentNodes(discoveryNodes);

View File

@ -38,6 +38,8 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
@ -73,6 +75,7 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -121,13 +124,13 @@ public class NodeJoinTests extends ESTestCase {
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
}
private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) {
private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState, NodeHealthService nodeHealthService) {
deterministicTaskQueue
= new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random());
final ThreadPool fakeThreadPool = deterministicTaskQueue.getThreadPool();
FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test_node","test",
fakeThreadPool, deterministicTaskQueue::scheduleNow);
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get());
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get(), nodeHealthService);
fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
coordinator.handlePublishRequest(new PublishRequest(event.state()));
publishListener.onResponse(null);
@ -143,13 +146,14 @@ public class NodeJoinTests extends ESTestCase {
clusterStateRef.set(event.state());
publishListener.onResponse(null);
});
setupMasterServiceAndCoordinator(term, initialState, masterService, threadPool, new Random(Randomness.get().nextLong()));
setupMasterServiceAndCoordinator(term, initialState, masterService, threadPool, new Random(Randomness.get().nextLong()),
() -> new StatusInfo(HEALTHY, "healthy-info"));
masterService.setClusterStateSupplier(clusterStateRef::get);
masterService.start();
}
private void setupMasterServiceAndCoordinator(long term, ClusterState initialState, MasterService masterService,
ThreadPool threadPool, Random random) {
ThreadPool threadPool, Random random, NodeHealthService nodeHealthService) {
if (this.masterService != null || coordinator != null) {
throw new IllegalStateException("method setupMasterServiceAndCoordinator can only be called once");
}
@ -179,7 +183,7 @@ public class NodeJoinTests extends ESTestCase {
() -> new InMemoryPersistedState(term, initialState), r -> emptyList(),
new NoOpClusterApplier(),
Collections.emptyList(),
random, (s, p, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE);
random, (s, p, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE, nodeHealthService);
transportService.start();
transportService.acceptIncomingRequests();
transport = capturingTransport;
@ -270,7 +274,8 @@ public class NodeJoinTests extends ESTestCase {
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion,
VotingConfiguration.of(randomFrom(node0, node1))));
VotingConfiguration.of(randomFrom(node0, node1))),
() -> new StatusInfo(HEALTHY, "healthy-info"));
assertFalse(isLocalNodeElectedMaster());
assertNull(coordinator.getStateForMasterService().nodes().getMasterNodeId());
long newTerm = initialTerm + randomLongBetween(1, 10);
@ -290,7 +295,7 @@ public class NodeJoinTests extends ESTestCase {
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion,
VotingConfiguration.of(node1)));
VotingConfiguration.of(node1)), () -> new StatusInfo(HEALTHY, "healthy-info"));
assertFalse(isLocalNodeElectedMaster());
long newTerm = initialTerm + randomLongBetween(1, 10);
long higherVersion = initialVersion + randomLongBetween(1, 10);
@ -306,7 +311,7 @@ public class NodeJoinTests extends ESTestCase {
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion,
VotingConfiguration.of(node0)));
VotingConfiguration.of(node0)), () -> new StatusInfo(HEALTHY, "healthy-info"));
assertFalse(isLocalNodeElectedMaster());
long newTerm = initialTerm + randomLongBetween(1, 10);
long higherVersion = initialVersion + randomLongBetween(1, 10);
@ -320,7 +325,7 @@ public class NodeJoinTests extends ESTestCase {
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion,
VotingConfiguration.of(node0)));
VotingConfiguration.of(node0)), () -> new StatusInfo(HEALTHY, "healthy-info"));
assertFalse(isLocalNodeElectedMaster());
long newTerm = initialTerm + randomLongBetween(1, 10);
joinNodeAndRun(new JoinRequest(node0, newTerm, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion))));
@ -337,7 +342,7 @@ public class NodeJoinTests extends ESTestCase {
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion,
VotingConfiguration.of(node0)));
VotingConfiguration.of(node0)), () -> new StatusInfo(HEALTHY, "healthy-info"));
long newTerm = initialTerm + randomLongBetween(1, 10);
joinNodeAndRun(new JoinRequest(node0, newTerm, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion))));
@ -356,7 +361,7 @@ public class NodeJoinTests extends ESTestCase {
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion,
VotingConfiguration.of(node2)));
VotingConfiguration.of(node2)), () -> new StatusInfo(HEALTHY, "healthy-info"));
assertFalse(isLocalNodeElectedMaster());
long newTerm = initialTerm + randomLongBetween(1, 10);
SimpleFuture futNode0 = joinNodeAsync(new JoinRequest(node0, newTerm, Optional.of(
@ -383,7 +388,7 @@ public class NodeJoinTests extends ESTestCase {
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion,
VotingConfiguration.of(node0)));
VotingConfiguration.of(node0)), () -> new StatusInfo(HEALTHY, "healthy-info"));
long newTerm = initialTerm + randomLongBetween(1, 10);
handleStartJoinFrom(node1, newTerm);
handleFollowerCheckFrom(node1, newTerm);
@ -402,7 +407,8 @@ public class NodeJoinTests extends ESTestCase {
CoordinationMetadata.VotingConfigExclusion.MISSING_VALUE_MARKER, "knownNodeName");
setupFakeMasterServiceAndCoordinator(initialTerm, buildStateWithVotingConfigExclusion(initialNode, initialTerm,
initialVersion, votingConfigExclusion));
initialVersion, votingConfigExclusion),
() -> new StatusInfo(HEALTHY, "healthy-info"));
DiscoveryNode knownJoiningNode = new DiscoveryNode("knownNodeName", "newNodeId", buildNewFakeTransportAddress(),
emptyMap(), singleton(DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT);
@ -478,7 +484,7 @@ public class NodeJoinTests extends ESTestCase {
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion,
VotingConfiguration.of(node0)));
VotingConfiguration.of(node0)), () -> new StatusInfo(HEALTHY, "healthy-info"));
long newTerm = initialTerm + randomLongBetween(1, 10);
handleStartJoinFrom(node1, newTerm);
handleFollowerCheckFrom(node1, newTerm);
@ -494,7 +500,7 @@ public class NodeJoinTests extends ESTestCase {
long initialTerm = randomLongBetween(1, 10);
long initialVersion = randomLongBetween(1, 10);
setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion,
VotingConfiguration.of(node1)));
VotingConfiguration.of(node1)), () -> new StatusInfo(HEALTHY, "healthy-info"));
long newTerm = initialTerm + randomLongBetween(1, 10);
SimpleFuture fut = joinNodeAsync(new JoinRequest(node0, newTerm,
Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion))));

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.transport.ConnectTransportException;
@ -46,6 +47,8 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.coordination.PreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME;
import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.threadpool.ThreadPool.Names.SAME;
import static org.hamcrest.Matchers.equalTo;
@ -63,6 +66,7 @@ public class PreVoteCollectorTests extends ESTestCase {
private Map<DiscoveryNode, PreVoteResponse> responsesByNode = new HashMap<>();
private long currentTerm, lastAcceptedTerm, lastAcceptedVersion;
private TransportService transportService;
private StatusInfo healthStatus;
@Before
public void createObjects() {
@ -95,6 +99,11 @@ public class PreVoteCollectorTests extends ESTestCase {
}
});
}
@Override
public void handleRemoteError(long requestId, Throwable t) {
logger.warn("Remote error", t);
}
};
lastAcceptedTerm = randomNonNegativeLong();
currentTerm = randomLongBetween(lastAcceptedTerm, Long.MAX_VALUE);
@ -102,6 +111,7 @@ public class PreVoteCollectorTests extends ESTestCase {
localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
responsesByNode.put(localNode, new PreVoteResponse(currentTerm, lastAcceptedTerm, lastAcceptedVersion));
healthStatus = new StatusInfo(HEALTHY, "healthy-info");
transportService = mockTransport.createTransportService(settings,
deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundTransportAddress -> localNode, null, emptySet());
@ -112,7 +122,7 @@ public class PreVoteCollectorTests extends ESTestCase {
assert electionOccurred == false;
electionOccurred = true;
}, l -> {
}, ElectionStrategy.DEFAULT_INSTANCE);
}, ElectionStrategy.DEFAULT_INSTANCE, () -> healthStatus);
preVoteCollector.update(getLocalPreVoteResponse(), null);
}
@ -147,6 +157,13 @@ public class PreVoteCollectorTests extends ESTestCase {
assertTrue(electionOccurred);
}
public void testNoElectionStartIfLocalNodeIsOnlyNodeAndUnhealthy() {
healthStatus = new StatusInfo(UNHEALTHY, "unhealthy-info");
preVoteCollector.update(getLocalPreVoteResponse(), null);
startAndRunCollector(localNode);
assertFalse(electionOccurred);
}
public void testStartsElectionIfLocalNodeIsQuorum() {
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
responsesByNode.put(otherNode, getLocalPreVoteResponse());
@ -169,6 +186,15 @@ public class PreVoteCollectorTests extends ESTestCase {
assertFalse(electionOccurred);
}
public void testUnhealthyNodeDoesNotOfferPreVote() {
final long term = randomNonNegativeLong();
healthStatus = new StatusInfo(UNHEALTHY, "unhealthy-info");
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
RemoteTransportException remoteTransportException = expectThrows(RemoteTransportException.class, () ->
handlePreVoteRequestViaTransportService(new PreVoteRequest(otherNode, term)));
assertThat(remoteTransportException.getCause(), instanceOf(NodeHealthCheckFailureException.class));
}
public void testDoesNotStartElectionIfStopped() {
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
responsesByNode.put(otherNode, getLocalPreVoteResponse());

View File

@ -92,7 +92,7 @@ public class DiscoveryModuleTests extends ESTestCase {
private DiscoveryModule newModule(Settings settings, List<DiscoveryPlugin> plugins) {
return new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry, null, masterService,
clusterApplier, clusterSettings, plugins, null, createTempDir().toAbsolutePath(), gatewayMetaState,
mock(RerouteService.class));
mock(RerouteService.class), null);
}
public void testDefaults() {

View File

@ -0,0 +1,345 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.monitor.fs;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.mockfile.FilterFileChannel;
import org.apache.lucene.mockfile.FilterFileSystemProvider;
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.io.PathUtilsForTesting;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.file.FileSystem;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.is;
public class FsHealthServiceTests extends ESTestCase {
private DeterministicTaskQueue deterministicTaskQueue;
@Before
public void createObjects() {
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
}
public void testSchedulesHealthCheckAtRefreshIntervals() throws Exception {
long refreshInterval = randomLongBetween(1000, 12000);
final Settings settings = Settings.builder().put(FsHealthService.REFRESH_INTERVAL_SETTING.getKey(), refreshInterval + "ms").build();
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
try (NodeEnvironment env = newNodeEnvironment()) {
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, deterministicTaskQueue.getThreadPool(), env);
final long startTimeMillis = deterministicTaskQueue.getCurrentTimeMillis();
fsHealthService.doStart();
assertFalse(deterministicTaskQueue.hasRunnableTasks());
assertTrue(deterministicTaskQueue.hasDeferredTasks());
int rescheduledCount = 0;
for (int i = 1; i <= randomIntBetween(5, 10); i++) {
if (deterministicTaskQueue.hasRunnableTasks()) {
deterministicTaskQueue.runRandomTask();
} else {
assertThat(deterministicTaskQueue.getLatestDeferredExecutionTime(), is(refreshInterval * (rescheduledCount + 1)));
deterministicTaskQueue.advanceTime();
rescheduledCount++;
}
assertThat(deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis, is(refreshInterval * rescheduledCount));
}
fsHealthService.doStop();
deterministicTaskQueue.runAllTasksInTimeOrder();
assertFalse(deterministicTaskQueue.hasRunnableTasks());
assertFalse(deterministicTaskQueue.hasDeferredTasks());
}
}
public void testFailsHealthOnIOException() throws IOException {
FileSystem fileSystem = PathUtils.getDefaultFileSystem();
FileSystemIOExceptionProvider disruptFileSystemProvider = new FileSystemIOExceptionProvider(fileSystem);
fileSystem = disruptFileSystemProvider.getFileSystem(null);
PathUtilsForTesting.installMock(fileSystem);
final Settings settings = Settings.EMPTY;
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
try (NodeEnvironment env = newNodeEnvironment()) {
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
fsHealthService.new FsHealthMonitor().run();
assertEquals(HEALTHY, fsHealthService.getHealth().getStatus());
assertEquals("health check passed", fsHealthService.getHealth().getInfo());
//disrupt file system
disruptFileSystemProvider.injectIOException.set(true);
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
fsHealthService.new FsHealthMonitor().run();
assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus());
for (Path path : env.nodeDataPaths()) {
assertTrue(fsHealthService.getHealth().getInfo().contains(path.toString()));
}
assertEquals(env.nodeDataPaths().length, disruptFileSystemProvider.getInjectedPathCount());
} finally {
disruptFileSystemProvider.injectIOException.set(false);
PathUtilsForTesting.teardown();
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}
}
@TestLogging(value = "org.elasticsearch.monitor.fs:WARN", reason = "to ensure that we log on hung IO at WARN level")
public void testLoggingOnHungIO() throws Exception {
long slowLogThreshold = randomLongBetween(100, 200);
final Settings settings = Settings.builder().put(FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING.getKey(),
slowLogThreshold + "ms").build();
FileSystem fileSystem = PathUtils.getDefaultFileSystem();
TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
FileSystemFsyncHungProvider disruptFileSystemProvider = new FileSystemFsyncHungProvider(fileSystem,
randomLongBetween(slowLogThreshold + 1 , 400), testThreadPool);
fileSystem = disruptFileSystemProvider.getFileSystem(null);
PathUtilsForTesting.installMock(fileSystem);
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
Logger logger = LogManager.getLogger(FsHealthService.class);
Loggers.addAppender(logger, mockAppender);
try (NodeEnvironment env = newNodeEnvironment()) {
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
int counter = 0;
for(Path path : env.nodeDataPaths()){
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test" + ++counter,
FsHealthService.class.getCanonicalName(),
Level.WARN,
"health check of [" + path + "] took [*ms] which is above the warn threshold*"));
}
//disrupt file system
disruptFileSystemProvider.injectIOException.set(true);
fsHealthService.new FsHealthMonitor().run();
assertEquals(env.nodeDataPaths().length, disruptFileSystemProvider.getInjectedPathCount());
assertBusy(mockAppender::assertAllExpectationsMatched);
} finally {
Loggers.removeAppender(logger, mockAppender);
mockAppender.stop();
PathUtilsForTesting.teardown();
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}
}
public void testFailsHealthOnSinglePathFsyncFailure() throws IOException {
FileSystem fileSystem = PathUtils.getDefaultFileSystem();
FileSystemFsyncIOExceptionProvider disruptFsyncFileSystemProvider = new FileSystemFsyncIOExceptionProvider(fileSystem);
fileSystem = disruptFsyncFileSystemProvider.getFileSystem(null);
PathUtilsForTesting.installMock(fileSystem);
final Settings settings = Settings.EMPTY;
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
try (NodeEnvironment env = newNodeEnvironment()) {
Path[] paths = env.nodeDataPaths();
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
fsHealthService.new FsHealthMonitor().run();
assertEquals(HEALTHY, fsHealthService.getHealth().getStatus());
assertEquals("health check passed", fsHealthService.getHealth().getInfo());
//disrupt file system fsync on single path
disruptFsyncFileSystemProvider.injectIOException.set(true);
String disruptedPath = randomFrom(paths).toString();
disruptFsyncFileSystemProvider.restrictPathPrefix(disruptedPath);
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
fsHealthService.new FsHealthMonitor().run();
assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus());
assertThat(fsHealthService.getHealth().getInfo(), is("health check failed on [" + disruptedPath + "]"));
assertEquals(1, disruptFsyncFileSystemProvider.getInjectedPathCount());
} finally {
disruptFsyncFileSystemProvider.injectIOException.set(false);
PathUtilsForTesting.teardown();
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}
}
public void testFailsHealthOnSinglePathWriteFailure() throws IOException {
FileSystem fileSystem = PathUtils.getDefaultFileSystem();
FileSystemIOExceptionProvider disruptWritesFileSystemProvider = new FileSystemIOExceptionProvider(fileSystem);
fileSystem = disruptWritesFileSystemProvider.getFileSystem(null);
PathUtilsForTesting.installMock(fileSystem);
final Settings settings = Settings.EMPTY;
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
try (NodeEnvironment env = newNodeEnvironment()) {
Path[] paths = env.nodeDataPaths();
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
fsHealthService.new FsHealthMonitor().run();
assertEquals(HEALTHY, fsHealthService.getHealth().getStatus());
assertEquals("health check passed", fsHealthService.getHealth().getInfo());
//disrupt file system writes on single path
disruptWritesFileSystemProvider.injectIOException.set(true);
String disruptedPath = randomFrom(paths).toString();
disruptWritesFileSystemProvider.restrictPathPrefix(disruptedPath);
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
fsHealthService.new FsHealthMonitor().run();
assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus());
assertThat(fsHealthService.getHealth().getInfo(), is("health check failed on [" + disruptedPath + "]"));
assertEquals(1, disruptWritesFileSystemProvider.getInjectedPathCount());
} finally {
disruptWritesFileSystemProvider.injectIOException.set(false);
PathUtilsForTesting.teardown();
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}
}
private static class FileSystemIOExceptionProvider extends FilterFileSystemProvider {
AtomicBoolean injectIOException = new AtomicBoolean();
AtomicInteger injectedPaths = new AtomicInteger();
private String pathPrefix = "/";
FileSystemIOExceptionProvider(FileSystem inner) {
super("disrupt_fs_health://", inner);
}
public void restrictPathPrefix(String pathPrefix){
this.pathPrefix = pathPrefix;
}
public int getInjectedPathCount(){
return injectedPaths.get();
}
@Override
public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException {
if (injectIOException.get()){
if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) {
injectedPaths.incrementAndGet();
throw new IOException("fake IOException");
}
}
return super.newOutputStream(path, options);
}
}
private static class FileSystemFsyncIOExceptionProvider extends FilterFileSystemProvider {
AtomicBoolean injectIOException = new AtomicBoolean();
AtomicInteger injectedPaths = new AtomicInteger();
private String pathPrefix = "/";
FileSystemFsyncIOExceptionProvider(FileSystem inner) {
super("disrupt_fs_health://", inner);
}
public void restrictPathPrefix(String pathPrefix){
this.pathPrefix = pathPrefix;
}
public int getInjectedPathCount(){
return injectedPaths.get();
}
@Override
public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
return new FilterFileChannel(super.newFileChannel(path, options, attrs)) {
@Override
public void force(boolean metaData) throws IOException {
if (injectIOException.get()) {
if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) {
injectedPaths.incrementAndGet();
throw new IOException("fake IOException");
}
}
super.force(metaData);
}
};
}
}
private static class FileSystemFsyncHungProvider extends FilterFileSystemProvider {
AtomicBoolean injectIOException = new AtomicBoolean();
AtomicInteger injectedPaths = new AtomicInteger();
private String pathPrefix = "/";
private long delay;
private final ThreadPool threadPool;
FileSystemFsyncHungProvider(FileSystem inner, long delay, ThreadPool threadPool) {
super("disrupt_fs_health://", inner);
this.delay = delay;
this.threadPool = threadPool;
}
public int getInjectedPathCount(){
return injectedPaths.get();
}
@Override
public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
return new FilterFileChannel(super.newFileChannel(path, options, attrs)) {
@Override
public void force(boolean metaData) throws IOException {
if (injectIOException.get()) {
if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) {
injectedPaths.incrementAndGet();
final long startTimeMillis = threadPool.relativeTimeInMillis();
do {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
} while (threadPool.relativeTimeInMillis() <= startTimeMillis + delay);
}
}
super.force(metaData);
}
};
}
}
}

View File

@ -168,6 +168,7 @@ import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesService;
@ -217,6 +218,7 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener;
import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
@ -1649,7 +1651,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
hostsResolver -> nodes.values().stream().filter(n -> n.node.isMasterNode())
.map(n -> n.node.getAddress()).collect(Collectors.toList()),
clusterService.getClusterApplierService(), Collections.emptyList(), random(),
new BatchedRerouteService(clusterService, allocationService::reroute), ElectionStrategy.DEFAULT_INSTANCE);
new BatchedRerouteService(clusterService, allocationService::reroute), ElectionStrategy.DEFAULT_INSTANCE,
() -> new StatusInfo(HEALTHY, "healthy-info"));
masterService.setClusterStatePublisher(coordinator);
coordinator.start();
clusterService.getClusterApplierService().setNodeConnectionsService(nodeConnectionsService);

View File

@ -65,6 +65,8 @@ import org.elasticsearch.gateway.ClusterStateUpdaters;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.MockGatewayMetaState;
import org.elasticsearch.gateway.PersistedClusterStateService;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport;
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
@ -125,6 +127,7 @@ import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MAS
import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION;
import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.elasticsearch.transport.TransportSettings.CONNECT_TIMEOUT;
@ -254,6 +257,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
private final LinearizabilityChecker linearizabilityChecker = new LinearizabilityChecker();
private final History history = new History();
private NodeHealthService nodeHealthService;
private final Function<DiscoveryNode, MockPersistedState> defaultPersistedStateSupplier = MockPersistedState::new;
@ -265,6 +269,11 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
}
Cluster(int initialNodeCount, boolean allNodesMasterEligible, Settings nodeSettings) {
this(initialNodeCount, allNodesMasterEligible, nodeSettings, () -> new StatusInfo(HEALTHY, "healthy-info"));
}
Cluster(int initialNodeCount, boolean allNodesMasterEligible, Settings nodeSettings, NodeHealthService nodeHealthService) {
this.nodeHealthService = nodeHealthService;
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
assertThat(initialNodeCount, greaterThan(0));
@ -273,7 +282,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
clusterNodes = new ArrayList<>(initialNodeCount);
for (int i = 0; i < initialNodeCount; i++) {
final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(),
allNodesMasterEligible || i == 0 || randomBoolean(), nodeSettings);
allNodesMasterEligible || i == 0 || randomBoolean(), nodeSettings, nodeHealthService);
clusterNodes.add(clusterNode);
if (clusterNode.getLocalNode().isMasterNode()) {
masterEligibleNodeIds.add(clusterNode.getId());
@ -309,7 +318,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
final List<ClusterNode> addedNodes = new ArrayList<>();
for (int i = 0; i < newNodesCount; i++) {
final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(), true, Settings.EMPTY);
final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(), true, Settings.EMPTY,
nodeHealthService);
addedNodes.add(clusterNode);
}
clusterNodes.addAll(addedNodes);
@ -635,7 +645,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
private boolean isConnectedPair(ClusterNode n1, ClusterNode n2) {
return n1 == n2 ||
(getConnectionStatus(n1.getLocalNode(), n2.getLocalNode()) == ConnectionStatus.CONNECTED
&& getConnectionStatus(n2.getLocalNode(), n1.getLocalNode()) == ConnectionStatus.CONNECTED);
&& getConnectionStatus(n2.getLocalNode(), n1.getLocalNode()) == ConnectionStatus.CONNECTED) &&
(n1.nodeHealthService.getHealth().getStatus() == HEALTHY && n2.nodeHealthService.getHealth().getStatus() == HEALTHY);
}
ClusterNode getAnyLeader() {
@ -881,14 +892,18 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
private ClusterService clusterService;
TransportService transportService;
private DisruptableMockTransport mockTransport;
private NodeHealthService nodeHealthService;
List<BiConsumer<DiscoveryNode, ClusterState>> extraJoinValidators = new ArrayList<>();
ClusterNode(int nodeIndex, boolean masterEligible, Settings nodeSettings) {
this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier, nodeSettings);
ClusterNode(int nodeIndex, boolean masterEligible, Settings nodeSettings, NodeHealthService nodeHealthService) {
this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier, nodeSettings,
nodeHealthService);
}
ClusterNode(int nodeIndex, DiscoveryNode localNode, Function<DiscoveryNode, MockPersistedState> persistedStateSupplier,
Settings nodeSettings) {
Settings nodeSettings, NodeHealthService nodeHealthService) {
this.nodeHealthService = nodeHealthService;
this.nodeIndex = nodeIndex;
this.localNode = localNode;
this.nodeSettings = nodeSettings;
@ -944,7 +959,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
allocationService, masterService, this::getPersistedState,
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, p, r) -> {},
getElectionStrategy());
getElectionStrategy(), nodeHealthService);
masterService.setClusterStatePublisher(coordinator);
final GatewayService gatewayService
= new GatewayService(settings, allocationService, clusterService, threadPool, null, coordinator);
@ -984,7 +999,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
localNode.isMasterNode() && DiscoveryNode.isMasterNode(nodeSettings)
? DiscoveryNodeRole.BUILT_IN_ROLES : emptySet(), Version.CURRENT);
return new ClusterNode(nodeIndex, newLocalNode,
node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetadata, adaptCurrentTerm), nodeSettings);
node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetadata, adaptCurrentTerm), nodeSettings,
nodeHealthService);
}
private CoordinationState.PersistedState getPersistedState() {