Quieter logging from the DiskThresholdMonitor (#48115)

Today if an Elasticsearch node reaches a disk watermark then it will repeatedly
emit logging about it, which implies that some action needs to be taken by the
administrator. This is misleading. Elasticsearch strives to keep nodes under
the high watermark, but it is normal to have a few nodes occasionally exceed
this level. Nodes may be over the low watermark for an extended period without
any ill effects.

This commit enhances the logging emitted by the `DiskThresholdMonitor` to be
less misleading. The expected case of hitting the high watermark and
immediately relocating one or more shards to bring the node back under the
watermark again is reduced in severity to `INFO`. Additionally, `INFO` messages
are not emitted repeatedly.

Fixes #48038
This commit is contained in:
David Turner 2019-10-18 15:43:55 +02:00
parent 5e4dd0fd2e
commit a8bcbbc38a
10 changed files with 454 additions and 111 deletions

View File

@ -50,7 +50,7 @@ public class BatchedRerouteService implements RerouteService {
private final Object mutex = new Object(); private final Object mutex = new Object();
@Nullable // null if no reroute is currently pending @Nullable // null if no reroute is currently pending
private List<ActionListener<Void>> pendingRerouteListeners; private List<ActionListener<ClusterState>> pendingRerouteListeners;
private Priority pendingTaskPriority = Priority.LANGUID; private Priority pendingTaskPriority = Priority.LANGUID;
/** /**
@ -65,8 +65,8 @@ public class BatchedRerouteService implements RerouteService {
* Initiates a reroute. * Initiates a reroute.
*/ */
@Override @Override
public final void reroute(String reason, Priority priority, ActionListener<Void> listener) { public final void reroute(String reason, Priority priority, ActionListener<ClusterState> listener) {
final List<ActionListener<Void>> currentListeners; final List<ActionListener<ClusterState>> currentListeners;
synchronized (mutex) { synchronized (mutex) {
if (pendingRerouteListeners != null) { if (pendingRerouteListeners != null) {
if (priority.sameOrAfter(pendingTaskPriority)) { if (priority.sameOrAfter(pendingTaskPriority)) {
@ -148,7 +148,7 @@ public class BatchedRerouteService implements RerouteService {
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
ActionListener.onResponse(currentListeners, null); ActionListener.onResponse(currentListeners, newState);
} }
}); });
} catch (Exception e) { } catch (Exception e) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing; package org.elasticsearch.cluster.routing;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
/** /**
@ -33,5 +34,5 @@ public interface RerouteService {
* this reroute is batched with the pending one; if there is already a pending reroute at a lower priority then * this reroute is batched with the pending one; if there is already a pending reroute at a lower priority then
* the priority of the pending batch is raised to the given priority. * the priority of the pending batch is raised to the given priority.
*/ */
void reroute(String reason, Priority priority, ActionListener<Void> listener); void reroute(String reason, Priority priority, ActionListener<ClusterState> listener);
} }

View File

@ -37,6 +37,7 @@ import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -45,7 +46,9 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.util.set.Sets;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -65,7 +68,6 @@ public class DiskThresholdMonitor {
private final DiskThresholdSettings diskThresholdSettings; private final DiskThresholdSettings diskThresholdSettings;
private final Client client; private final Client client;
private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
private final Supplier<ClusterState> clusterStateSupplier; private final Supplier<ClusterState> clusterStateSupplier;
private final LongSupplier currentTimeMillisSupplier; private final LongSupplier currentTimeMillisSupplier;
private final RerouteService rerouteService; private final RerouteService rerouteService;
@ -73,6 +75,24 @@ public class DiskThresholdMonitor {
private final AtomicBoolean checkInProgress = new AtomicBoolean(); private final AtomicBoolean checkInProgress = new AtomicBoolean();
private final DeprecationLogger deprecationLogger = new DeprecationLogger(logger); private final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
/**
* The IDs of the nodes that were over the low threshold in the last check (and maybe over another threshold too). Tracked so that we
* can log when such nodes are no longer over the low threshold.
*/
private final Set<String> nodesOverLowThreshold = Sets.newConcurrentHashSet();
/**
* The IDs of the nodes that were over the high threshold in the last check (and maybe over another threshold too). Tracked so that we
* can log when such nodes are no longer over the high threshold.
*/
private final Set<String> nodesOverHighThreshold = Sets.newConcurrentHashSet();
/**
* The IDs of the nodes that were over the high threshold in the last check, but which are relocating shards that will bring them
* under the high threshold again. Tracked so that we can log when such nodes are no longer in this state.
*/
private final Set<String> nodesOverHighThresholdAndRelocating = Sets.newConcurrentHashSet();
public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings, public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
Client client, LongSupplier currentTimeMillisSupplier, RerouteService rerouteService) { Client client, LongSupplier currentTimeMillisSupplier, RerouteService rerouteService) {
this.clusterStateSupplier = clusterStateSupplier; this.clusterStateSupplier = clusterStateSupplier;
@ -86,35 +106,6 @@ public class DiskThresholdMonitor {
} }
} }
/**
 * Logs a message about the given disk usage if it has crossed one of the configured disk watermarks.
 * The absolute (free bytes) thresholds and the percentage (free disk) thresholds are checked
 * independently, so a single call may log up to two messages. Within each family only the most
 * severe exceeded watermark is reported: flood stage and high are logged at WARN, low at INFO.
 *
 * @param usage the disk usage of a single node to check against the thresholds
 */
private void warnAboutDiskIfNeeded(DiskUsage usage) {
    final long freeBytes = usage.getFreeBytes();
    final double freePercentage = usage.getFreeDiskAsPercentage();

    // Absolute thresholds, most severe first
    if (freeBytes < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes()) {
        logger.warn("flood stage disk watermark [{}] exceeded on {}, all indices on this node will be marked read-only",
            diskThresholdSettings.getFreeBytesThresholdFloodStage(), usage);
    } else if (freeBytes < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
        logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
            diskThresholdSettings.getFreeBytesThresholdHigh(), usage);
    } else if (freeBytes < diskThresholdSettings.getFreeBytesThresholdLow().getBytes()) {
        logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node",
            diskThresholdSettings.getFreeBytesThresholdLow(), usage);
    }

    // Percentage thresholds, most severe first; the reported figure is 100 minus the free-disk threshold
    if (freePercentage < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
        logger.warn("flood stage disk watermark [{}] exceeded on {}, all indices on this node will be marked read-only",
            Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdFloodStage(), "%"), usage);
    } else if (freePercentage < diskThresholdSettings.getFreeDiskThresholdHigh()) {
        logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
            Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(), "%"), usage);
    } else if (freePercentage < diskThresholdSettings.getFreeDiskThresholdLow()) {
        logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node",
            Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdLow(), "%"), usage);
    }
}
private void checkFinished() { private void checkFinished() {
final boolean checkFinished = checkInProgress.compareAndSet(true, false); final boolean checkFinished = checkInProgress.compareAndSet(true, false);
assert checkFinished; assert checkFinished;
@ -137,38 +128,50 @@ public class DiskThresholdMonitor {
String explanation = ""; String explanation = "";
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
// Garbage collect nodes that have been removed from the cluster // Clean up nodes that have been removed from the cluster
// from the map that tracks watermark crossing
final ObjectLookupContainer<String> nodes = usages.keys(); final ObjectLookupContainer<String> nodes = usages.keys();
for (String node : nodeHasPassedWatermark) { cleanUpRemovedNodes(nodes, nodesOverLowThreshold);
if (nodes.contains(node) == false) { cleanUpRemovedNodes(nodes, nodesOverHighThreshold);
nodeHasPassedWatermark.remove(node); cleanUpRemovedNodes(nodes, nodesOverHighThresholdAndRelocating);
}
}
final ClusterState state = clusterStateSupplier.get(); final ClusterState state = clusterStateSupplier.get();
final Set<String> indicesToMarkReadOnly = new HashSet<>(); final Set<String> indicesToMarkReadOnly = new HashSet<>();
RoutingNodes routingNodes = state.getRoutingNodes(); RoutingNodes routingNodes = state.getRoutingNodes();
Set<String> indicesNotToAutoRelease = new HashSet<>(); Set<String> indicesNotToAutoRelease = new HashSet<>();
markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indicesNotToAutoRelease); markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indicesNotToAutoRelease);
final List<DiskUsage> usagesOverHighThreshold = new ArrayList<>();
for (final ObjectObjectCursor<String, DiskUsage> entry : usages) { for (final ObjectObjectCursor<String, DiskUsage> entry : usages) {
final String node = entry.key; final String node = entry.key;
final DiskUsage usage = entry.value; final DiskUsage usage = entry.value;
warnAboutDiskIfNeeded(usage); final RoutingNode routingNode = routingNodes.node(node);
RoutingNode routingNode = routingNodes.node(node);
// Only unblock index if all nodes that contain shards of it are below the high disk watermark
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() || if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) { usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
nodesOverLowThreshold.add(node);
nodesOverHighThreshold.add(node);
nodesOverHighThresholdAndRelocating.remove(node);
if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step
for (ShardRouting routing : routingNode) { for (ShardRouting routing : routingNode) {
String indexName = routing.index().getName(); String indexName = routing.index().getName();
indicesToMarkReadOnly.add(indexName); indicesToMarkReadOnly.add(indexName);
indicesNotToAutoRelease.add(indexName); indicesNotToAutoRelease.add(indexName);
} }
} }
logger.warn("flood stage disk watermark [{}] exceeded on {}, all indices on this node will be marked read-only",
diskThresholdSettings.describeFloodStageThreshold(), usage);
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() || } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
if (routingNode != null) {
nodesOverLowThreshold.add(node);
nodesOverHighThreshold.add(node);
if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step
for (ShardRouting routing : routingNode) { for (ShardRouting routing : routingNode) {
String indexName = routing.index().getName(); String indexName = routing.index().getName();
indicesNotToAutoRelease.add(indexName); indicesNotToAutoRelease.add(indexName);
@ -177,41 +180,98 @@ public class DiskThresholdMonitor {
if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) { if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
reroute = true; reroute = true;
explanation = "high disk watermark exceeded on one or more nodes"; explanation = "high disk watermark exceeded on one or more nodes";
usagesOverHighThreshold.add(usage);
// will log about this node when the reroute completes
} else { } else {
logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " + logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " +
"in the last [{}], skipping reroute", "in the last [{}], skipping reroute",
node, diskThresholdSettings.getRerouteInterval()); node, diskThresholdSettings.getRerouteInterval());
} }
nodeHasPassedWatermark.add(node);
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() || } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) { usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
nodeHasPassedWatermark.add(node);
nodesOverHighThresholdAndRelocating.remove(node);
final boolean wasUnderLowThreshold = nodesOverLowThreshold.add(node);
final boolean wasOverHighThreshold = nodesOverHighThreshold.remove(node);
assert (wasUnderLowThreshold && wasOverHighThreshold) == false;
if (wasUnderLowThreshold) {
logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node",
diskThresholdSettings.describeLowThreshold(), usage);
} else if (wasOverHighThreshold) {
logger.info("high disk watermark [{}] no longer exceeded on {}, but low disk watermark [{}] is still exceeded",
diskThresholdSettings.describeHighThreshold(), usage, diskThresholdSettings.describeLowThreshold());
}
} else { } else {
if (nodeHasPassedWatermark.contains(node)) {
// The node has previously been over the high or nodesOverHighThresholdAndRelocating.remove(node);
// low watermark, but is no longer, so we should
// reroute so any unassigned shards can be allocated if (nodesOverLowThreshold.contains(node)) {
// if they are able to be // The node has previously been over the low watermark, but is no longer, so it may be possible to allocate more shards
// if we reroute now.
if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) { if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
reroute = true; reroute = true;
explanation = "one or more nodes has gone under the high or low watermark"; explanation = "one or more nodes has gone under the high or low watermark";
nodeHasPassedWatermark.remove(node); nodesOverLowThreshold.remove(node);
nodesOverHighThreshold.remove(node);
logger.info("low disk watermark [{}] no longer exceeded on {}",
diskThresholdSettings.describeLowThreshold(), usage);
} else { } else {
logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " + logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " +
"in the last [{}], skipping reroute", "in the last [{}], skipping reroute",
node, diskThresholdSettings.getRerouteInterval()); node, diskThresholdSettings.getRerouteInterval());
} }
} }
} }
} }
final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 3); final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 3);
if (reroute) { if (reroute) {
logger.info("rerouting shards: [{}]", explanation); logger.debug("rerouting shards: [{}]", explanation);
rerouteService.reroute("disk threshold monitor", Priority.HIGH, ActionListener.wrap(r -> { rerouteService.reroute("disk threshold monitor", Priority.HIGH, ActionListener.wrap(reroutedClusterState -> {
for (DiskUsage diskUsage : usagesOverHighThreshold) {
final RoutingNode routingNode = reroutedClusterState.getRoutingNodes().node(diskUsage.getNodeId());
final DiskUsage usageIncludingRelocations;
final long relocatingShardsSize;
if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step
relocatingShardsSize = sizeOfRelocatingShards(routingNode, diskUsage, info, reroutedClusterState);
usageIncludingRelocations = new DiskUsage(diskUsage.getNodeId(), diskUsage.getNodeName(),
diskUsage.getPath(), diskUsage.getTotalBytes(), diskUsage.getFreeBytes() - relocatingShardsSize);
} else {
usageIncludingRelocations = diskUsage;
relocatingShardsSize = 0L;
}
if (usageIncludingRelocations.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()
|| usageIncludingRelocations.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
nodesOverHighThresholdAndRelocating.remove(diskUsage.getNodeId());
logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node; " +
"currently relocating away shards totalling [{}] bytes; the node is expected to continue to exceed " +
"the high disk watermark when these relocations are complete",
diskThresholdSettings.describeHighThreshold(), diskUsage, -relocatingShardsSize);
} else if (nodesOverHighThresholdAndRelocating.add(diskUsage.getNodeId())) {
logger.info("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node; " +
"currently relocating away shards totalling [{}] bytes; the node is expected to be below the high " +
"disk watermark when these relocations are complete",
diskThresholdSettings.describeHighThreshold(), diskUsage, -relocatingShardsSize);
} else {
logger.debug("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node; " +
"currently relocating away shards totalling [{}] bytes",
diskThresholdSettings.describeHighThreshold(), diskUsage, -relocatingShardsSize);
}
}
setLastRunTimeMillis(); setLastRunTimeMillis();
listener.onResponse(r); listener.onResponse(null);
}, e -> { }, e -> {
logger.debug("reroute failed", e); logger.debug("reroute failed", e);
setLastRunTimeMillis(); setLastRunTimeMillis();
@ -220,7 +280,7 @@ public class DiskThresholdMonitor {
} else { } else {
listener.onResponse(null); listener.onResponse(null);
} }
Set<String> indicesToAutoRelease = StreamSupport.stream(state.routingTable().indicesRouting() final Set<String> indicesToAutoRelease = StreamSupport.stream(state.routingTable().indicesRouting()
.spliterator(), false) .spliterator(), false)
.map(c -> c.key) .map(c -> c.key)
.filter(index -> indicesNotToAutoRelease.contains(index) == false) .filter(index -> indicesNotToAutoRelease.contains(index) == false)
@ -250,6 +310,12 @@ public class DiskThresholdMonitor {
} }
} }
/**
 * Computes the size of the shards relocating on the given node after a reroute, by delegating to
 * {@link DiskThresholdDecider#sizeOfRelocatingShards} with {@code subtractShardsMovingAway} set to
 * {@code true}, the data path of the given disk usage, and the metadata and routing table of the
 * rerouted cluster state.
 *
 * NOTE(review): callers log the negated result as the number of bytes relocating away, so the value
 * is presumably zero or negative when shards are leaving the node - confirm against the decider.
 *
 * @param routingNode the node whose relocating shards are measured
 * @param diskUsage the disk usage whose path is used to select the relevant shards
 * @param info the cluster info passed through to the decider
 * @param reroutedClusterState the cluster state produced by the reroute
 * @return the value returned by the decider for this node and path
 */
// exposed for tests to override
long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) {
return DiskThresholdDecider.sizeOfRelocatingShards(routingNode, true,
diskUsage.getPath(), info, reroutedClusterState.metaData(), reroutedClusterState.routingTable());
}
private void markNodesMissingUsageIneligibleForRelease(RoutingNodes routingNodes, ImmutableOpenMap<String, DiskUsage> usages, private void markNodesMissingUsageIneligibleForRelease(RoutingNodes routingNodes, ImmutableOpenMap<String, DiskUsage> usages,
Set<String> indicesToMarkIneligibleForAutoRelease) { Set<String> indicesToMarkIneligibleForAutoRelease) {
for (RoutingNode routingNode : routingNodes) { for (RoutingNode routingNode : routingNodes) {
@ -262,7 +328,6 @@ public class DiskThresholdMonitor {
} }
} }
} }
} }
private void setLastRunTimeMillis() { private void setLastRunTimeMillis() {
@ -286,4 +351,12 @@ public class DiskThresholdMonitor {
.setSettings(readOnlySettings) .setSettings(readOnlySettings)
.execute(ActionListener.map(wrappedListener, r -> null)); .execute(ActionListener.map(wrappedListener, r -> null));
} }
/**
 * Removes from {@code nodesToCleanUp} every node identifier that is not contained in {@code nodesToKeep}.
 *
 * Uses {@link Set#removeIf} rather than calling {@code remove} inside a for-each loop over the same set:
 * removing during iteration is only tolerated by sets with weakly consistent iterators and throws
 * {@link java.util.ConcurrentModificationException} on ordinary sets such as {@code HashSet}.
 *
 * @param nodesToKeep the identifiers to retain
 * @param nodesToCleanUp the set to prune; modified in place
 */
private static void cleanUpRemovedNodes(ObjectLookupContainer<String> nodesToKeep, Set<String> nodesToCleanUp) {
    nodesToCleanUp.removeIf(node -> nodesToKeep.contains(node) == false);
}
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing.allocation; package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -321,6 +322,24 @@ public class DiskThresholdSettings {
return rerouteInterval; return rerouteInterval;
} }
/**
 * Describes the low watermark as text. If the free-bytes low threshold equals
 * {@code ByteSizeValue.ZERO} the result is {@code 100.0} minus the free-disk low threshold,
 * formatted with {@code Strings.format1Decimals} and a {@code %} suffix; otherwise it is the
 * string form of the free-bytes low threshold.
 *
 * @return the textual description of the low watermark
 */
