diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index bc6adf03674..4408a03751d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -19,8 +19,6 @@ package org.elasticsearch.cluster.routing.allocation.decider; -import java.util.Set; - import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -44,6 +42,8 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import java.util.Set; + import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING; import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING; @@ -139,12 +139,25 @@ public class DiskThresholdDecider extends AllocationDecider { // subtractLeavingShards is passed as false here, because they still use disk space, and therefore should we should be extra careful // and take the size into account - DiskUsage usage = getDiskUsage(node, allocation, usages, false); + final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, false); // First, check that the node currently over the low watermark double freeDiskPercentage = usage.getFreeDiskAsPercentage(); // Cache the used disk percentage for displaying disk percentages consistent with documentation double usedDiskPercentage = usage.getUsedDiskAsPercentage(); long freeBytes = usage.getFreeBytes(); + if (freeBytes < 0L) { + final long sizeOfRelocatingShards = sizeOfRelocatingShards(node, false, usage.getPath(), + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); + logger.debug("fewer free bytes remaining than the size of all incoming shards: " + + "usage {} on node {} including {} bytes of relocations, preventing allocation", + usage, node.nodeId(), sizeOfRelocatingShards); + + return allocation.decision(Decision.NO, NAME, + "the node has fewer free bytes remaining than the total size of all incoming shards: " + + "free space [%sB], relocating shards [%sB]", + freeBytes + sizeOfRelocatingShards, sizeOfRelocatingShards); + } + ByteSizeValue freeBytesValue = new ByteSizeValue(freeBytes); if (logger.isTraceEnabled()) { logger.trace("node [{}] has {}% used disk", node.nodeId(), usedDiskPercentage); @@ -242,6 +255,7 @@ public class DiskThresholdDecider extends AllocationDecider { // Secondly, check that allocating the shard to this node doesn't put it above the high watermark final long shardSize = getExpectedShardSize(shardRouting, 0L, allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); + assert shardSize >= 0 : shardSize; double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize); long freeBytesAfterShard = freeBytes - shardSize; if (freeBytesAfterShard < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) { @@ -268,6 +282,7 @@ public class DiskThresholdDecider extends AllocationDecider { diskThresholdSettings.getHighWatermarkRaw(), usedDiskThresholdHigh, freeSpaceAfterShard); } + assert freeBytesAfterShard >= 0 : freeBytesAfterShard; return allocation.decision(Decision.YES, NAME, "enough disk for shard on node, free: [%s], shard size: [%s], free after allocating shard: [%s]", freeBytesValue, @@ -289,7 +304,7 @@ public class DiskThresholdDecider extends AllocationDecider { // subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk // since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check. - final DiskUsage usage = getDiskUsage(node, allocation, usages, true); + final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, true); final String dataPath = clusterInfo.getDataPath(shardRouting); // If this node is already above the high threshold, the shard cannot remain (get it off!) final double freeDiskPercentage = usage.getFreeDiskAsPercentage(); @@ -301,6 +316,17 @@ public class DiskThresholdDecider extends AllocationDecider { return allocation.decision(Decision.YES, NAME, "this shard is not allocated on the most utilized disk and can remain"); } + if (freeBytes < 0L) { + final long sizeOfRelocatingShards = sizeOfRelocatingShards(node, false, usage.getPath(), + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); + logger.debug("fewer free bytes remaining than the size of all incoming shards: " + + "usage {} on node {} including {} bytes of relocations, shard cannot remain", + usage, node.nodeId(), sizeOfRelocatingShards); + return allocation.decision(Decision.NO, NAME, + "the shard cannot remain on this node because the node has fewer free bytes remaining than the total size of all " + + "incoming shards: free space [%s], relocating shards [%s]", + freeBytes + sizeOfRelocatingShards, sizeOfRelocatingShards); + } if (freeBytes < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) { if (logger.isDebugEnabled()) { logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain", @@ -330,8 +356,8 @@ public class DiskThresholdDecider extends AllocationDecider { "there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes)); } - private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, - ImmutableOpenMap usages, boolean subtractLeavingShards) { + private DiskUsageWithRelocations getDiskUsage(RoutingNode node, RoutingAllocation allocation, + ImmutableOpenMap usages, boolean subtractLeavingShards) { DiskUsage usage = usages.get(node.nodeId()); if (usage == null) { // If there is no usage, and we have other nodes in the cluster, @@ -343,18 +369,14 @@ public class DiskThresholdDecider extends AllocationDecider { } } - if (diskThresholdSettings.includeRelocations()) { - final long relocatingShardsSize = sizeOfRelocatingShards(node, subtractLeavingShards, usage.getPath(), - allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); - DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(), - usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize); - if (logger.isTraceEnabled()) { - logger.trace("usage without relocations: {}", usage); - logger.trace("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations); - } - usage = usageIncludingRelocations; + final DiskUsageWithRelocations diskUsageWithRelocations = new DiskUsageWithRelocations(usage, + diskThresholdSettings.includeRelocations() ? sizeOfRelocatingShards(node, subtractLeavingShards, usage.getPath(), + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()) : 0); + if (logger.isTraceEnabled()) { + logger.trace("getDiskUsage(subtractLeavingShards={}) returning {}", subtractLeavingShards, diskUsageWithRelocations); } - return usage; + + return diskUsageWithRelocations; } /** @@ -384,7 +406,7 @@ public class DiskThresholdDecider extends AllocationDecider { * @param shardSize Size in bytes of the shard * @return Percentage of free space after the shard is assigned to the node */ - double freeDiskPercentageAfterShardAssigned(DiskUsage usage, Long shardSize) { + double freeDiskPercentageAfterShardAssigned(DiskUsageWithRelocations usage, Long shardSize) { shardSize = (shardSize == null) ? 0 : shardSize; DiskUsage newUsage = new DiskUsage(usage.getNodeId(), usage.getNodeName(), usage.getPath(), usage.getTotalBytes(), usage.getFreeBytes() - shardSize); @@ -452,4 +474,59 @@ public class DiskThresholdDecider extends AllocationDecider { return clusterInfo.getShardSize(shard, defaultValue); } } + + static class DiskUsageWithRelocations { + + private final DiskUsage diskUsage; + private final long relocatingShardSize; + + DiskUsageWithRelocations(DiskUsage diskUsage, long relocatingShardSize) { + this.diskUsage = diskUsage; + this.relocatingShardSize = relocatingShardSize; + } + + @Override + public String toString() { + return "DiskUsageWithRelocations{" + + "diskUsage=" + diskUsage + + ", relocatingShardSize=" + relocatingShardSize + + '}'; + } + + double getFreeDiskAsPercentage() { + if (getTotalBytes() == 0L) { + return 100.0; + } + return 100.0 * ((double)getFreeBytes() / getTotalBytes()); + } + + double getUsedDiskAsPercentage() { + return 100.0 - getFreeDiskAsPercentage(); + } + + long getFreeBytes() { + try { + return Math.subtractExact(diskUsage.getFreeBytes(), relocatingShardSize); + } catch (ArithmeticException e) { + return Long.MAX_VALUE; + } + } + + String getPath() { + return diskUsage.getPath(); + } + + String getNodeId() { + return diskUsage.getNodeId(); + } + + String getNodeName() { + return diskUsage.getNodeName(); + } + + long getTotalBytes() { + return diskUsage.getTotalBytes(); + } + } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index db0aa271bc7..93d87edad4d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -56,6 +56,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; import static java.util.Collections.singleton; @@ -63,6 +64,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; +import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -628,7 +630,8 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 50)); // 50% used usages.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 0)); // 100% used - Double after = decider.freeDiskPercentageAfterShardAssigned(new DiskUsage("node2", "n2", "/dev/null", 100, 30), 11L); + Double after = decider.freeDiskPercentageAfterShardAssigned( + new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("node2", "n2", "/dev/null", 100, 30), 0L), 11L); assertThat(after, equalTo(19.0)); } @@ -653,18 +656,19 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes); DiskThresholdDecider decider = makeDecider(diskSettings); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); AllocationDeciders deciders = new AllocationDeciders( - new HashSet<>(Arrays.asList(new SameShardAllocationDecider( - Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) - ), decider))); + new HashSet<>(Arrays.asList( + new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), + new EnableAllocationDecider( + Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none").build(), clusterSettings), + decider))); - ClusterInfoService cis = () -> { - logger.info("--> calling fake getClusterInfo"); - return clusterInfo; - }; + final AtomicReference clusterInfoReference = new AtomicReference<>(clusterInfo); + final ClusterInfoService cis = clusterInfoReference::get; AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(), - new BalancedShardsAllocator(Settings.EMPTY), cis); + new BalancedShardsAllocator(Settings.EMPTY), cis); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) @@ -702,30 +706,66 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { .add(newNode("node3")) ).build(); - AllocationCommand relocate1 = new MoveAllocationCommand("test", 0, "node2", "node3"); - AllocationCommands cmds = new AllocationCommands(relocate1); + { + AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test", 0, "node2", "node3"); + AllocationCommands cmds = new AllocationCommands(moveAllocationCommand); - clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState(); - logShardStates(clusterState); - - AllocationCommand relocate2 = new MoveAllocationCommand("test2", 0, "node2", "node3"); - cmds = new AllocationCommands(relocate2); - - try { - // The shard for the "test" index is already being relocated to - // node3, which will put it over the low watermark when it - // completes, with shard relocations taken into account this should - // throw an exception about not being able to complete - strategy.reroute(clusterState, cmds, false, false); - fail("should not have been able to reroute the shard"); - } catch (IllegalArgumentException e) { - assertThat("can't be allocated because there isn't enough room: " + e.getMessage(), - e.getMessage(), - containsString("the node is above the low watermark cluster setting " + - "[cluster.routing.allocation.disk.watermark.low=0.7], using more disk space than the maximum " + - "allowed [70.0%], actual free: [26.0%]")); + clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState(); + logShardStates(clusterState); } + final ImmutableOpenMap.Builder overfullUsagesBuilder = ImmutableOpenMap.builder(); + overfullUsagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 40)); // 60% used + overfullUsagesBuilder.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 40)); // 60% used + overfullUsagesBuilder.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 0)); // 100% used + final ImmutableOpenMap overfullUsages = overfullUsagesBuilder.build(); + + final ImmutableOpenMap.Builder largerShardSizesBuilder = ImmutableOpenMap.builder(); + largerShardSizesBuilder.put("[test][0][p]", 14L); + largerShardSizesBuilder.put("[test][0][r]", 14L); + largerShardSizesBuilder.put("[test2][0][p]", 2L); + largerShardSizesBuilder.put("[test2][0][r]", 2L); + final ImmutableOpenMap largerShardSizes = largerShardSizesBuilder.build(); + + final ClusterInfo overfullClusterInfo = new DevNullClusterInfo(overfullUsages, overfullUsages, largerShardSizes); + + { + AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test2", 0, "node2", "node3"); + AllocationCommands cmds = new AllocationCommands(moveAllocationCommand); + + final ClusterState clusterStateThatRejectsCommands = clusterState; + + assertThat(expectThrows(IllegalArgumentException.class, + () -> strategy.reroute(clusterStateThatRejectsCommands, cmds, false, false)).getMessage(), + containsString("the node is above the low watermark cluster setting " + + "[cluster.routing.allocation.disk.watermark.low=0.7], using more disk space than the maximum " + + "allowed [70.0%], actual free: [26.0%]")); + + clusterInfoReference.set(overfullClusterInfo); + + assertThat(expectThrows(IllegalArgumentException.class, + () -> strategy.reroute(clusterStateThatRejectsCommands, cmds, false, false)).getMessage(), + containsString("the node has fewer free bytes remaining than the total size of all incoming shards")); + + clusterInfoReference.set(clusterInfo); + } + + { + AllocationCommand moveAllocationCommand = new MoveAllocationCommand("test2", 0, "node2", "node3"); + AllocationCommands cmds = new AllocationCommands(moveAllocationCommand); + + logger.info("--> before starting: {}", clusterState); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + logger.info("--> after starting: {}", clusterState); + clusterState = strategy.reroute(clusterState, cmds, false, false).getClusterState(); + logger.info("--> after running another command: {}", clusterState); + logShardStates(clusterState); + + clusterInfoReference.set(overfullClusterInfo); + + clusterState = strategy.reroute(clusterState, "foo"); + logger.info("--> after another reroute: {}", clusterState); + } } public void testCanRemainWithShardRelocatingAway() { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 55f4154680a..fe7bfd9dc80 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -54,6 +54,7 @@ import java.util.HashSet; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; /** * Unit tests for the DiskThresholdDecider @@ -444,4 +445,33 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase { allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); } + public void testDiskUsageWithRelocations() { + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 1000L), 0).getFreeBytes(), + equalTo(1000L)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 1000L), 9).getFreeBytes(), + equalTo(991L)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 1000L), -9).getFreeBytes(), + equalTo(1009L)); + + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 1000L), 0) + .getFreeDiskAsPercentage(), equalTo(100.0)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 500L), 0) + .getFreeDiskAsPercentage(), equalTo(50.0)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 500L), 100) + .getFreeDiskAsPercentage(), equalTo(40.0)); + + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 1000L), 0) + .getUsedDiskAsPercentage(), equalTo(0.0)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 500L), 0) + .getUsedDiskAsPercentage(), equalTo(50.0)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(new DiskUsage("n", "n", "/dev/null", 1000L, 500L), 100) + .getUsedDiskAsPercentage(), equalTo(60.0)); + + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations( + new DiskUsage("n", "n", "/dev/null", Long.MAX_VALUE, Long.MAX_VALUE), 0).getFreeBytes(), equalTo(Long.MAX_VALUE)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations( + new DiskUsage("n", "n", "/dev/null", Long.MAX_VALUE, Long.MAX_VALUE), 10).getFreeBytes(), equalTo(Long.MAX_VALUE - 10)); + assertThat(new DiskThresholdDecider.DiskUsageWithRelocations( + new DiskUsage("n", "n", "/dev/null", Long.MAX_VALUE, Long.MAX_VALUE), -10).getFreeBytes(), equalTo(Long.MAX_VALUE)); + } }