From 0c71328186d69d35b698eaae485a51cb3151cdc5 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 27 Aug 2015 13:11:51 +0200 Subject: [PATCH] Expand ClusterInfo to provide min / max disk usage for allocation decider Today we sum up the disk usage for the allocation decider, which is broken since we don't stripe across multiple data paths. Each shard has its own private path now, but the allocation deciders still treat all paths as one big disk. This commit allows allocation deciders to access the least used and most used path to make better allocation decisions upon canRemain and canAllocate calls. Yet, this commit doesn't fix all the issues, since we still can't tell which shard can remain and which can't. This problem is out of scope for this commit and will be solved in a follow-up commit. A short sketch of how deciders are expected to consume the new API follows the diff. Relates to #13106 --- .../elasticsearch/cluster/ClusterInfo.java | 41 ++++-- .../org/elasticsearch/cluster/DiskUsage.java | 10 +- .../cluster/InternalClusterInfoService.java | 83 +++++++----- .../decider/DiskThresholdDecider.java | 28 ++-- .../cluster/ClusterInfoServiceIT.java | 10 +- .../elasticsearch/cluster/DiskUsageTests.java | 67 ++++++++- .../MockInternalClusterInfoService.java | 6 +- .../ExpectedShardSizeAllocationTests.java | 4 +- .../allocation/RebalanceAfterActiveTests.java | 2 +- .../decider/DiskThresholdDeciderTests.java | 64 ++++----- .../DiskThresholdDeciderUnitTests.java | 127 ++++++++++++++++-- .../allocation/decider/MockDiskUsagesIT.java | 22 +-- 12 files changed, 343 insertions(+), 121 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index f21bc9f052e..31f8fa0616c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -32,28 +32,53 @@ import java.util.Map; */ public class ClusterInfo { - private final Map usages; + private final Map leastAvailableSpaceUsage; + private final Map mostAvailableSpaceUsage; final Map shardSizes; public static final ClusterInfo EMPTY = new ClusterInfo(); - private ClusterInfo() { - this.usages = Collections.emptyMap(); - this.shardSizes = Collections.emptyMap(); + protected ClusterInfo() { + this(Collections.EMPTY_MAP, Collections.EMPTY_MAP, Collections.EMPTY_MAP); } - public ClusterInfo(Map usages, Map shardSizes) { - this.usages = usages; + /** + * Creates a new ClusterInfo instance. + * + * @param leastAvailableSpaceUsage a node id to disk usage mapping for the path that has the least available space on the node. + * @param mostAvailableSpaceUsage a node id to disk usage mapping for the path that has the most available space on the node. + * @param shardSizes a shard key to size in bytes mapping per shard. + * @see #shardIdentifierFromRouting + */ + public ClusterInfo(final Map leastAvailableSpaceUsage, final Map mostAvailableSpaceUsage, final Map shardSizes) { + this.leastAvailableSpaceUsage = leastAvailableSpaceUsage; this.shardSizes = shardSizes; + this.mostAvailableSpaceUsage = mostAvailableSpaceUsage; } - public Map getNodeDiskUsages() { - return this.usages; + /** + * Returns a node id to disk usage mapping for the path that has the least available space on the node. + */ + public Map getNodeLeastAvailableDiskUsages() { + return this.leastAvailableSpaceUsage; } + /** + * Returns a node id to disk usage mapping for the path that has the most available space on the node. 
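+ * Allocation deciders consult this map when deciding whether a node can accept a new shard (see the DiskThresholdDecider#canAllocate change below), since a newly allocated shard may be placed on the path with the most available space.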
+ */ + public Map getNodeMostAvailableDiskUsages() { + return this.mostAvailableSpaceUsage; + } + + /** + * Returns the shard size for the given shard routing or null if that metric is not available. + */ public Long getShardSize(ShardRouting shardRouting) { return shardSizes.get(shardIdentifierFromRouting(shardRouting)); } + /** + * Returns the shard size for the given shard routing or defaultValue if that metric is not available. + */ public long getShardSize(ShardRouting shardRouting, long defaultValue) { Long shardSize = getShardSize(shardRouting); return shardSize == null ? defaultValue : shardSize; diff --git a/core/src/main/java/org/elasticsearch/cluster/DiskUsage.java b/core/src/main/java/org/elasticsearch/cluster/DiskUsage.java index 92725b08831..e91adae9e34 100644 --- a/core/src/main/java/org/elasticsearch/cluster/DiskUsage.java +++ b/core/src/main/java/org/elasticsearch/cluster/DiskUsage.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; public class DiskUsage { final String nodeId; final String nodeName; + final String path; final long totalBytes; final long freeBytes; @@ -35,11 +36,12 @@ public class DiskUsage { * Create a new DiskUsage, if {@code totalBytes} is 0, {@get getFreeDiskAsPercentage} * will always return 100.0% free */ - public DiskUsage(String nodeId, String nodeName, long totalBytes, long freeBytes) { + public DiskUsage(String nodeId, String nodeName, String path, long totalBytes, long freeBytes) { this.nodeId = nodeId; this.nodeName = nodeName; this.freeBytes = freeBytes; this.totalBytes = totalBytes; + this.path = path; } public String getNodeId() { @@ -50,6 +52,10 @@ public class DiskUsage { return nodeName; } + public String getPath() { + return path; + } + public double getFreeDiskAsPercentage() { // We return 100.0% in order to fail "open", in that if we have invalid // numbers for the total bytes, it's as if we don't know disk usage. 
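+ // (DiskUsageTests exercises these invalid inputs, e.g. ZFS/NTFS-style stats where free can exceed total)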
@@ -77,7 +83,7 @@ public class DiskUsage { @Override public String toString() { - return "[" + nodeId + "][" + nodeName + "] free: " + new ByteSizeValue(getFreeBytes()) + + return "[" + nodeId + "][" + nodeName + "][" + path + "] free: " + new ByteSizeValue(getFreeBytes()) + "[" + Strings.format1Decimals(getFreeDiskAsPercentage(), "%") + "]"; } } diff --git a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 8be9465f7b2..2be8a36e9e5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; @@ -35,9 +34,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.node.settings.NodeSettingsService; @@ -47,7 +46,6 @@ import org.elasticsearch.transport.ReceiveTimeoutTransportException; import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; /** * InternalClusterInfoService provides the ClusterInfoService interface, @@ -67,7 +65,8 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu private volatile TimeValue updateFrequency; - private volatile Map usages; + private volatile Map leastAvailableSpaceUsages; + private volatile Map mostAvailableSpaceUsages; private volatile Map shardSizes; private volatile boolean isMaster = false; private volatile boolean enabled; @@ -84,7 +83,8 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu TransportIndicesStatsAction transportIndicesStatsAction, ClusterService clusterService, ThreadPool threadPool) { super(settings); - this.usages = Collections.emptyMap(); + this.leastAvailableSpaceUsages = Collections.emptyMap(); + this.mostAvailableSpaceUsages = Collections.emptyMap(); this.shardSizes = Collections.emptyMap(); this.transportNodesStatsAction = transportNodesStatsAction; this.transportIndicesStatsAction = transportIndicesStatsAction; @@ -200,9 +200,16 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu if (logger.isTraceEnabled()) { logger.trace("Removing node from cluster info: {}", removedNode.getId()); } - Map newUsages = new HashMap<>(usages); - newUsages.remove(removedNode.getId()); - usages = Collections.unmodifiableMap(newUsages); + if (leastAvailableSpaceUsages.containsKey(removedNode.getId())) { + Map newMaxUsages = new HashMap<>(leastAvailableSpaceUsages); + newMaxUsages.remove(removedNode.getId()); + leastAvailableSpaceUsages = Collections.unmodifiableMap(newMaxUsages); + } + if 
(mostAvailableSpaceUsages.containsKey(removedNode.getId())) { + Map newMinUsages = new HashMap<>(mostAvailableSpaceUsages); + newMinUsages.remove(removedNode.getId()); + mostAvailableSpaceUsages = Collections.unmodifiableMap(newMinUsages); + } } } } @@ -210,7 +217,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu @Override public ClusterInfo getClusterInfo() { - return new ClusterInfo(usages, shardSizes); + return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes); } @Override @@ -313,27 +320,11 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu CountDownLatch nodeLatch = updateNodeStats(new ActionListener() { @Override public void onResponse(NodesStatsResponse nodeStatses) { - Map newUsages = new HashMap<>(); - for (NodeStats nodeStats : nodeStatses.getNodes()) { - if (nodeStats.getFs() == null) { - logger.warn("Unable to retrieve node FS stats for {}", nodeStats.getNode().name()); - } else { - long available = 0; - long total = 0; - - for (FsInfo.Path info : nodeStats.getFs()) { - available += info.getAvailable().bytes(); - total += info.getTotal().bytes(); - } - String nodeId = nodeStats.getNode().id(); - String nodeName = nodeStats.getNode().getName(); - if (logger.isTraceEnabled()) { - logger.trace("node: [{}], total disk: {}, available disk: {}", nodeId, total, available); - } - newUsages.put(nodeId, new DiskUsage(nodeId, nodeName, total, available)); - } - } - usages = Collections.unmodifiableMap(newUsages); + Map newLeastAvailableUsages = new HashMap<>(); + Map newMostAvailableUsages = new HashMap<>(); + fillDiskUsagePerNode(logger, nodeStatses.getNodes(), newLeastAvailableUsages, newMostAvailableUsages); + leastAvailableSpaceUsages = Collections.unmodifiableMap(newLeastAvailableUsages); + mostAvailableSpaceUsages = Collections.unmodifiableMap(newMostAvailableUsages); } @Override @@ -349,7 +340,8 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu logger.warn("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e); } // we empty the usages list, to be safe - we don't know what's going on. 
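+ // reset both the least-available and most-available maps below; a failed stats update must not leave one of them stale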
- usages = Collections.emptyMap(); + leastAvailableSpaceUsages = Collections.emptyMap(); + mostAvailableSpaceUsages = Collections.emptyMap(); } } }); @@ -412,5 +404,34 @@ } } + static void fillDiskUsagePerNode(ESLogger logger, NodeStats[] nodeStatsArray, Map newLeastAvailableUsages, Map newMostAvailableUsages) { + for (NodeStats nodeStats : nodeStatsArray) { + if (nodeStats.getFs() == null) { + logger.warn("Unable to retrieve node FS stats for {}", nodeStats.getNode().name()); + } else { + FsInfo.Path leastAvailablePath = null; + FsInfo.Path mostAvailablePath = null; + for (FsInfo.Path info : nodeStats.getFs()) { + if (leastAvailablePath == null) { + assert mostAvailablePath == null; + mostAvailablePath = leastAvailablePath = info; + } else if (leastAvailablePath.getAvailable().bytes() > info.getAvailable().bytes()) { + leastAvailablePath = info; + } else if (mostAvailablePath.getAvailable().bytes() < info.getAvailable().bytes()) { + mostAvailablePath = info; + } + } + String nodeId = nodeStats.getNode().id(); + String nodeName = nodeStats.getNode().getName(); + if (logger.isTraceEnabled()) { + logger.trace("node: [{}], most available: total disk: {}, available disk: {} / least available: total disk: {}, available disk: {}", nodeId, mostAvailablePath.getTotal(), mostAvailablePath.getAvailable(), leastAvailablePath.getTotal(), leastAvailablePath.getAvailable()); + } + newLeastAvailableUsages.put(nodeId, new DiskUsage(nodeId, nodeName, leastAvailablePath.getPath(), leastAvailablePath.getTotal().bytes(), leastAvailablePath.getAvailable().bytes())); + newMostAvailableUsages.put(nodeId, new DiskUsage(nodeId, nodeName, mostAvailablePath.getPath(), mostAvailablePath.getTotal().bytes(), mostAvailablePath.getAvailable().bytes())); + + } + } + } + } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 2a438de800f..7db6294cc93 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -164,7 +164,7 @@ public class DiskThresholdDecider extends AllocationDecider { @Override public void onNewInfo(ClusterInfo info) { - Map usages = info.getNodeDiskUsages(); + Map usages = info.getNodeLeastAvailableDiskUsages(); if (usages != null) { boolean reroute = false; String explanation = ""; @@ -339,7 +339,9 @@ public class DiskThresholdDecider extends AllocationDecider { final double usedDiskThresholdLow = 100.0 - DiskThresholdDecider.this.freeDiskThresholdLow; final double usedDiskThresholdHigh = 100.0 - DiskThresholdDecider.this.freeDiskThresholdHigh; - DiskUsage usage = getDiskUsage(node, allocation); + ClusterInfo clusterInfo = allocation.clusterInfo(); + Map usages = clusterInfo.getNodeMostAvailableDiskUsages(); + DiskUsage usage = getDiskUsage(node, allocation, usages); // First, check that the node currently over the low watermark double freeDiskPercentage = usage.getFreeDiskAsPercentage(); // Cache the used disk percentage for displaying disk percentages consistent with documentation @@ -441,11 +443,16 @@ public class DiskThresholdDecider extends AllocationDecider { @Override public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if 
(shardRouting.currentNodeId().equals(node.nodeId()) == false) { + throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]"); + } final Decision decision = earlyTerminate(allocation); if (decision != null) { return decision; } - DiskUsage usage = getDiskUsage(node, allocation); + ClusterInfo clusterInfo = allocation.clusterInfo(); + Map usages = clusterInfo.getNodeLeastAvailableDiskUsages(); + DiskUsage usage = getDiskUsage(node, allocation, usages); // If this node is already above the high threshold, the shard cannot remain (get it off!) double freeDiskPercentage = usage.getFreeDiskAsPercentage(); long freeBytes = usage.getFreeBytes(); @@ -472,9 +479,8 @@ public class DiskThresholdDecider extends AllocationDecider { return allocation.decision(Decision.YES, NAME, "enough disk for shard to remain on node, free: [%s]", new ByteSizeValue(freeBytes)); } - private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation) { + private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, Map usages) { ClusterInfo clusterInfo = allocation.clusterInfo(); - Map usages = clusterInfo.getNodeDiskUsages(); DiskUsage usage = usages.get(node.nodeId()); if (usage == null) { // If there is no usage, and we have other nodes in the cluster, @@ -488,7 +494,7 @@ public class DiskThresholdDecider extends AllocationDecider { if (includeRelocations) { long relocatingShardsSize = sizeOfRelocatingShards(node, clusterInfo, true); - DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(), + DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(), "_na_", usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize); if (logger.isTraceEnabled()) { logger.trace("usage without relocations: {}", usage); @@ -508,7 +514,7 @@ public class DiskThresholdDecider extends AllocationDecider { */ public DiskUsage averageUsage(RoutingNode node, Map usages) { if (usages.size() == 0) { - return new DiskUsage(node.nodeId(), node.node().name(), 0, 0); + return new DiskUsage(node.nodeId(), node.node().name(), "_na_", 0, 0); } long totalBytes = 0; long freeBytes = 0; @@ -516,7 +522,7 @@ public class DiskThresholdDecider extends AllocationDecider { totalBytes += du.getTotalBytes(); freeBytes += du.getFreeBytes(); } - return new DiskUsage(node.nodeId(), node.node().name(), totalBytes / usages.size(), freeBytes / usages.size()); + return new DiskUsage(node.nodeId(), node.node().name(), "_na_", totalBytes / usages.size(), freeBytes / usages.size()); } /** @@ -528,8 +534,8 @@ public class DiskThresholdDecider extends AllocationDecider { */ public double freeDiskPercentageAfterShardAssigned(DiskUsage usage, Long shardSize) { shardSize = (shardSize == null) ? 
0 : shardSize; - DiskUsage newUsage = new DiskUsage(usage.getNodeId(), usage.getNodeName(), - usage.getTotalBytes(), usage.getFreeBytes() - shardSize); + DiskUsage newUsage = new DiskUsage(usage.getNodeId(), usage.getNodeName(), usage.getPath(), + usage.getTotalBytes(), usage.getFreeBytes() - shardSize); return newUsage.getFreeDiskAsPercentage(); } @@ -600,7 +606,7 @@ public class DiskThresholdDecider extends AllocationDecider { return allocation.decision(Decision.YES, NAME, "cluster info unavailable"); } - final Map usages = clusterInfo.getNodeDiskUsages(); + final Map usages = clusterInfo.getNodeLeastAvailableDiskUsages(); // Fail open if there are no disk usages available if (usages.isEmpty()) { if (logger.isTraceEnabled()) { diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 6368e5aa1d9..d76f99793f7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -164,7 +164,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase { infoService.addListener(listener); ClusterInfo info = listener.get(); assertNotNull("info should not be null", info); - Map usages = info.getNodeDiskUsages(); + Map usages = info.getNodeLeastAvailableDiskUsages(); Map shardSizes = info.shardSizes; assertNotNull(usages); assertNotNull(shardSizes); @@ -197,7 +197,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase { infoService.updateOnce(); ClusterInfo info = listener.get(); assertNotNull("failed to collect info", info); - assertThat("some usages are populated", info.getNodeDiskUsages().size(), Matchers.equalTo(2)); + assertThat("some usages are populated", info.getNodeLeastAvailableDiskUsages().size(), Matchers.equalTo(2)); assertThat("some shard sizes are populated", info.shardSizes.size(), greaterThan(0)); @@ -231,7 +231,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase { // node info will time out both on the request level on the count down latch. this means // it is likely to update the node disk usage based on the one response that came be from local // node. - assertThat(info.getNodeDiskUsages().size(), greaterThanOrEqualTo(1)); + assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThanOrEqualTo(1)); // indices is guaranteed to time out on the latch, not updating anything. 
assertThat(info.shardSizes.size(), greaterThan(1)); @@ -252,7 +252,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase { infoService.updateOnce(); info = listener.get(); assertNotNull("info should not be null", info); - assertThat(info.getNodeDiskUsages().size(), equalTo(0)); + assertThat(info.getNodeLeastAvailableDiskUsages().size(), equalTo(0)); assertThat(info.shardSizes.size(), equalTo(0)); // check we recover @@ -261,7 +261,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase { infoService.updateOnce(); info = listener.get(); assertNotNull("info should not be null", info); - assertThat(info.getNodeDiskUsages().size(), equalTo(2)); + assertThat(info.getNodeLeastAvailableDiskUsages().size(), equalTo(2)); assertThat(info.shardSizes.size(), greaterThan(0)); } diff --git a/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index df9c1883dd8..04ae502e654 100644 --- a/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -19,16 +19,25 @@ package org.elasticsearch.cluster; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.test.ESTestCase; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + import static org.hamcrest.Matchers.equalTo; public class DiskUsageTests extends ESTestCase { @Test public void diskUsageCalcTest() { - DiskUsage du = new DiskUsage("node1", "n1", 100, 40); + DiskUsage du = new DiskUsage("node1", "n1", "random", 100, 40); assertThat(du.getFreeDiskAsPercentage(), equalTo(40.0)); assertThat(du.getUsedDiskAsPercentage(), equalTo(100.0 - 40.0)); assertThat(du.getFreeBytes(), equalTo(40L)); @@ -37,19 +46,19 @@ public class DiskUsageTests extends ESTestCase { // Test that DiskUsage handles invalid numbers, as reported by some // filesystems (ZFS & NTFS) - DiskUsage du2 = new DiskUsage("node1", "n1", 100, 101); + DiskUsage du2 = new DiskUsage("node1", "n1", "random", 100, 101); assertThat(du2.getFreeDiskAsPercentage(), equalTo(101.0)); assertThat(du2.getFreeBytes(), equalTo(101L)); assertThat(du2.getUsedBytes(), equalTo(-1L)); assertThat(du2.getTotalBytes(), equalTo(100L)); - DiskUsage du3 = new DiskUsage("node1", "n1", -1, -1); + DiskUsage du3 = new DiskUsage("node1", "n1", "random", -1, -1); assertThat(du3.getFreeDiskAsPercentage(), equalTo(100.0)); assertThat(du3.getFreeBytes(), equalTo(-1L)); assertThat(du3.getUsedBytes(), equalTo(0L)); assertThat(du3.getTotalBytes(), equalTo(-1L)); - DiskUsage du4 = new DiskUsage("node1", "n1", 0, 0); + DiskUsage du4 = new DiskUsage("node1", "n1", "random", 0, 0); assertThat(du4.getFreeDiskAsPercentage(), equalTo(100.0)); assertThat(du4.getFreeBytes(), equalTo(0L)); assertThat(du4.getUsedBytes(), equalTo(0L)); @@ -62,7 +71,7 @@ public class DiskUsageTests extends ESTestCase { for (int i = 1; i < iters; i++) { long total = between(Integer.MIN_VALUE, Integer.MAX_VALUE); long free = between(Integer.MIN_VALUE, Integer.MAX_VALUE); - DiskUsage du = new DiskUsage("random", "random", total, free); + DiskUsage du = new DiskUsage("random", "random", "random", total, free); if (total == 0) { assertThat(du.getFreeBytes(), 
equalTo(free)); assertThat(du.getTotalBytes(), equalTo(0L)); @@ -78,4 +87,52 @@ } } } + + public void testFillDiskUsage() { + Map newLeastAvailableUsages = new HashMap<>(); + Map newMostAvailableUsages = new HashMap<>(); + FsInfo.Path[] node1FSInfo = new FsInfo.Path[] { + new FsInfo.Path("/middle", "/dev/sda", 100, 90, 80), + new FsInfo.Path("/least", "/dev/sdb", 200, 190, 70), + new FsInfo.Path("/most", "/dev/sdc", 300, 290, 280), + }; + FsInfo.Path[] node2FSInfo = new FsInfo.Path[] { + new FsInfo.Path("/least_most", "/dev/sda", 100, 90, 80), + }; + + FsInfo.Path[] node3FSInfo = new FsInfo.Path[] { + new FsInfo.Path("/least", "/dev/sda", 100, 90, 70), + new FsInfo.Path("/most", "/dev/sda", 100, 90, 80), + }; + NodeStats[] nodeStats = new NodeStats[] { + new NodeStats(new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, Version.CURRENT), 0, + null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null), + new NodeStats(new DiscoveryNode("node_2", DummyTransportAddress.INSTANCE, Version.CURRENT), 0, + null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null), + new NodeStats(new DiscoveryNode("node_3", DummyTransportAddress.INSTANCE, Version.CURRENT), 0, + null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null) + }; + InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvailableUsages, newMostAvailableUsages); + DiskUsage leastNode_1 = newLeastAvailableUsages.get("node_1"); + DiskUsage mostNode_1 = newMostAvailableUsages.get("node_1"); + assertDiskUsage(mostNode_1, node1FSInfo[2]); + assertDiskUsage(leastNode_1, node1FSInfo[1]); + + DiskUsage leastNode_2 = newLeastAvailableUsages.get("node_2"); + DiskUsage mostNode_2 = newMostAvailableUsages.get("node_2"); + assertDiskUsage(leastNode_2, node2FSInfo[0]); + assertDiskUsage(mostNode_2, node2FSInfo[0]); + + DiskUsage leastNode_3 = newLeastAvailableUsages.get("node_3"); + DiskUsage mostNode_3 = newMostAvailableUsages.get("node_3"); + assertDiskUsage(leastNode_3, node3FSInfo[0]); + assertDiskUsage(mostNode_3, node3FSInfo[1]); + } + + private void assertDiskUsage(DiskUsage usage, FsInfo.Path path) { + assertEquals(usage.toString(), usage.getPath(), path.getPath()); + assertEquals(usage.toString(), usage.getTotalBytes(), path.getTotal().bytes()); + assertEquals(usage.toString(), usage.getFreeBytes(), path.getAvailable().bytes()); + + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/core/src/test/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index 33ae26e6ebe..80ced595922 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/core/src/test/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -63,9 +63,9 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService { ClusterService clusterService, ThreadPool threadPool) { super(settings, nodeSettingsService, transportNodesStatsAction, transportIndicesStatsAction, clusterService, threadPool); this.clusterName = ClusterName.clusterNameFromSettings(settings); - stats[0] = MockDiskUsagesIT.makeStats("node_t1", new DiskUsage("node_t1", "n1", 100, 100)); - stats[1] = MockDiskUsagesIT.makeStats("node_t2", new DiskUsage("node_t2", "n2", 100, 100)); - stats[2] = MockDiskUsagesIT.makeStats("node_t3", new DiskUsage("node_t3", "n3", 100, 100)); + stats[0] = MockDiskUsagesIT.makeStats("node_t1", new DiskUsage("node_t1", "n1", "_na_", 100, 100)); 
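+ // each mock node reports a single 100-byte filesystem that is entirely free; "_na_" stands in for a real mount point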
+ stats[1] = MockDiskUsagesIT.makeStats("node_t2", new DiskUsage("node_t2", "n2", "_na_", 100, 100)); + stats[2] = MockDiskUsagesIT.makeStats("node_t3", new DiskUsage("node_t3", "n3", "_na_", 100, 100)); } public void setN1Usage(String nodeName, DiskUsage newUsage) { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java index e512fcdfbd3..e6a0ec4bc97 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java @@ -59,7 +59,7 @@ public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase { AllocationService strategy = createAllocationService(Settings.EMPTY, new ClusterInfoService() { @Override public ClusterInfo getClusterInfo() { - return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) { + return new ClusterInfo() { @Override public Long getShardSize(ShardRouting shardRouting) { if (shardRouting.index().equals("test") && shardRouting.shardId().getId() == 0) { @@ -118,7 +118,7 @@ public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase { final AllocationService allocation = createAllocationService(Settings.EMPTY, new ClusterInfoService() { @Override public ClusterInfo getClusterInfo() { - return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) { + return new ClusterInfo() { @Override public Long getShardSize(ShardRouting shardRouting) { if (shardRouting.index().equals("test") && shardRouting.shardId().getId() == 0) { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java index 56510384246..4dd88501ec2 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java @@ -66,7 +66,7 @@ public class RebalanceAfterActiveTests extends ESAllocationTestCase { new ClusterInfoService() { @Override public ClusterInfo getClusterInfo() { - return new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP) { + return new ClusterInfo() { @Override public Long getShardSize(ShardRouting shardRouting) { if (shardRouting.index().equals("test")) { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 41e995c367d..c202f46f688 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.test.ESAllocationTestCase; import org.elasticsearch.test.gateway.NoopGatewayAllocator; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.junit.Test; import java.util.*; @@ -65,15 +66,15 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.8).build(); Map usages 
= new HashMap<>(); - usages.put("node1", new DiskUsage("node1", "node1", 100, 10)); // 90% used - usages.put("node2", new DiskUsage("node2", "node2", 100, 35)); // 65% used - usages.put("node3", new DiskUsage("node3", "node3", 100, 60)); // 40% used - usages.put("node4", new DiskUsage("node4", "node4", 100, 80)); // 20% used + usages.put("node1", new DiskUsage("node1", "node1", "_na_", 100, 10)); // 90% used + usages.put("node2", new DiskUsage("node2", "node2", "_na_", 100, 35)); // 65% used + usages.put("node3", new DiskUsage("node3", "node3", "_na_", 100, 60)); // 40% used + usages.put("node4", new DiskUsage("node4", "node4", "_na_", 100, 80)); // 20% used Map shardSizes = new HashMap<>(); shardSizes.put("[test][0][p]", 10L); // 10 bytes shardSizes.put("[test][0][r]", 10L); - final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( @@ -92,7 +93,6 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { // noop } }; - AllocationService strategy = new AllocationService(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always") @@ -259,16 +259,16 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "9b").build(); Map usages = new HashMap<>(); - usages.put("node1", new DiskUsage("node1", "n1", 100, 10)); // 90% used - usages.put("node2", new DiskUsage("node2", "n2", 100, 10)); // 90% used - usages.put("node3", new DiskUsage("node3", "n3", 100, 60)); // 40% used - usages.put("node4", new DiskUsage("node4", "n4", 100, 80)); // 20% used - usages.put("node5", new DiskUsage("node5", "n5", 100, 85)); // 15% used + usages.put("node1", new DiskUsage("node1", "n1", "_na_", 100, 10)); // 90% used + usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 10)); // 90% used + usages.put("node3", new DiskUsage("node3", "n3", "_na_", 100, 60)); // 40% used + usages.put("node4", new DiskUsage("node4", "n4", "_na_", 100, 80)); // 20% used + usages.put("node5", new DiskUsage("node5", "n5", "_na_", 100, 85)); // 15% used Map shardSizes = new HashMap<>(); shardSizes.put("[test][0][p]", 10L); // 10 bytes shardSizes.put("[test][0][r]", 10L); - final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( @@ -329,8 +329,8 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { logger.info("--> nodeWithoutPrimary: {}", nodeWithoutPrimary); // Make node without the primary now habitable to replicas - usages.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", 100, 35)); // 65% used - final ClusterInfo clusterInfo2 = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); + usages.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", "_na_", 100, 35)); // 65% used + 
final ClusterInfo clusterInfo2 = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); cis = new ClusterInfoService() { @Override public ClusterInfo getClusterInfo() { @@ -524,12 +524,12 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "71%").build(); Map usages = new HashMap<>(); - usages.put("node1", new DiskUsage("node1", "n1", 100, 31)); // 69% used - usages.put("node2", new DiskUsage("node2", "n2", 100, 1)); // 99% used + usages.put("node1", new DiskUsage("node1", "n1", "_na_", 100, 31)); // 69% used + usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 1)); // 99% used Map shardSizes = new HashMap<>(); shardSizes.put("[test][0][p]", 10L); // 10 bytes - final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( @@ -590,13 +590,13 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.85).build(); Map usages = new HashMap<>(); - usages.put("node2", new DiskUsage("node2", "node2", 100, 50)); // 50% used - usages.put("node3", new DiskUsage("node3", "node3", 100, 0)); // 100% used + usages.put("node2", new DiskUsage("node2", "node2", "_na_", 100, 50)); // 50% used + usages.put("node3", new DiskUsage("node3", "node3", "_na_", 100, 0)); // 100% used Map shardSizes = new HashMap<>(); shardSizes.put("[test][0][p]", 10L); // 10 bytes shardSizes.put("[test][0][r]", 10L); // 10 bytes - final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( @@ -661,8 +661,8 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY); Map usages = new HashMap<>(); - usages.put("node2", new DiskUsage("node2", "n2", 100, 50)); // 50% used - usages.put("node3", new DiskUsage("node3", "n3", 100, 0)); // 100% used + usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 50)); // 50% used + usages.put("node3", new DiskUsage("node3", "n3", "_na_", 100, 0)); // 100% used DiskUsage node1Usage = decider.averageUsage(rn, usages); assertThat(node1Usage.getTotalBytes(), equalTo(100L)); @@ -675,10 +675,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY); Map usages = new HashMap<>(); - usages.put("node2", new DiskUsage("node2", "n2", 100, 50)); // 50% used - usages.put("node3", new DiskUsage("node3", "n3", 100, 0)); // 100% used + usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 50)); // 50% used + usages.put("node3", new DiskUsage("node3", "n3", "_na_", 100, 0)); // 100% used - Double after = decider.freeDiskPercentageAfterShardAssigned(new DiskUsage("node2", "n2", 100, 30), 11L); + Double after = 
decider.freeDiskPercentageAfterShardAssigned(new DiskUsage("node2", "n2", "_na_", 100, 30), 11L); assertThat(after, equalTo(19.0)); } @@ -691,16 +691,16 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.8).build(); Map usages = new HashMap<>(); - usages.put("node1", new DiskUsage("node1", "n1", 100, 40)); // 60% used - usages.put("node2", new DiskUsage("node2", "n2", 100, 40)); // 60% used - usages.put("node2", new DiskUsage("node3", "n3", 100, 40)); // 60% used + usages.put("node1", new DiskUsage("node1", "n1", "_na_", 100, 40)); // 60% used + usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 40)); // 60% used + usages.put("node2", new DiskUsage("node3", "n3", "_na_", 100, 40)); // 60% used Map shardSizes = new HashMap<>(); shardSizes.put("[test][0][p]", 14L); // 14 bytes shardSizes.put("[test][0][r]", 14L); shardSizes.put("[test2][0][p]", 1L); // 1 bytes shardSizes.put("[test2][0][r]", 1L); - final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( @@ -797,13 +797,13 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { // We have an index with 2 primary shards each taking 40 bytes. Each node has 100 bytes available Map usages = new HashMap<>(); - usages.put("node1", new DiskUsage("node1", "n1", 100, 20)); // 80% used - usages.put("node2", new DiskUsage("node2", "n2", 100, 100)); // 0% used + usages.put("node1", new DiskUsage("node1", "n1", "_na_", 100, 20)); // 80% used + usages.put("node2", new DiskUsage("node2", "n2", "_na_", 100, 100)); // 0% used Map shardSizes = new HashMap<>(); shardSizes.put("[test][0][p]", 40L); shardSizes.put("[test][1][p]", 40L); - final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes)); DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings); MetaData metaData = MetaData.builder() diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 0be13948e42..9d3b4a60e34 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -20,15 +20,15 @@ package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterInfo; -import org.elasticsearch.cluster.ClusterInfoService; -import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.ShardRouting; 
-import org.elasticsearch.cluster.routing.ShardRoutingHelper; -import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.node.settings.NodeSettingsService; @@ -76,11 +76,11 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase { applySettings.onRefreshSettings(newSettings); assertThat("high threshold bytes should be unset", - decider.getFreeBytesThresholdHigh(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "test"))); + decider.getFreeBytesThresholdHigh(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "test"))); assertThat("high threshold percentage should be changed", decider.getFreeDiskThresholdHigh(), equalTo(30.0d)); assertThat("low threshold bytes should be set to 500mb", - decider.getFreeBytesThresholdLow(), equalTo(ByteSizeValue.parseBytesSizeValue("500mb", "test"))); + decider.getFreeBytesThresholdLow(), equalTo(ByteSizeValue.parseBytesSizeValue("500mb", "test"))); assertThat("low threshold bytes should be unset", decider.getFreeDiskThresholdLow(), equalTo(0.0d)); assertThat("reroute interval should be changed to 30 seconds", @@ -89,13 +89,120 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase { assertFalse("relocations should now be disabled", decider.isIncludeRelocations()); } + public void testCanAllocateUsesMaxAvailableSpace() { + NodeSettingsService nss = new NodeSettingsService(Settings.EMPTY); + ClusterInfoService cis = EmptyClusterInfoService.INSTANCE; + DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss, cis, null); + + ShardRouting test_0 = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); + DiscoveryNode node_0 = new DiscoveryNode("node_0", DummyTransportAddress.INSTANCE, Version.CURRENT); + DiscoveryNode node_1 = new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, Version.CURRENT); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .put(node_0) + .put(node_1) + ).build(); + + // actual test -- after all that bloat :) + Map leastAvailableUsages = new HashMap<>(); + leastAvailableUsages.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 0)); // all full + leastAvailableUsages.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, 0)); // all full + + Map mostAvailableUsage = new HashMap<>(); + mostAvailableUsage.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, randomIntBetween(20, 100))); // 20 - 99 percent since after allocation there must be at least 10% left and shard is 10byte + mostAvailableUsage.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, randomIntBetween(0, 10))); // this is weird and smells like a bug! 
it should be up to 20%? + + Map shardSizes = new HashMap<>(); + shardSizes.put("[test][0][p]", 10L); // 10 bytes + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(leastAvailableUsages), Collections.unmodifiableMap(mostAvailableUsage), Collections.unmodifiableMap(shardSizes)); + RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo); + assertEquals(mostAvailableUsage.toString(), Decision.YES, decider.canAllocate(test_0, new RoutingNode("node_0", node_0), allocation)); + assertEquals(mostAvailableUsage.toString(), Decision.NO, decider.canAllocate(test_0, new RoutingNode("node_1", node_1), allocation)); + } + + + public void testCanRemainUsesLeastAvailableSpace() { + NodeSettingsService nss = new NodeSettingsService(Settings.EMPTY); + ClusterInfoService cis = EmptyClusterInfoService.INSTANCE; + DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss, cis, null); + + + DiscoveryNode node_0 = new DiscoveryNode("node_0", DummyTransportAddress.INSTANCE, Version.CURRENT); + DiscoveryNode node_1 = new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, Version.CURRENT); + + ShardRouting test_0 = ShardRouting.newUnassigned("test", 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); + ShardRoutingHelper.initialize(test_0, node_0.getId()); + ShardRoutingHelper.moveToStarted(test_0); + + + ShardRouting test_1 = ShardRouting.newUnassigned("test", 1, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); + ShardRoutingHelper.initialize(test_1, node_1.getId()); + ShardRoutingHelper.moveToStarted(test_1); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + + logger.info("--> adding two nodes"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .put(node_0) + .put(node_1) + ).build(); + + // actual test -- after all that bloat :) + Map leastAvailableUsages = new HashMap<>(); + leastAvailableUsages.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 10)); // 90% used + leastAvailableUsages.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, 9)); // 91% used + + Map mostAvailableUsage = new HashMap<>(); + mostAvailableUsage.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 90)); // 10% used + mostAvailableUsage.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, 90)); // 10% used + + Map shardSizes = new HashMap<>(); + shardSizes.put("[test][0][p]", 10L); // 10 bytes + shardSizes.put("[test][1][p]", 10L); + final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(leastAvailableUsages), Collections.unmodifiableMap(mostAvailableUsage), Collections.unmodifiableMap(shardSizes)); + RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo); + assertEquals(Decision.YES, decider.canRemain(test_0, new RoutingNode("node_0", node_0), allocation)); + assertEquals(Decision.NO, 
decider.canRemain(test_1, new RoutingNode("node_1", node_1), allocation)); + try { + decider.canRemain(test_0, new RoutingNode("node_1", node_1), allocation); + fail("not allocated on this node"); + } catch (IllegalArgumentException ex) { + // not allocated on that node + } + try { + decider.canRemain(test_1, new RoutingNode("node_0", node_0), allocation); + fail("not allocated on this node"); + } catch (IllegalArgumentException ex) { + // not allocated on that node + } + } + + public void testShardSizeAndRelocatingSize() { Map shardSizes = new HashMap<>(); shardSizes.put("[test][0][r]", 10L); shardSizes.put("[test][1][r]", 100L); shardSizes.put("[test][2][r]", 1000L); shardSizes.put("[other][0][p]", 10000L); - ClusterInfo info = new ClusterInfo(Collections.EMPTY_MAP, shardSizes); + ClusterInfo info = new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP, shardSizes); ShardRouting test_0 = ShardRouting.newUnassigned("test", 0, null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); ShardRoutingHelper.initialize(test_0, "node1"); ShardRoutingHelper.moveToStarted(test_0); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java index e5a14d29a73..66949b530d9 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java @@ -76,9 +76,9 @@ public class MockDiskUsagesIT extends ESIntegTestCase { // Start with all nodes at 50% usage final MockInternalClusterInfoService cis = (MockInternalClusterInfoService) internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName()); - cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), "n1", 100, 50)); - cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), "n2", 100, 50)); - cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", 100, 50)); + cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 50)); + cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 50)); + cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 50)); client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder() .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, randomFrom("20b", "80%")) @@ -97,8 +97,8 @@ public class MockDiskUsagesIT extends ESIntegTestCase { @Override public void run() { ClusterInfo info = cis.getClusterInfo(); - logger.info("--> got: {} nodes", info.getNodeDiskUsages().size()); - assertThat(info.getNodeDiskUsages().size(), greaterThan(0)); + logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size()); + assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0)); } }); @@ -113,9 +113,9 @@ public class MockDiskUsagesIT extends ESIntegTestCase { } // Update the disk usages so one node has now passed the high watermark - cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", 100, 50)); - cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", 100, 50)); - cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", 100, 0)); // nothing free on node3 + cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 50)); + cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 
50)); + cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 0)); // nothing free on node3 // Retrieve the count of shards on each node final Map nodesToShardCount = newHashMap(); @@ -138,9 +138,9 @@ public class MockDiskUsagesIT extends ESIntegTestCase { }); // Update the disk usages so one node is now back under the high watermark - cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", 100, 50)); - cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", 100, 50)); - cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", 100, 50)); // node3 has free space now + cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 50)); + cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 50)); + cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 50)); // node3 has free space now // Retrieve the count of shards on each node nodesToShardCount.clear();
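Below is a minimal sketch, not part of the patch itself, of how an allocation decider is expected to consume the two new maps; it mirrors the DiskThresholdDecider changes above, and the class/method names and the service/nodeId parameters are illustrative only:

import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.DiskUsage;

final class DiskUsageSketch {
    // Illustrative helper: picks the DiskUsage an allocation decider should judge a node by.
    static void explainDiskChecks(ClusterInfoService service, String nodeId) {
        ClusterInfo info = service.getClusterInfo();
        // canAllocate: a new shard can be created on any data path, so the node
        // is judged by the path with the most available space.
        DiskUsage forAllocation = info.getNodeMostAvailableDiskUsages().get(nodeId);
        // canRemain: we cannot yet tell which path an existing shard lives on
        // (the follow-up mentioned in the commit message), so the node is judged
        // conservatively by the path with the least available space.
        DiskUsage forRemaining = info.getNodeLeastAvailableDiskUsages().get(nodeId);
        // either value may be null when no stats were collected for that node;
        // DiskUsage#toString prints "[nodeId][nodeName][path] free: ...[...%]"
        System.out.println("allocate against: " + forAllocation + " / remain against: " + forRemaining);
    }
}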