String describeLowThreshold() {
    if (freeBytesThresholdLow.equals(ByteSizeValue.ZERO)) {
        return Strings.format1Decimals(100.0 - freeDiskThresholdLow, "%");
    }
    return freeBytesThresholdLow.toString();
}
/**
 * Describes the high watermark as text. If the free-bytes high threshold equals
 * {@code ByteSizeValue.ZERO} the result is {@code 100.0} minus the free-disk high threshold,
 * formatted with {@code Strings.format1Decimals} and a {@code %} suffix; otherwise it is the
 * string form of the free-bytes high threshold.
 *
 * @return the textual description of the high watermark
 */
String describeHighThreshold() {
    if (freeBytesThresholdHigh.equals(ByteSizeValue.ZERO)) {
        return Strings.format1Decimals(100.0 - freeDiskThresholdHigh, "%");
    }
    return freeBytesThresholdHigh.toString();
}
/**
 * Describes the flood stage watermark as text. If the free-bytes flood stage threshold equals
 * {@code ByteSizeValue.ZERO} the result is {@code 100.0} minus the free-disk flood stage threshold,
 * formatted with {@code Strings.format1Decimals} and a {@code %} suffix; otherwise it is the
 * string form of the free-bytes flood stage threshold.
 *
 * @return the textual description of the flood stage watermark
 */
