diff --git a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 6c22ff171dd..534f007e8b2 100644 --- a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -34,7 +34,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractComponent; @@ -106,10 +106,10 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu this.threadPool = threadPool; this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings); this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings); - this.enabled = DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); + this.enabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, this::setFetchTimeout); clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, this::setUpdateFrequency); - clusterSettings.addSettingsUpdateConsumer(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled); + clusterSettings.addSettingsUpdateConsumer(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled); // Add InternalClusterInfoService 
to listen for Master changes this.clusterService.add((LocalNodeMasterListener)this); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 1bfd775ecce..29bb55f8107 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -29,7 +29,7 @@ import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseFieldMatcher; @@ -750,7 +750,7 @@ public class MetaData implements Iterable, Diffable, Fr RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.getKey(), RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING.getKey(), RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.getKey(), - DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.getKey(), DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java new file mode 100644 index 00000000000..103aa87dcd3 --- /dev/null +++ 
b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -0,0 +1,144 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import java.util.Set; + +import com.carrotsearch.hppc.ObjectLookupContainer; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterInfoService; +import org.elasticsearch.cluster.DiskUsage; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; + +/** + * Listens for a node to go over the high watermark and kicks off an empty + * reroute if it does. 
Also responsible for logging about nodes that have + * passed the disk watermarks + */ +public class DiskThresholdMonitor extends AbstractComponent implements ClusterInfoService.Listener { + private final DiskThresholdSettings diskThresholdSettings; + private final Client client; + private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet(); + + private long lastRunNS; + + // TODO: remove injection when ClusterInfoService is not injected + @Inject + public DiskThresholdMonitor(Settings settings, ClusterSettings clusterSettings, + ClusterInfoService infoService, Client client) { + super(settings); + this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); + this.client = client; + infoService.addListener(this); + } + + /** + * Warn about the given disk usage if the low or high watermark has been passed + */ + private void warnAboutDiskIfNeeded(DiskUsage usage) { + // Check absolute disk values + if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().bytes()) { + logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node", + diskThresholdSettings.getFreeBytesThresholdHigh(), usage); + } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().bytes()) { + logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node", + diskThresholdSettings.getFreeBytesThresholdLow(), usage); + } + + // Check percentage disk values + if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { + logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node", + Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(), "%"), usage); + } else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) { + logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node", + 
Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdLow(), "%"), usage); + } + } + + @Override + public void onNewInfo(ClusterInfo info) { + ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages(); + if (usages != null) { + boolean reroute = false; + String explanation = ""; + + // Garbage collect nodes that have been removed from the cluster + // from the map that tracks watermark crossing + ObjectLookupContainer<String> nodes = usages.keys(); + for (String node : nodeHasPassedWatermark) { + if (nodes.contains(node) == false) { + nodeHasPassedWatermark.remove(node); + } + } + + for (ObjectObjectCursor<String, DiskUsage> entry : usages) { + String node = entry.key; + DiskUsage usage = entry.value; + warnAboutDiskIfNeeded(usage); + if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().bytes() || + usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { + if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) { + lastRunNS = System.nanoTime(); + reroute = true; + explanation = "high disk watermark exceeded on one or more nodes"; + } else { + logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " + + "in the last [{}], skipping reroute", + node, diskThresholdSettings.getRerouteInterval()); + } + nodeHasPassedWatermark.add(node); + } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().bytes() || + usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) { + nodeHasPassedWatermark.add(node); + } else { + if (nodeHasPassedWatermark.contains(node)) { + // The node has previously been over the high or + // low watermark, but is no longer, so we should + // reroute so any unassigned shards can be allocated + // if they are able to be + if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) { + lastRunNS = System.nanoTime(); + reroute = true; + explanation = "one 
or more nodes has gone under the high or low watermark"; + nodeHasPassedWatermark.remove(node); + } else { + logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " + + "in the last [{}], skipping reroute", + node, diskThresholdSettings.getRerouteInterval()); + } + } + } + } + if (reroute) { + logger.info("rerouting shards: [{}]", explanation); + // Execute an empty reroute, but don't block on the response + client.admin().cluster().prepareReroute().execute(); + } + } + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java new file mode 100644 index 00000000000..81b9042fb33 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java @@ -0,0 +1,174 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.RatioValue; +import org.elasticsearch.common.unit.TimeValue; + +/** + * A container to keep settings for disk thresholds up to date with cluster setting changes. + */ +public class DiskThresholdSettings { + public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING = + Setting.boolSetting("cluster.routing.allocation.disk.threshold_enabled", true, + Setting.Property.Dynamic, Setting.Property.NodeScope); + public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING = + new Setting<>("cluster.routing.allocation.disk.watermark.low", "85%", + (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.low"), + Setting.Property.Dynamic, Setting.Property.NodeScope); + public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING = + new Setting<>("cluster.routing.allocation.disk.watermark.high", "90%", + (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.high"), + Setting.Property.Dynamic, Setting.Property.NodeScope); + public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING = + Setting.boolSetting("cluster.routing.allocation.disk.include_relocations", true, + Setting.Property.Dynamic, Setting.Property.NodeScope); + public static final Setting<TimeValue> CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING = + Setting.positiveTimeSetting("cluster.routing.allocation.disk.reroute_interval", TimeValue.timeValueSeconds(60), + Setting.Property.Dynamic, Setting.Property.NodeScope); + + private volatile Double freeDiskThresholdLow; + private volatile Double freeDiskThresholdHigh; + private 
volatile ByteSizeValue freeBytesThresholdLow; + private volatile ByteSizeValue freeBytesThresholdHigh; + private volatile boolean includeRelocations; + private volatile boolean enabled; + private volatile TimeValue rerouteInterval; + + public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) { + final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings); + final String highWatermark = CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.get(settings); + setHighWatermark(highWatermark); + setLowWatermark(lowWatermark); + this.includeRelocations = CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.get(settings); + this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings); + this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, this::setIncludeRelocations); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled); + } + + private void setIncludeRelocations(boolean includeRelocations) { + this.includeRelocations = includeRelocations; + } + + private void setRerouteInterval(TimeValue rerouteInterval) { + this.rerouteInterval = rerouteInterval; + } + + private void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + private void setLowWatermark(String lowWatermark) { + // Watermark is expressed in terms of used data, but we need "free" data watermark + this.freeDiskThresholdLow = 100.0 - 
thresholdPercentageFromWatermark(lowWatermark); + this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark, + CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey()); + } + + private void setHighWatermark(String highWatermark) { + // Watermark is expressed in terms of used data, but we need "free" data watermark + this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark); + this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark, + CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey()); + } + + public Double getFreeDiskThresholdLow() { + return freeDiskThresholdLow; + } + + public Double getFreeDiskThresholdHigh() { + return freeDiskThresholdHigh; + } + + public ByteSizeValue getFreeBytesThresholdLow() { + return freeBytesThresholdLow; + } + + public ByteSizeValue getFreeBytesThresholdHigh() { + return freeBytesThresholdHigh; + } + + public boolean includeRelocations() { + return includeRelocations; + } + + public boolean isEnabled() { + return enabled; + } + + public TimeValue getRerouteInterval() { + return rerouteInterval; + } + + /** + * Attempts to parse the watermark into a percentage, returning 100.0% if + * it cannot be parsed. + */ + private double thresholdPercentageFromWatermark(String watermark) { + try { + return RatioValue.parseRatioValue(watermark).getAsPercent(); + } catch (ElasticsearchParseException ex) { + // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two + // cases separately + return 100.0; + } + } + + /** + * Attempts to parse the watermark into a {@link ByteSizeValue}, returning + * a ByteSizeValue of 0 bytes if the value cannot be parsed. 
+ */ + private ByteSizeValue thresholdBytesFromWatermark(String watermark, String settingName) { + try { + return ByteSizeValue.parseBytesSizeValue(watermark, settingName); + } catch (ElasticsearchParseException ex) { + // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two + // cases separately + return ByteSizeValue.parseBytesSizeValue("0b", settingName); + } + } + + /** + * Checks if a watermark string is a valid percentage or byte size value, + * @return the watermark value given + */ + private static String validWatermarkSetting(String watermark, String settingName) { + try { + RatioValue.parseRatioValue(watermark); + } catch (ElasticsearchParseException e) { + try { + ByteSizeValue.parseBytesSizeValue(watermark, settingName); + } catch (ElasticsearchParseException ex) { + ex.addSuppressed(e); + throw ex; + } + } + return watermark; + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index edeb609a9c7..2aacac0cdd0 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -19,37 +19,27 @@ package org.elasticsearch.cluster.routing.allocation.decider; -import com.carrotsearch.hppc.ObjectLookupContainer; +import java.util.Set; + import com.carrotsearch.hppc.cursors.ObjectCursor; -import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterInfo; -import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.DiskUsage; -import org.elasticsearch.cluster.EmptyClusterInfoService; import 
org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.RatioValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; -import java.util.Set; - /** * The {@link DiskThresholdDecider} checks that the node a shard is potentially * being allocated to has enough disk space. 
@@ -77,226 +67,12 @@ public class DiskThresholdDecider extends AllocationDecider { public static final String NAME = "disk_threshold"; - private volatile Double freeDiskThresholdLow; - private volatile Double freeDiskThresholdHigh; - private volatile ByteSizeValue freeBytesThresholdLow; - private volatile ByteSizeValue freeBytesThresholdHigh; - private volatile boolean includeRelocations; - private volatile boolean enabled; - private volatile TimeValue rerouteInterval; - - public static final Setting CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING = - Setting.boolSetting("cluster.routing.allocation.disk.threshold_enabled", true, Property.Dynamic, Property.NodeScope); - public static final Setting CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING = - new Setting<>("cluster.routing.allocation.disk.watermark.low", "85%", - (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.low"), - Property.Dynamic, Property.NodeScope); - public static final Setting CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING = - new Setting<>("cluster.routing.allocation.disk.watermark.high", "90%", - (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.high"), - Property.Dynamic, Property.NodeScope); - public static final Setting CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING = - Setting.boolSetting("cluster.routing.allocation.disk.include_relocations", true, - Property.Dynamic, Property.NodeScope);; - public static final Setting CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING = - Setting.positiveTimeSetting("cluster.routing.allocation.disk.reroute_interval", TimeValue.timeValueSeconds(60), - Property.Dynamic, Property.NodeScope); - - /** - * Listens for a node to go over the high watermark and kicks off an empty - * reroute if it does. 
Also responsible for logging about nodes that have - * passed the disk watermarks - */ - class DiskListener implements ClusterInfoService.Listener { - private final Client client; - private final Set nodeHasPassedWatermark = Sets.newConcurrentHashSet(); - - private long lastRunNS; - - DiskListener(Client client) { - this.client = client; - } - - /** - * Warn about the given disk usage if the low or high watermark has been passed - */ - private void warnAboutDiskIfNeeded(DiskUsage usage) { - // Check absolute disk values - if (usage.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdHigh.bytes()) { - logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node", - DiskThresholdDecider.this.freeBytesThresholdHigh, usage); - } else if (usage.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdLow.bytes()) { - logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node", - DiskThresholdDecider.this.freeBytesThresholdLow, usage); - } - - // Check percentage disk values - if (usage.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdHigh) { - logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node", - Strings.format1Decimals(100.0 - DiskThresholdDecider.this.freeDiskThresholdHigh, "%"), usage); - } else if (usage.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdLow) { - logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node", - Strings.format1Decimals(100.0 - DiskThresholdDecider.this.freeDiskThresholdLow, "%"), usage); - } - } - - @Override - public void onNewInfo(ClusterInfo info) { - ImmutableOpenMap usages = info.getNodeLeastAvailableDiskUsages(); - if (usages != null) { - boolean reroute = false; - String explanation = ""; - - // Garbage collect nodes that have been removed from the cluster - // from the map that tracks watermark crossing - 
ObjectLookupContainer nodes = usages.keys(); - for (String node : nodeHasPassedWatermark) { - if (nodes.contains(node) == false) { - nodeHasPassedWatermark.remove(node); - } - } - - for (ObjectObjectCursor entry : usages) { - String node = entry.key; - DiskUsage usage = entry.value; - warnAboutDiskIfNeeded(usage); - if (usage.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdHigh.bytes() || - usage.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdHigh) { - if ((System.nanoTime() - lastRunNS) > DiskThresholdDecider.this.rerouteInterval.nanos()) { - lastRunNS = System.nanoTime(); - reroute = true; - explanation = "high disk watermark exceeded on one or more nodes"; - } else { - logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " + - "in the last [{}], skipping reroute", - node, DiskThresholdDecider.this.rerouteInterval); - } - nodeHasPassedWatermark.add(node); - } else if (usage.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdLow.bytes() || - usage.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdLow) { - nodeHasPassedWatermark.add(node); - } else { - if (nodeHasPassedWatermark.contains(node)) { - // The node has previously been over the high or - // low watermark, but is no longer, so we should - // reroute so any unassigned shards can be allocated - // if they are able to be - if ((System.nanoTime() - lastRunNS) > DiskThresholdDecider.this.rerouteInterval.nanos()) { - lastRunNS = System.nanoTime(); - reroute = true; - explanation = "one or more nodes has gone under the high or low watermark"; - nodeHasPassedWatermark.remove(node); - } else { - logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " + - "in the last [{}], skipping reroute", - node, DiskThresholdDecider.this.rerouteInterval); - } - } - } - } - if (reroute) { - logger.info("rerouting shards: [{}]", explanation); - // Execute an empty reroute, but don't block on the 
response - client.admin().cluster().prepareReroute().execute(); - } - } - } - } - - public DiskThresholdDecider(Settings settings) { - // It's okay the Client is null here, because the empty cluster info - // service will never actually call the listener where the client is - // needed. Also this constructor is only used for tests - this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), EmptyClusterInfoService.INSTANCE, null); - } + private final DiskThresholdSettings diskThresholdSettings; @Inject - public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings, ClusterInfoService infoService, Client client) { + public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings) { super(settings); - final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings); - final String highWatermark = CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.get(settings); - setHighWatermark(highWatermark); - setLowWatermark(lowWatermark); - this.includeRelocations = CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.get(settings); - this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings); - this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, this::setIncludeRelocations); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled); - infoService.addListener(new DiskListener(client)); - } 
- - private void setIncludeRelocations(boolean includeRelocations) { - this.includeRelocations = includeRelocations; - } - - private void setRerouteInterval(TimeValue rerouteInterval) { - this.rerouteInterval = rerouteInterval; - } - - private void setEnabled(boolean enabled) { - this.enabled = enabled; - } - - private void setLowWatermark(String lowWatermark) { - // Watermark is expressed in terms of used data, but we need "free" data watermark - this.freeDiskThresholdLow = 100.0 - thresholdPercentageFromWatermark(lowWatermark); - this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark, - CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey()); - } - - private void setHighWatermark(String highWatermark) { - // Watermark is expressed in terms of used data, but we need "free" data watermark - this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark); - this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark, - CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey()); - } - - // For Testing - public Double getFreeDiskThresholdLow() { - return freeDiskThresholdLow; - } - - // For Testing - public Double getFreeDiskThresholdHigh() { - return freeDiskThresholdHigh; - } - - // For Testing - public Double getUsedDiskThresholdLow() { - return 100.0 - freeDiskThresholdLow; - } - - // For Testing - public Double getUsedDiskThresholdHigh() { - return 100.0 - freeDiskThresholdHigh; - } - - // For Testing - public ByteSizeValue getFreeBytesThresholdLow() { - return freeBytesThresholdLow; - } - - // For Testing - public ByteSizeValue getFreeBytesThresholdHigh() { - return freeBytesThresholdHigh; - } - - // For Testing - public boolean isIncludeRelocations() { - return includeRelocations; - } - - // For Testing - public boolean isEnabled() { - return enabled; - } - - // For Testing - public TimeValue getRerouteInterval() { - return rerouteInterval; + this.diskThresholdSettings = new 
DiskThresholdSettings(settings, clusterSettings); } /** @@ -306,7 +82,7 @@ public class DiskThresholdDecider extends AllocationDecider { * If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size * of all shards */ - public static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, + static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, boolean subtractShardsMovingAway, String dataPath) { ClusterInfo clusterInfo = allocation.clusterInfo(); long totalSize = 0; @@ -333,8 +109,8 @@ public class DiskThresholdDecider extends AllocationDecider { return decision; } - final double usedDiskThresholdLow = 100.0 - DiskThresholdDecider.this.freeDiskThresholdLow; - final double usedDiskThresholdHigh = 100.0 - DiskThresholdDecider.this.freeDiskThresholdHigh; + final double usedDiskThresholdLow = 100.0 - diskThresholdSettings.getFreeDiskThresholdLow(); + final double usedDiskThresholdHigh = 100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(); DiskUsage usage = getDiskUsage(node, allocation, usages); // First, check that the node currently over the low watermark @@ -351,23 +127,23 @@ public class DiskThresholdDecider extends AllocationDecider { boolean primaryHasBeenAllocated = shardRouting.primary() && shardRouting.allocatedPostIndexCreate(indexMetaData); // checks for exact byte comparisons - if (freeBytes < freeBytesThresholdLow.bytes()) { + if (freeBytes < diskThresholdSettings.getFreeBytesThresholdLow().bytes()) { // If the shard is a replica or has a primary that has already been allocated before, check the low threshold if (!shardRouting.primary() || (shardRouting.primary() && primaryHasBeenAllocated)) { if (logger.isDebugEnabled()) { logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, preventing allocation", - freeBytesThresholdLow, freeBytes, node.nodeId()); + diskThresholdSettings.getFreeBytesThresholdLow(), freeBytes, 
node.nodeId()); } return allocation.decision(Decision.NO, NAME, "the node is above the low watermark and has less than required [%s] free, free: [%s]", - freeBytesThresholdLow, new ByteSizeValue(freeBytes)); - } else if (freeBytes > freeBytesThresholdHigh.bytes()) { + diskThresholdSettings.getFreeBytesThresholdLow(), new ByteSizeValue(freeBytes)); + } else if (freeBytes > diskThresholdSettings.getFreeBytesThresholdHigh().bytes()) { // Allow the shard to be allocated because it is primary that // has never been allocated if it's under the high watermark if (logger.isDebugEnabled()) { logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, " + "but allowing allocation because primary has never been allocated", - freeBytesThresholdLow, freeBytes, node.nodeId()); + diskThresholdSettings.getFreeBytesThresholdLow(), freeBytes, node.nodeId()); } return allocation.decision(Decision.YES, NAME, "the node is above the low watermark, but this primary shard has never been allocated before"); @@ -377,17 +153,17 @@ public class DiskThresholdDecider extends AllocationDecider { if (logger.isDebugEnabled()) { logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, " + "preventing allocation even though primary has never been allocated", - freeBytesThresholdHigh, freeBytes, node.nodeId()); + diskThresholdSettings.getFreeBytesThresholdHigh(), freeBytes, node.nodeId()); } return allocation.decision(Decision.NO, NAME, "the node is above the high watermark even though this shard has never been allocated " + "and has less than required [%s] free on node, free: [%s]", - freeBytesThresholdHigh, new ByteSizeValue(freeBytes)); + diskThresholdSettings.getFreeBytesThresholdHigh(), new ByteSizeValue(freeBytes)); } } // checks for percentage comparisons - if (freeDiskPercentage < freeDiskThresholdLow) { + if (freeDiskPercentage < diskThresholdSettings.getFreeDiskThresholdLow()) { // If the shard is a replica or has a primary that 
has already been allocated before, check the low threshold if (!shardRouting.primary() || (shardRouting.primary() && primaryHasBeenAllocated)) { if (logger.isDebugEnabled()) { @@ -398,7 +174,7 @@ public class DiskThresholdDecider extends AllocationDecider { return allocation.decision(Decision.NO, NAME, "the node is above the low watermark and has more than allowed [%s%%] used disk, free: [%s%%]", usedDiskThresholdLow, freeDiskPercentage); - } else if (freeDiskPercentage > freeDiskThresholdHigh) { + } else if (freeDiskPercentage > diskThresholdSettings.getFreeDiskThresholdHigh()) { // Allow the shard to be allocated because it is primary that // has never been allocated if it's under the high watermark if (logger.isDebugEnabled()) { @@ -415,7 +191,7 @@ public class DiskThresholdDecider extends AllocationDecider { if (logger.isDebugEnabled()) { logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, " + "preventing allocation even though primary has never been allocated", - Strings.format1Decimals(freeDiskThresholdHigh, "%"), + Strings.format1Decimals(diskThresholdSettings.getFreeDiskThresholdHigh(), "%"), Strings.format1Decimals(freeDiskPercentage, "%"), node.nodeId()); } return allocation.decision(Decision.NO, NAME, @@ -429,19 +205,20 @@ public class DiskThresholdDecider extends AllocationDecider { final long shardSize = getExpectedShardSize(shardRouting, allocation, 0); double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize); long freeBytesAfterShard = freeBytes - shardSize; - if (freeBytesAfterShard < freeBytesThresholdHigh.bytes()) { + if (freeBytesAfterShard < diskThresholdSettings.getFreeBytesThresholdHigh().bytes()) { logger.warn("after allocating, node [{}] would have less than the required " + "{} free bytes threshold ({} bytes free), preventing allocation", - node.nodeId(), freeBytesThresholdHigh, freeBytesAfterShard); + node.nodeId(), diskThresholdSettings.getFreeBytesThresholdHigh(), 
freeBytesAfterShard); return allocation.decision(Decision.NO, NAME, "after allocating the shard to this node, it would be above the high watermark " + "and have less than required [%s] free, free: [%s]", - freeBytesThresholdLow, new ByteSizeValue(freeBytesAfterShard)); + diskThresholdSettings.getFreeBytesThresholdLow(), new ByteSizeValue(freeBytesAfterShard)); } - if (freeSpaceAfterShard < freeDiskThresholdHigh) { + if (freeSpaceAfterShard < diskThresholdSettings.getFreeDiskThresholdHigh()) { logger.warn("after allocating, node [{}] would have more than the allowed " + "{} free disk threshold ({} free), preventing allocation", - node.nodeId(), Strings.format1Decimals(freeDiskThresholdHigh, "%"), Strings.format1Decimals(freeSpaceAfterShard, "%")); + node.nodeId(), Strings.format1Decimals(diskThresholdSettings.getFreeDiskThresholdHigh(), "%"), + Strings.format1Decimals(freeSpaceAfterShard, "%")); return allocation.decision(Decision.NO, NAME, "after allocating the shard to this node, it would be above the high watermark " + "and have more than allowed [%s%%] used disk, free: [%s%%]", @@ -479,25 +256,25 @@ public class DiskThresholdDecider extends AllocationDecider { return allocation.decision(Decision.YES, NAME, "this shard is not allocated on the most utilized disk and can remain"); } - if (freeBytes < freeBytesThresholdHigh.bytes()) { + if (freeBytes < diskThresholdSettings.getFreeBytesThresholdHigh().bytes()) { if (logger.isDebugEnabled()) { logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain", - freeBytesThresholdHigh, freeBytes, node.nodeId()); + diskThresholdSettings.getFreeBytesThresholdHigh(), freeBytes, node.nodeId()); } return allocation.decision(Decision.NO, NAME, "after allocating this shard this node would be above the high watermark " + "and there would be less than required [%s] free on node, free: [%s]", - freeBytesThresholdHigh, new ByteSizeValue(freeBytes)); + 
diskThresholdSettings.getFreeBytesThresholdHigh(), new ByteSizeValue(freeBytes)); } - if (freeDiskPercentage < freeDiskThresholdHigh) { + if (freeDiskPercentage < diskThresholdSettings.getFreeDiskThresholdHigh()) { if (logger.isDebugEnabled()) { logger.debug("less than the required {}% free disk threshold ({}% free) on node {}, shard cannot remain", - freeDiskThresholdHigh, freeDiskPercentage, node.nodeId()); + diskThresholdSettings.getFreeDiskThresholdHigh(), freeDiskPercentage, node.nodeId()); } return allocation.decision(Decision.NO, NAME, "after allocating this shard this node would be above the high watermark " + "and there would be less than required [%s%%] free disk on node, free: [%s%%]", - freeDiskThresholdHigh, freeDiskPercentage); + diskThresholdSettings.getFreeDiskThresholdHigh(), freeDiskPercentage); } return allocation.decision(Decision.YES, NAME, @@ -516,7 +293,7 @@ public class DiskThresholdDecider extends AllocationDecider { } } - if (includeRelocations) { + if (diskThresholdSettings.includeRelocations()) { long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, true, usage.getPath()); DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize); @@ -536,7 +313,7 @@ public class DiskThresholdDecider extends AllocationDecider { * @param usages Map of nodeId to DiskUsage for all known nodes * @return DiskUsage representing given node using the average disk usage */ - public DiskUsage averageUsage(RoutingNode node, ImmutableOpenMap usages) { + DiskUsage averageUsage(RoutingNode node, ImmutableOpenMap usages) { if (usages.size() == 0) { return new DiskUsage(node.nodeId(), node.node().getName(), "_na_", 0, 0); } @@ -556,63 +333,16 @@ public class DiskThresholdDecider extends AllocationDecider { * @param shardSize Size in bytes of the shard * @return Percentage of free space after the shard is assigned to the node */ - public 
double freeDiskPercentageAfterShardAssigned(DiskUsage usage, Long shardSize) { + double freeDiskPercentageAfterShardAssigned(DiskUsage usage, Long shardSize) { shardSize = (shardSize == null) ? 0 : shardSize; DiskUsage newUsage = new DiskUsage(usage.getNodeId(), usage.getNodeName(), usage.getPath(), usage.getTotalBytes(), usage.getFreeBytes() - shardSize); return newUsage.getFreeDiskAsPercentage(); } - /** - * Attempts to parse the watermark into a percentage, returning 100.0% if - * it cannot be parsed. - */ - public double thresholdPercentageFromWatermark(String watermark) { - try { - return RatioValue.parseRatioValue(watermark).getAsPercent(); - } catch (ElasticsearchParseException ex) { - // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two - // cases separately - return 100.0; - } - } - - /** - * Attempts to parse the watermark into a {@link ByteSizeValue}, returning - * a ByteSizeValue of 0 bytes if the value cannot be parsed. 
- */ - public ByteSizeValue thresholdBytesFromWatermark(String watermark, String settingName) { - try { - return ByteSizeValue.parseBytesSizeValue(watermark, settingName); - } catch (ElasticsearchParseException ex) { - // NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two - // cases separately - return ByteSizeValue.parseBytesSizeValue("0b", settingName); - } - } - - /** - * Checks if a watermark string is a valid percentage or byte size value, - * @return the watermark value given - */ - public static String validWatermarkSetting(String watermark, String settingName) { - try { - RatioValue.parseRatioValue(watermark); - } catch (ElasticsearchParseException e) { - try { - ByteSizeValue.parseBytesSizeValue(watermark, settingName); - } catch (ElasticsearchParseException ex) { - ex.addSuppressed(e); - throw ex; - } - } - return watermark; - - } - private Decision earlyTerminate(RoutingAllocation allocation, ImmutableOpenMap usages) { // Always allow allocation if the decider is disabled - if (!enabled) { + if (diskThresholdSettings.isEnabled() == false) { return allocation.decision(Decision.YES, NAME, "the disk threshold decider is disabled"); } @@ -647,7 +377,7 @@ public class DiskThresholdDecider extends AllocationDecider { * Returns the expected shard size for the given shard or the default value provided if not enough information are available * to estimate the shards size. 
*/ - public static final long getExpectedShardSize(ShardRouting shard, RoutingAllocation allocation, long defaultValue) { + public static long getExpectedShardSize(ShardRouting shard, RoutingAllocation allocation, long defaultValue) { final IndexMetaData metaData = allocation.metaData().getIndexSafe(shard.index()); final ClusterInfo info = allocation.clusterInfo(); if (metaData.getMergeSourceIndex() != null && shard.allocatedPostIndexCreate(metaData) == false) { diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 507d0d9c410..3ce483d0fa6 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -32,11 +32,11 @@ import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider; -import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; @@ -193,11 +193,11 @@ public final class ClusterSettings extends AbstractScopedSettings { 
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING, - DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, - DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, - DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, - DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, - DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED_SETTING, diff --git a/core/src/main/java/org/elasticsearch/node/NodeModule.java b/core/src/main/java/org/elasticsearch/node/NodeModule.java index 8299834435b..6a8f8b90681 100644 --- a/core/src/main/java/org/elasticsearch/node/NodeModule.java +++ b/core/src/main/java/org/elasticsearch/node/NodeModule.java @@ -19,6 +19,7 @@ package org.elasticsearch.node; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.node.service.NodeService; @@ -38,5 +39,6 @@ public class NodeModule extends AbstractModule { 
bind(Node.class).toInstance(node); bind(MonitorService.class).toInstance(monitorService); bind(NodeService.class).asEagerSingleton(); + bind(DiskThresholdMonitor.class).asEagerSingleton(); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java new file mode 100644 index 00000000000..fa4518218c9 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.test.ESTestCase; + +public class DiskThresholdSettingsTests extends ESTestCase { + + public void testDefaults() { + ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + DiskThresholdSettings diskThresholdSettings = new DiskThresholdSettings(Settings.EMPTY, nss); + + ByteSizeValue zeroBytes = ByteSizeValue.parseBytesSizeValue("0b", "test"); + assertEquals(zeroBytes, diskThresholdSettings.getFreeBytesThresholdHigh()); + assertEquals(10.0D, diskThresholdSettings.getFreeDiskThresholdHigh(), 0.0D); + assertEquals(zeroBytes, diskThresholdSettings.getFreeBytesThresholdLow()); + assertEquals(15.0D, diskThresholdSettings.getFreeDiskThresholdLow(), 0.0D); + assertEquals(60L, diskThresholdSettings.getRerouteInterval().seconds()); + assertTrue(diskThresholdSettings.isEnabled()); + assertTrue(diskThresholdSettings.includeRelocations()); + } + + public void testUpdate() { + ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + DiskThresholdSettings diskThresholdSettings = new DiskThresholdSettings(Settings.EMPTY, nss); + + Settings newSettings = Settings.builder() + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), false) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "500mb") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "30s") + .build(); + nss.applySettings(newSettings); + + 
assertEquals(ByteSizeValue.parseBytesSizeValue("0b", "test"), diskThresholdSettings.getFreeBytesThresholdHigh()); + assertEquals(30.0D, diskThresholdSettings.getFreeDiskThresholdHigh(), 0.0D); + assertEquals(ByteSizeValue.parseBytesSizeValue("500mb", "test"), diskThresholdSettings.getFreeBytesThresholdLow()); + assertEquals(0.0D, diskThresholdSettings.getFreeDiskThresholdLow(), 0.0D); + assertEquals(30L, diskThresholdSettings.getRerouteInterval().seconds()); + assertFalse(diskThresholdSettings.isEnabled()); + assertFalse(diskThresholdSettings.includeRelocations()); + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 02a603fed63..e9c82e7e180 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -39,12 +39,14 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import 
org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.test.ESAllocationTestCase; @@ -67,12 +69,16 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; public class DiskThresholdDeciderTests extends ESAllocationTestCase { + + DiskThresholdDecider makeDecider(Settings settings) { + return new DiskThresholdDecider(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + } public void testDiskThreshold() { Settings diskSettings = Settings.builder() - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.8).build(); + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.8).build(); ImmutableOpenMap.Builder usagesBuilder = ImmutableOpenMap.builder(); usagesBuilder.put("node1", new DiskUsage("node1", "node1", "/dev/null", 100, 10)); // 90% used @@ -90,7 +96,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( new SameShardAllocationDecider(Settings.EMPTY), - new DiskThresholdDecider(diskSettings)))); + makeDecider(diskSettings)))); ClusterInfoService cis = new ClusterInfoService() { @Override @@ -184,14 +190,14 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { // Set the high threshold to 70 instead of 80 // node2 now should not have new shards allocated to it, but shards can remain diskSettings = Settings.builder() - 
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%") - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.7).build(); + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.7).build(); deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( new SameShardAllocationDecider(Settings.EMPTY), - new DiskThresholdDecider(diskSettings)))); + makeDecider(diskSettings)))); strategy = new AllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) @@ -215,14 +221,14 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { // Set the high threshold to 60 instead of 70 // node2 now should not have new shards allocated to it, and shards cannot remain diskSettings = Settings.builder() - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.5) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.6).build(); + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.5) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.6).build(); deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( new SameShardAllocationDecider(Settings.EMPTY), - new DiskThresholdDecider(diskSettings)))); + 
makeDecider(diskSettings)))); strategy = new AllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) @@ -269,9 +275,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { public void testDiskThresholdWithAbsoluteSizes() { Settings diskSettings = Settings.builder() - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "30b") - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "9b").build(); + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "30b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "9b").build(); ImmutableOpenMap.Builder usagesBuilder = ImmutableOpenMap.builder(); usagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 10)); // 90% used @@ -290,7 +296,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( new SameShardAllocationDecider(Settings.EMPTY), - new DiskThresholdDecider(diskSettings)))); + makeDecider(diskSettings)))); ClusterInfoService cis = new ClusterInfoService() { @Override @@ -423,14 +429,14 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { // Set the high threshold to 70 instead of 80 // node2 now should not have new shards allocated to it, but shards can remain diskSettings = Settings.builder() - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "40b") - 
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "30b").build(); + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "40b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "30b").build(); deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( new SameShardAllocationDecider(Settings.EMPTY), - new DiskThresholdDecider(diskSettings)))); + makeDecider(diskSettings)))); strategy = new AllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) @@ -454,14 +460,14 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { // Set the high threshold to 60 instead of 70 // node2 now should not have new shards allocated to it, and shards cannot remain diskSettings = Settings.builder() - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "50b") - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "40b").build(); + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "50b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "40b").build(); deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( new SameShardAllocationDecider(Settings.EMPTY), - new DiskThresholdDecider(diskSettings)))); + makeDecider(diskSettings)))); strategy = new AllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) @@ -542,9 +548,9 @@ public class DiskThresholdDeciderTests 
extends ESAllocationTestCase { public void testDiskThresholdWithShardSizes() { Settings diskSettings = Settings.builder() - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "71%").build(); + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "71%").build(); ImmutableOpenMap.Builder usagesBuilder = ImmutableOpenMap.builder(); usagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 31)); // 69% used @@ -559,7 +565,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( new SameShardAllocationDecider(Settings.EMPTY), - new DiskThresholdDecider(diskSettings)))); + makeDecider(diskSettings)))); ClusterInfoService cis = new ClusterInfoService() { @Override @@ -611,9 +617,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { public void testUnknownDiskUsage() { Settings diskSettings = Settings.builder() - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.85).build(); + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7) + 
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.85).build(); ImmutableOpenMap.Builder usagesBuilder = ImmutableOpenMap.builder(); usagesBuilder.put("node2", new DiskUsage("node2", "node2", "/dev/null", 100, 50)); // 50% used @@ -629,7 +635,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( new SameShardAllocationDecider(Settings.EMPTY), - new DiskThresholdDecider(diskSettings)))); + makeDecider(diskSettings)))); ClusterInfoService cis = new ClusterInfoService() { @Override @@ -687,7 +693,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { public void testAverageUsage() { RoutingNode rn = new RoutingNode("node1", newNode("node1")); - DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY); + DiskThresholdDecider decider = makeDecider(Settings.EMPTY); ImmutableOpenMap.Builder usages = ImmutableOpenMap.builder(); usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 50)); // 50% used @@ -700,7 +706,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { public void testFreeDiskPercentageAfterShardAssigned() { RoutingNode rn = new RoutingNode("node1", newNode("node1")); - DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY); + DiskThresholdDecider decider = makeDecider(Settings.EMPTY); Map usages = new HashMap<>(); usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 50)); // 50% used @@ -712,10 +718,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { public void testShardRelocationsTakenIntoAccount() { Settings diskSettings = Settings.builder() - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), true) - 
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.8).build(); + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), 0.7) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), 0.8).build(); ImmutableOpenMap.Builder usagesBuilder = ImmutableOpenMap.builder(); usagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 40)); // 60% used @@ -734,7 +740,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( new SameShardAllocationDecider(Settings.EMPTY), - new DiskThresholdDecider(diskSettings)))); + makeDecider(diskSettings)))); ClusterInfoService cis = new ClusterInfoService() { @Override @@ -821,10 +827,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { public void testCanRemainWithShardRelocatingAway() { Settings diskSettings = Settings.builder() - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), true) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%") - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%").build(); + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), true) + 
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%").build(); // We have an index with 2 primary shards each taking 40 bytes. Each node has 100 bytes available ImmutableOpenMap.Builder usagesBuilder = ImmutableOpenMap.builder(); @@ -839,7 +845,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes); - DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings); + DiskThresholdDecider diskThresholdDecider = makeDecider(diskSettings); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0)) .build(); @@ -937,10 +943,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { public void testForSingleDataNode() { Settings diskSettings = Settings.builder() - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), true) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%") - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%").build(); + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), true) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "60%") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%").build(); ImmutableOpenMap.Builder usagesBuilder = ImmutableOpenMap.builder(); usagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 
100, 100)); // 0% used @@ -954,7 +960,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { shardSizes.put("[test][1][p]", 40L); final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes.build()); - DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings); + DiskThresholdDecider diskThresholdDecider = makeDecider(diskSettings); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0)) .build(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 80309004fff..56c7d69c59b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -59,48 +59,10 @@ import static org.hamcrest.CoreMatchers.equalTo; * Unit tests for the DiskThresholdDecider */ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase { - public void testDynamicSettings() { - ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - - ClusterInfoService cis = EmptyClusterInfoService.INSTANCE; - DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss, cis, null); - - assertThat(decider.getFreeBytesThresholdHigh(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "test"))); - assertThat(decider.getFreeDiskThresholdHigh(), equalTo(10.0d)); - assertThat(decider.getFreeBytesThresholdLow(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "test"))); - assertThat(decider.getFreeDiskThresholdLow(), equalTo(15.0d)); - assertThat(decider.getUsedDiskThresholdLow(), equalTo(85.0d)); - 
assertThat(decider.getRerouteInterval().seconds(), equalTo(60L)); - assertTrue(decider.isEnabled()); - assertTrue(decider.isIncludeRelocations()); - - Settings newSettings = Settings.builder() - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.getKey(), false) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "70%") - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "500mb") - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "30s") - .build(); - - nss.applySettings(newSettings); - assertThat("high threshold bytes should be unset", - decider.getFreeBytesThresholdHigh(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "test"))); - assertThat("high threshold percentage should be changed", - decider.getFreeDiskThresholdHigh(), equalTo(30.0d)); - assertThat("low threshold bytes should be set to 500mb", - decider.getFreeBytesThresholdLow(), equalTo(ByteSizeValue.parseBytesSizeValue("500mb", "test"))); - assertThat("low threshold bytes should be unset", - decider.getFreeDiskThresholdLow(), equalTo(0.0d)); - assertThat("reroute interval should be changed to 30 seconds", - decider.getRerouteInterval().seconds(), equalTo(30L)); - assertFalse("disk threshold decider should now be disabled", decider.isEnabled()); - assertFalse("relocations should now be disabled", decider.isIncludeRelocations()); - } public void testCanAllocateUsesMaxAvailableSpace() { ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterInfoService cis = EmptyClusterInfoService.INSTANCE; - DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss, cis, null); + DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss); MetaData metaData = MetaData.builder() 
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) @@ -144,8 +106,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase { public void testCanRemainUsesLeastAvailableSpace() { ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterInfoService cis = EmptyClusterInfoService.INSTANCE; - DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss, cis, null); + DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss); ImmutableOpenMap.Builder shardRoutingMap = ImmutableOpenMap.builder(); DiscoveryNode node_0 = new DiscoveryNode("node_0", LocalTransportAddress.buildUnique(), Collections.emptyMap(), diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java index 3bd7094796f..68e8fc9c94e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.MockInternalClusterInfoService; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.plugins.Plugin; @@ -74,9 +75,9 @@ public class MockDiskUsagesIT extends ESIntegTestCase { cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", "/dev/null", 100, 50)); client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() - 
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), randomFrom("20b", "80%")) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), randomFrom("10b", "90%")) - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")).get(); + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), randomFrom("20b", "80%")) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), randomFrom("10b", "90%")) + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")).get(); // Create an index with 10 shards so we can check allocation for it prepareCreate("test").setSettings(Settings.builder() .put("number_of_shards", 10) diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index dffa285101f..e723f970bbd 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -29,6 +29,7 @@ import org.apache.lucene.util.TestUtil; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.env.NodeEnvironment; @@ -72,7 +73,6 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import 
org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -1634,8 +1634,8 @@ public abstract class ESIntegTestCase extends ESTestCase { .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE) // Default the watermarks to absurdly low to prevent the tests // from failing on nodes without enough disk space - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b") - .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b") .put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000) .put("script.stored", "true") .put("script.inline", "true") diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index d76fea08911..1dd1c5d9b64 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -43,7 +43,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode.Role; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -316,8 +316,8 @@ public final class 
InternalTestCluster extends TestCluster { } // Default the watermarks to absurdly low to prevent the tests // from failing on nodes without enough disk space - builder.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b"); - builder.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b"); + builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b"); + builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b"); // Some tests make use of scripting quite a bit, so increase the limit for integration tests builder.put(ScriptService.SCRIPT_MAX_COMPILATIONS_PER_MINUTE.getKey(), 1000); if (TEST_NIGHTLY) {