String describeFloodStageThreshold() {
    if (freeBytesThresholdFloodStage.equals(ByteSizeValue.ZERO)) {
        return Strings.format1Decimals(100.0 - freeDiskThresholdFloodStage, "%");
    }
    return freeBytesThresholdFloodStage.toString();
}
/** /**
* Attempts to parse the watermark into a percentage, returning 100.0% if * Attempts to parse the watermark into a percentage, returning 100.0% if
* it cannot be parsed. * it cannot be parsed.

View File

@ -817,8 +817,9 @@ public class BalancedShardsAllocator implements ShardsAllocator {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId()); logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
} }
final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation, final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE,
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes()); shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes());
minNode.addShard(shard); minNode.addShard(shard);
if (!shard.primary()) { if (!shard.primary()) {
@ -838,8 +839,9 @@ public class BalancedShardsAllocator implements ShardsAllocator {
if (minNode != null) { if (minNode != null) {
// throttle decision scenario // throttle decision scenario
assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED; assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED;
final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation, final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE,
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize)); minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize));
final RoutingNode node = minNode.getRoutingNode(); final RoutingNode node = minNode.getRoutingNode();
final Decision.Type nodeLevelDecision = deciders.canAllocate(node, allocation).type(); final Decision.Type nodeLevelDecision = deciders.canAllocate(node, allocation).type();

View File

@ -27,9 +27,11 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
@ -86,10 +88,9 @@ public class DiskThresholdDecider extends AllocationDecider {
* *
* If subtractShardsMovingAway is true then the size of shards moving away is subtracted from the total size of all shards * If subtractShardsMovingAway is true then the size of shards moving away is subtracted from the total size of all shards
*/ */
static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, public static long sizeOfRelocatingShards(RoutingNode node, boolean subtractShardsMovingAway, String dataPath, ClusterInfo clusterInfo,
boolean subtractShardsMovingAway, String dataPath) { MetaData metaData, RoutingTable routingTable) {
ClusterInfo clusterInfo = allocation.clusterInfo(); long totalSize = 0L;
long totalSize = 0;
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) { for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) {
if (routing.relocatingNodeId() == null) { if (routing.relocatingNodeId() == null) {
@ -103,7 +104,7 @@ public class DiskThresholdDecider extends AllocationDecider {
// if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least // if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least
// free space // free space
if (actualPath == null || actualPath.equals(dataPath)) { if (actualPath == null || actualPath.equals(dataPath)) {
totalSize += getExpectedShardSize(routing, allocation, 0); totalSize += getExpectedShardSize(routing, 0L, clusterInfo, metaData, routingTable);
} }
} }
@ -115,7 +116,7 @@ public class DiskThresholdDecider extends AllocationDecider {
actualPath = clusterInfo.getDataPath(routing.cancelRelocation()); actualPath = clusterInfo.getDataPath(routing.cancelRelocation());
} }
if (dataPath.equals(actualPath)) { if (dataPath.equals(actualPath)) {
totalSize -= getExpectedShardSize(routing, allocation, 0); totalSize -= getExpectedShardSize(routing, 0L, clusterInfo, metaData, routingTable);
} }
} }
} }
@ -239,7 +240,8 @@ public class DiskThresholdDecider extends AllocationDecider {
} }
// Secondly, check that allocating the shard to this node doesn't put it above the high watermark // Secondly, check that allocating the shard to this node doesn't put it above the high watermark
final long shardSize = getExpectedShardSize(shardRouting, allocation, 0); final long shardSize = getExpectedShardSize(shardRouting, 0L,
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize); double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
long freeBytesAfterShard = freeBytes - shardSize; long freeBytesAfterShard = freeBytes - shardSize;
if (freeBytesAfterShard < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) { if (freeBytesAfterShard < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
@ -342,7 +344,8 @@ public class DiskThresholdDecider extends AllocationDecider {
} }
if (diskThresholdSettings.includeRelocations()) { if (diskThresholdSettings.includeRelocations()) {
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, subtractLeavingShards, usage.getPath()); final long relocatingShardsSize = sizeOfRelocatingShards(node, subtractLeavingShards, usage.getPath(),
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(), DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize); usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
@ -425,29 +428,28 @@ public class DiskThresholdDecider extends AllocationDecider {
* Returns the expected shard size for the given shard or the default value provided if not enough information are available * Returns the expected shard size for the given shard or the default value provided if not enough information are available
* to estimate the shards size. * to estimate the shards size.
*/ */
public static long getExpectedShardSize(ShardRouting shard, RoutingAllocation allocation, long defaultValue) { public static long getExpectedShardSize(ShardRouting shard, long defaultValue, ClusterInfo clusterInfo, MetaData metaData,
final IndexMetaData metaData = allocation.metaData().getIndexSafe(shard.index()); RoutingTable routingTable) {
final ClusterInfo info = allocation.clusterInfo(); final IndexMetaData indexMetaData = metaData.getIndexSafe(shard.index());
if (metaData.getResizeSourceIndex() != null && shard.active() == false && if (indexMetaData.getResizeSourceIndex() != null && shard.active() == false &&
shard.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) { shard.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
// in the shrink index case we sum up the source index shards since we basically make a copy of the shard in // in the shrink index case we sum up the source index shards since we basically make a copy of the shard in
// the worst case // the worst case
long targetShardSize = 0; long targetShardSize = 0;
final Index mergeSourceIndex = metaData.getResizeSourceIndex(); final Index mergeSourceIndex = indexMetaData.getResizeSourceIndex();
final IndexMetaData sourceIndexMeta = allocation.metaData().index(mergeSourceIndex); final IndexMetaData sourceIndexMeta = metaData.index(mergeSourceIndex);
if (sourceIndexMeta != null) { if (sourceIndexMeta != null) {
final Set<ShardId> shardIds = IndexMetaData.selectRecoverFromShards(shard.id(), final Set<ShardId> shardIds = IndexMetaData.selectRecoverFromShards(shard.id(),
sourceIndexMeta, metaData.getNumberOfShards()); sourceIndexMeta, indexMetaData.getNumberOfShards());
for (IndexShardRoutingTable shardRoutingTable : allocation.routingTable().index(mergeSourceIndex.getName())) { for (IndexShardRoutingTable shardRoutingTable : routingTable.index(mergeSourceIndex.getName())) {
if (shardIds.contains(shardRoutingTable.shardId())) { if (shardIds.contains(shardRoutingTable.shardId())) {
targetShardSize += info.getShardSize(shardRoutingTable.primaryShard(), 0); targetShardSize += clusterInfo.getShardSize(shardRoutingTable.primaryShard(), 0);
} }
} }
} }
return targetShardSize == 0 ? defaultValue : targetShardSize; return targetShardSize == 0 ? defaultValue : targetShardSize;
} else { } else {
return info.getShardSize(shard, defaultValue); return clusterInfo.getShardSize(shard, defaultValue);
} }
} }
} }

View File

@ -55,7 +55,8 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC
private ShardStateAction.ShardStartedClusterStateTaskExecutor executor; private ShardStateAction.ShardStartedClusterStateTaskExecutor executor;
private static void neverReroutes(String reason, Priority priority, ActionListener<Void> listener) { @SuppressWarnings("unused")
private static void neverReroutes(String reason, Priority priority, ActionListener<ClusterState> listener) {
fail("unexpectedly ran a deferred reroute"); fail("unexpectedly ran a deferred reroute");
} }

View File

@ -18,6 +18,9 @@
*/ */
package org.elasticsearch.cluster.routing.allocation; package org.elasticsearch.cluster.routing.allocation;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfo;
@ -30,12 +33,16 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -44,6 +51,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -146,7 +154,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(); .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build();
AtomicLong currentTime = new AtomicLong(); AtomicLong currentTime = new AtomicLong();
AtomicReference<ActionListener<Void>> listenerReference = new AtomicReference<>(); AtomicReference<ActionListener<ClusterState>> listenerReference = new AtomicReference<>();
DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState, DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get,
(reason, priority, listener) -> { (reason, priority, listener) -> {
@ -180,7 +188,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
currentTime.addAndGet(randomLongBetween(0, 120000)); currentTime.addAndGet(randomLongBetween(0, 120000));
monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null)); monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null));
assertNotNull(listenerReference.get()); assertNotNull(listenerReference.get());
listenerReference.getAndSet(null).onResponse(null); listenerReference.getAndSet(null).onResponse(clusterState);
if (randomBoolean()) { if (randomBoolean()) {
// should not re-route again within the reroute interval // should not re-route again within the reroute interval
@ -195,7 +203,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000)); DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000));
monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null)); monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null));
assertNotNull(listenerReference.get()); assertNotNull(listenerReference.get());
final ActionListener<Void> rerouteListener1 = listenerReference.getAndSet(null); final ActionListener<ClusterState> rerouteListener1 = listenerReference.getAndSet(null);
// should not re-route again before reroute has completed // should not re-route again before reroute has completed
currentTime.addAndGet(randomLongBetween(0, 120000)); currentTime.addAndGet(randomLongBetween(0, 120000));
@ -203,7 +211,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
assertNull(listenerReference.get()); assertNull(listenerReference.get());
// complete reroute // complete reroute
rerouteListener1.onResponse(null); rerouteListener1.onResponse(clusterState);
if (randomBoolean()) { if (randomBoolean()) {
// should not re-route again within the reroute interval // should not re-route again within the reroute interval
@ -250,7 +258,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
(reason, priority, listener) -> { (reason, priority, listener) -> {
assertNotNull(listener); assertNotNull(listener);
assertThat(priority, equalTo(Priority.HIGH)); assertThat(priority, equalTo(Priority.HIGH));
listener.onResponse(null); listener.onResponse(clusterState);
}) { }) {
@Override @Override
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) { protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
@ -287,7 +295,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
(reason, priority, listener) -> { (reason, priority, listener) -> {
assertNotNull(listener); assertNotNull(listener);
assertThat(priority, equalTo(Priority.HIGH)); assertThat(priority, equalTo(Priority.HIGH));
listener.onResponse(null); listener.onResponse(clusterStateWithBlocks);
}) { }) {
@Override @Override
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) { protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
@ -365,4 +373,201 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
assertThat(indicesToMarkReadOnly.get(), contains("test_1")); assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
assertNull(indicesToRelease.get()); assertNull(indicesToRelease.get());
} }
/**
 * Drives a single-node {@link DiskThresholdMonitor} through a sequence of disk usages and checks, via the assertion
 * helpers of this class, which INFO and WARN messages it logs for each of them.
 */
@TestLogging(value="org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor:INFO", reason="testing INFO/WARN logging")
public void testDiskMonitorLogging() throws IllegalAccessException {
final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.nodes(DiscoveryNodes.builder().add(newNode("node1"))).build();
final AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(clusterState);
// Time source handed to the monitor: while advanceTime is set, every call moves the clock forward by one millisecond
// more than the reroute interval read from empty settings; otherwise the clock stands still.
final AtomicBoolean advanceTime = new AtomicBoolean(randomBoolean());
final LongSupplier timeSupplier = new LongSupplier() {
long time;
@Override
public long getAsLong() {
if (advanceTime.get()) {
time += DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).getMillis() + 1;
}
logger.info("time: [{}]", time);
return time;
}
};
// Value reported by the overridden sizeOfRelocatingShards below; the test changes it to steer the expected messages.
final AtomicLong relocatingShardSizeRef = new AtomicLong();
// The reroute callback completes its listener immediately with the current cluster state, and marking indices
// read-only is stubbed out to just complete its listener.
DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, clusterStateRef::get,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, timeSupplier,
(reason, priority, listener) -> listener.onResponse(clusterStateRef.get())) {
@Override
protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener, boolean readOnly) {
listener.onResponse(null);
}
@Override
long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) {
return relocatingShardSizeRef.get();
}
};
// Four usage maps for node1, each with a total of 100 and a randomly chosen amount of free space from a different
// range: 15-100 (all ok), 10-14 (above low), 5-9 (above high) and 0-4 (above flood stage).
final ImmutableOpenMap.Builder<String, DiskUsage> allDisksOkBuilder;
allDisksOkBuilder = ImmutableOpenMap.builder();
allDisksOkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(15, 100)));
final ImmutableOpenMap<String, DiskUsage> allDisksOk = allDisksOkBuilder.build();
final ImmutableOpenMap.Builder<String, DiskUsage> aboveLowWatermarkBuilder = ImmutableOpenMap.builder();
aboveLowWatermarkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 14)));
final ImmutableOpenMap<String, DiskUsage> aboveLowWatermark = aboveLowWatermarkBuilder.build();
final ImmutableOpenMap.Builder<String, DiskUsage> aboveHighWatermarkBuilder = ImmutableOpenMap.builder();
aboveHighWatermarkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 9)));
final ImmutableOpenMap<String, DiskUsage> aboveHighWatermark = aboveHighWatermarkBuilder.build();
final ImmutableOpenMap.Builder<String, DiskUsage> aboveFloodStageWatermarkBuilder = ImmutableOpenMap.builder();
aboveFloodStageWatermarkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
final ImmutableOpenMap<String, DiskUsage> aboveFloodStageWatermark = aboveFloodStageWatermarkBuilder.build();
// Healthy disks: nothing is logged. Crossing the low watermark: one INFO message, then silence.
assertNoLogging(monitor, allDisksOk);
assertSingleInfoMessage(monitor, aboveLowWatermark,
"low disk watermark [85%] exceeded on * replicas will not be assigned to this node");
advanceTime.set(false); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed
assertSingleWarningMessage(monitor, aboveHighWatermark,
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* " +
"the node is expected to continue to exceed the high disk watermark when these relocations are complete");
// With the clock advancing on every call, the high-watermark warning is expected on every update.
advanceTime.set(true);
assertRepeatedWarningMessages(monitor, aboveHighWatermark,
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* " +
"the node is expected to continue to exceed the high disk watermark when these relocations are complete");
// The flood-stage warning is expected on every update whether or not the clock advances.
advanceTime.set(randomBoolean());
assertRepeatedWarningMessages(monitor, aboveFloodStageWatermark,
"flood stage disk watermark [95%] exceeded on * all indices on this node will be marked read-only");
// With a reported relocating-shard size of -5 the high-watermark message is expected once, at INFO, and says the node
// should end up below the watermark.
relocatingShardSizeRef.set(-5L);
advanceTime.set(true);
assertSingleInfoMessage(monitor, aboveHighWatermark,
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* " +
"the node is expected to be below the high disk watermark when these relocations are complete");
// Back to a relocating-shard size of 0: the high-watermark message is a WARN again.
relocatingShardSizeRef.set(0L);
timeSupplier.getAsLong(); // advance time long enough to do another reroute
advanceTime.set(false); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed
assertSingleWarningMessage(monitor, aboveHighWatermark,
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* " +
"the node is expected to continue to exceed the high disk watermark when these relocations are complete");
advanceTime.set(true);
assertRepeatedWarningMessages(monitor, aboveHighWatermark,
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* " +
"the node is expected to continue to exceed the high disk watermark when these relocations are complete");
// Dropping from above the high watermark to above the low watermark: one INFO message.
advanceTime.set(randomBoolean());
assertSingleInfoMessage(monitor, aboveLowWatermark,
"high disk watermark [90%] no longer exceeded on * but low disk watermark [85%] is still exceeded");
advanceTime.set(true); // only log about dropping below the low disk watermark on a reroute
assertSingleInfoMessage(monitor, allDisksOk,
"low disk watermark [85%] no longer exceeded on *");
// Remaining transitions: flood stage -> ok, high -> ok, and flood stage -> above low.
advanceTime.set(randomBoolean());
assertRepeatedWarningMessages(monitor, aboveFloodStageWatermark,
"flood stage disk watermark [95%] exceeded on * all indices on this node will be marked read-only");
assertSingleInfoMessage(monitor, allDisksOk,
"low disk watermark [85%] no longer exceeded on *");
advanceTime.set(true);
assertRepeatedWarningMessages(monitor, aboveHighWatermark,
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* " +
"the node is expected to continue to exceed the high disk watermark when these relocations are complete");
assertSingleInfoMessage(monitor, allDisksOk,
"low disk watermark [85%] no longer exceeded on *");
assertRepeatedWarningMessages(monitor, aboveFloodStageWatermark,
"flood stage disk watermark [95%] exceeded on * all indices on this node will be marked read-only");
assertSingleInfoMessage(monitor, aboveLowWatermark,
"high disk watermark [90%] no longer exceeded on * but low disk watermark [85%] is still exceeded");
}
/**
 * Feeds the given disk usages to the monitor between two and four times and asserts that the
 * {@link DiskThresholdMonitor} logger emits no INFO and no WARN message while doing so.
 *
 * @param monitor    the monitor under test
 * @param diskUsages per-node disk usages wrapped into the {@link ClusterInfo} passed to the monitor
 * @throws IllegalAccessException declared because the mock appender setup may throw it
 */
private void assertNoLogging(DiskThresholdMonitor monitor,
                             ImmutableOpenMap<String, DiskUsage> diskUsages) throws IllegalAccessException {
    MockLogAppender mockAppender = new MockLogAppender();
    mockAppender.start();
    mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation(
        "any INFO message",
        DiskThresholdMonitor.class.getCanonicalName(),
        Level.INFO,
        "*"));
    mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation(
        "any WARN message",
        DiskThresholdMonitor.class.getCanonicalName(),
        Level.WARN,
        "*"));
    Logger diskThresholdMonitorLogger = LogManager.getLogger(DiskThresholdMonitor.class);
    Loggers.addAppender(diskThresholdMonitorLogger, mockAppender);
    try {
        for (int i = between(1, 3); i >= 0; i--) {
            monitor.onNewInfo(new ClusterInfo(diskUsages, null, null, null));
        }
        mockAppender.assertAllExpectationsMatched();
    } finally {
        // Detach and stop the appender even when an expectation fails, so that it does not stay attached to the
        // shared logger and observe events produced by later assertions or tests.
        Loggers.removeAppender(diskThresholdMonitorLogger, mockAppender);
        mockAppender.stop();
    }
}
/**
 * Feeds the given disk usages to the monitor between two and four times and checks, via {@code assertLogging}, that
 * each round logs the expected WARN message.
 */
private void assertRepeatedWarningMessages(DiskThresholdMonitor monitor, ImmutableOpenMap<String, DiskUsage> diskUsages,
                                           String message) throws IllegalAccessException {
    final int lastRound = between(1, 3);
    int round = 0;
    while (round <= lastRound) {
        assertLogging(monitor, diskUsages, Level.WARN, message);
        round++;
    }
}
/**
 * Checks that one update with the given disk usages logs the expected WARN message and that subsequent updates with
 * the same usages pass the {@code assertNoLogging} check.
 */
private void assertSingleWarningMessage(DiskThresholdMonitor monitor, ImmutableOpenMap<String, DiskUsage> diskUsages,
                                        String message) throws IllegalAccessException {
    final Level expectedLevel = Level.WARN;
    assertLogging(monitor, diskUsages, expectedLevel, message);
    // the same usages must not produce any follow-up message
    assertNoLogging(monitor, diskUsages);
}
/**
 * Checks that one update with the given disk usages logs the expected INFO message and that subsequent updates with
 * the same usages pass the {@code assertNoLogging} check.
 */
private void assertSingleInfoMessage(DiskThresholdMonitor monitor, ImmutableOpenMap<String, DiskUsage> diskUsages,
                                     String message) throws IllegalAccessException {
    final Level expectedLevel = Level.INFO;
    assertLogging(monitor, diskUsages, expectedLevel, message);
    // the same usages must not produce any follow-up message
    assertNoLogging(monitor, diskUsages);
}
/**
 * Sends a single {@link ClusterInfo} built from {@code diskUsages} to the monitor and asserts that the
 * {@link DiskThresholdMonitor} logger emits a message matching {@code message} at {@code level}, and nothing at the
 * opposite level (WARN when {@code level} is INFO, INFO otherwise).
 *
 * @param monitor    the monitor under test
 * @param diskUsages per-node disk usages wrapped into the {@link ClusterInfo} passed to the monitor
 * @param level      the level at which the expected message must be logged
 * @param message    pattern of the expected message, as understood by {@code MockLogAppender.SeenEventExpectation}
 * @throws IllegalAccessException declared because the mock appender setup may throw it
 */
private void assertLogging(DiskThresholdMonitor monitor,
                           ImmutableOpenMap<String, DiskUsage> diskUsages,
                           Level level,
                           String message) throws IllegalAccessException {
    MockLogAppender mockAppender = new MockLogAppender();
    mockAppender.start();
    mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation(
        "expected message",
        DiskThresholdMonitor.class.getCanonicalName(),
        level,
        message));
    mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation(
        "any message of another level",
        DiskThresholdMonitor.class.getCanonicalName(),
        level == Level.INFO ? Level.WARN : Level.INFO,
        "*"));
    Logger diskThresholdMonitorLogger = LogManager.getLogger(DiskThresholdMonitor.class);
    Loggers.addAppender(diskThresholdMonitorLogger, mockAppender);
    try {
        monitor.onNewInfo(new ClusterInfo(diskUsages, null, null, null));
        mockAppender.assertAllExpectationsMatched();
    } finally {
        // Detach and stop the appender even when an expectation fails, so that it does not stay attached to the
        // shared logger and observe events produced by later assertions or tests.
        Loggers.removeAppender(diskThresholdMonitorLogger, mockAppender);
        mockAppender.stop();
    }
}
} }

View File

@ -256,4 +256,33 @@ public class DiskThresholdSettingsTests extends ESTestCase {
} }
} }
/**
 * Checks the descriptions reported for the low, high and flood-stage thresholds when the settings are empty, when
 * the watermarks are given as percentages, and when they are given as absolute sizes.
 */
public void testThresholdDescriptions() {
    final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
    final String lowKey = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey();
    final String highKey = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey();
    final String floodStageKey = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey();

    // empty settings
    final DiskThresholdSettings fromEmptySettings = new DiskThresholdSettings(Settings.EMPTY, clusterSettings);
    assertThat(fromEmptySettings.describeLowThreshold(), equalTo("85%"));
    assertThat(fromEmptySettings.describeHighThreshold(), equalTo("90%"));
    assertThat(fromEmptySettings.describeFloodStageThreshold(), equalTo("95%"));

    // watermarks given as percentages
    final Settings percentages = Settings.builder()
        .put(lowKey, "91.2%")
        .put(highKey, "91.3%")
        .put(floodStageKey, "91.4%")
        .build();
    final DiskThresholdSettings fromPercentages = new DiskThresholdSettings(percentages, clusterSettings);
    assertThat(fromPercentages.describeLowThreshold(), equalTo("91.2%"));
    assertThat(fromPercentages.describeHighThreshold(), equalTo("91.3%"));
    assertThat(fromPercentages.describeFloodStageThreshold(), equalTo("91.4%"));

    // watermarks given as absolute sizes; the descriptions come back with lower-case units
    final Settings absoluteSizes = Settings.builder()
        .put(lowKey, "1GB")
        .put(highKey, "10MB")
        .put(floodStageKey, "1B")
        .build();
    final DiskThresholdSettings fromAbsoluteSizes = new DiskThresholdSettings(absoluteSizes, clusterSettings);
    assertThat(fromAbsoluteSizes.describeLowThreshold(), equalTo("1gb"));
    assertThat(fromAbsoluteSizes.describeHighThreshold(), equalTo("10mb"));
    assertThat(fromAbsoluteSizes.describeFloodStageThreshold(), equalTo("1b"));
}
} }

View File

@ -315,22 +315,22 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
test_2 = ShardRoutingHelper.initialize(test_2, "node1"); test_2 = ShardRoutingHelper.initialize(test_2, "node1");
test_2 = ShardRoutingHelper.moveToStarted(test_2); test_2 = ShardRoutingHelper.moveToStarted(test_2);
assertEquals(1000L, DiskThresholdDecider.getExpectedShardSize(test_2, allocation, 0)); assertEquals(1000L, getExpectedShardSize(test_2, 0L, allocation));
assertEquals(100L, DiskThresholdDecider.getExpectedShardSize(test_1, allocation, 0)); assertEquals(100L, getExpectedShardSize(test_1, 0L, allocation));
assertEquals(10L, DiskThresholdDecider.getExpectedShardSize(test_0, allocation, 0)); assertEquals(10L, getExpectedShardSize(test_0, 0L, allocation));
RoutingNode node = new RoutingNode("node1", new DiscoveryNode("node1", buildNewFakeTransportAddress(), RoutingNode node = new RoutingNode("node1", new DiscoveryNode("node1", buildNewFakeTransportAddress(),
emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.getTargetRelocatingShard(), test_2); emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.getTargetRelocatingShard(), test_2);
assertEquals(100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null")); assertEquals(100L, sizeOfRelocatingShards(allocation, node, false, "/dev/null"));
assertEquals(90L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null")); assertEquals(90L, sizeOfRelocatingShards(allocation, node, true, "/dev/null"));
assertEquals(0L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/some/other/dev")); assertEquals(0L, sizeOfRelocatingShards(allocation, node, true, "/dev/some/other/dev"));
assertEquals(0L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/some/other/dev")); assertEquals(0L, sizeOfRelocatingShards(allocation, node, true, "/dev/some/other/dev"));
ShardRouting test_3 = ShardRouting.newUnassigned(new ShardId(index, 3), false, PeerRecoverySource.INSTANCE, ShardRouting test_3 = ShardRouting.newUnassigned(new ShardId(index, 3), false, PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
test_3 = ShardRoutingHelper.initialize(test_3, "node1"); test_3 = ShardRoutingHelper.initialize(test_3, "node1");
test_3 = ShardRoutingHelper.moveToStarted(test_3); test_3 = ShardRoutingHelper.moveToStarted(test_3);
assertEquals(0L, DiskThresholdDecider.getExpectedShardSize(test_3, allocation, 0)); assertEquals(0L, getExpectedShardSize(test_3, 0L, allocation));
ShardRouting other_0 = ShardRouting.newUnassigned(new ShardId("other", "5678", 0), randomBoolean(), ShardRouting other_0 = ShardRouting.newUnassigned(new ShardId("other", "5678", 0), randomBoolean(),
@ -342,14 +342,19 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
node = new RoutingNode("node1", new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), node = new RoutingNode("node1", new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(),
Version.CURRENT), test_0, test_1.getTargetRelocatingShard(), test_2, other_0.getTargetRelocatingShard()); Version.CURRENT), test_0, test_1.getTargetRelocatingShard(), test_2, other_0.getTargetRelocatingShard());
if (other_0.primary()) { if (other_0.primary()) {
assertEquals(10100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null")); assertEquals(10100L, sizeOfRelocatingShards(allocation, node, false, "/dev/null"));
assertEquals(10090L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null")); assertEquals(10090L, sizeOfRelocatingShards(allocation, node, true, "/dev/null"));
} else { } else {
assertEquals(100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null")); assertEquals(100L, sizeOfRelocatingShards(allocation, node, false, "/dev/null"));
assertEquals(90L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null")); assertEquals(90L, sizeOfRelocatingShards(allocation, node, true, "/dev/null"));
} }
} }
/**
 * Test convenience: takes the cluster info, metadata and routing table from {@code allocation} and forwards them,
 * together with the remaining arguments, to {@code DiskThresholdDecider.sizeOfRelocatingShards}.
 */
public long sizeOfRelocatingShards(RoutingAllocation allocation, RoutingNode node, boolean subtractShardsMovingAway, String dataPath) {
    return DiskThresholdDecider.sizeOfRelocatingShards(
        node,
        subtractShardsMovingAway,
        dataPath,
        allocation.clusterInfo(),
        allocation.metaData(),
        allocation.routingTable());
}
public void testSizeShrinkIndex() { public void testSizeShrinkIndex() {
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
shardSizes.put("[test][0][p]", 10L); shardSizes.put("[test][0][p]", 10L);
@ -404,22 +409,22 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
ShardRouting test_3 = ShardRouting.newUnassigned(new ShardId(index, 3), true, ShardRouting test_3 = ShardRouting.newUnassigned(new ShardId(index, 3), true,
LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
test_3 = ShardRoutingHelper.initialize(test_3, "node1"); test_3 = ShardRoutingHelper.initialize(test_3, "node1");
assertEquals(500L, DiskThresholdDecider.getExpectedShardSize(test_3, allocation, 0)); assertEquals(500L, getExpectedShardSize(test_3, 0L, allocation));
assertEquals(500L, DiskThresholdDecider.getExpectedShardSize(test_2, allocation, 0)); assertEquals(500L, getExpectedShardSize(test_2, 0L, allocation));
assertEquals(100L, DiskThresholdDecider.getExpectedShardSize(test_1, allocation, 0)); assertEquals(100L, getExpectedShardSize(test_1, 0L, allocation));
assertEquals(10L, DiskThresholdDecider.getExpectedShardSize(test_0, allocation, 0)); assertEquals(10L, getExpectedShardSize(test_0, 0L, allocation));
ShardRouting target = ShardRouting.newUnassigned(new ShardId(new Index("target", "5678"), 0), ShardRouting target = ShardRouting.newUnassigned(new ShardId(new Index("target", "5678"), 0),
true, LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); true, LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
assertEquals(1110L, DiskThresholdDecider.getExpectedShardSize(target, allocation, 0)); assertEquals(1110L, getExpectedShardSize(target, 0L, allocation));
ShardRouting target2 = ShardRouting.newUnassigned(new ShardId(new Index("target2", "9101112"), 0), ShardRouting target2 = ShardRouting.newUnassigned(new ShardId(new Index("target2", "9101112"), 0),
true, LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); true, LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
assertEquals(110L, DiskThresholdDecider.getExpectedShardSize(target2, allocation, 0)); assertEquals(110L, getExpectedShardSize(target2, 0L, allocation));
target2 = ShardRouting.newUnassigned(new ShardId(new Index("target2", "9101112"), 1), target2 = ShardRouting.newUnassigned(new ShardId(new Index("target2", "9101112"), 1),
true, LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); true, LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
assertEquals(1000L, DiskThresholdDecider.getExpectedShardSize(target2, allocation, 0)); assertEquals(1000L, getExpectedShardSize(target2, 0L, allocation));
// check that the DiskThresholdDecider still works even if the source index has been deleted // check that the DiskThresholdDecider still works even if the source index has been deleted
ClusterState clusterStateWithMissingSourceIndex = ClusterState.builder(clusterState) ClusterState clusterStateWithMissingSourceIndex = ClusterState.builder(clusterState)
@ -430,7 +435,13 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
allocationService.reroute(clusterState, "foo"); allocationService.reroute(clusterState, "foo");
RoutingAllocation allocationWithMissingSourceIndex = new RoutingAllocation(null, RoutingAllocation allocationWithMissingSourceIndex = new RoutingAllocation(null,
clusterStateWithMissingSourceIndex.getRoutingNodes(), clusterStateWithMissingSourceIndex, info, 0); clusterStateWithMissingSourceIndex.getRoutingNodes(), clusterStateWithMissingSourceIndex, info, 0);
assertEquals(42L, DiskThresholdDecider.getExpectedShardSize(target, allocationWithMissingSourceIndex, 42L)); assertEquals(42L, getExpectedShardSize(target, 42L, allocationWithMissingSourceIndex));
assertEquals(42L, DiskThresholdDecider.getExpectedShardSize(target2, allocationWithMissingSourceIndex, 42L)); assertEquals(42L, getExpectedShardSize(target2, 42L, allocationWithMissingSourceIndex));
} }
/**
 * Test convenience: takes the cluster info, metadata and routing table from {@code allocation} and forwards them,
 * together with the shard and the default size, to {@code DiskThresholdDecider.getExpectedShardSize}.
 */
private static long getExpectedShardSize(ShardRouting shardRouting, long defaultSize, RoutingAllocation allocation) {
    return DiskThresholdDecider.getExpectedShardSize(
        shardRouting,
        defaultSize,
        allocation.clusterInfo(),
        allocation.metaData(),
        allocation.routingTable());
}
} }