From 28d3c4488ec087de2a69491cc977c9d632d06cb0 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 30 Aug 2016 13:48:23 -0600 Subject: [PATCH] Change DiskThresholdDecider's behavior when factoring in leaving shards This changes DiskThresholdDecider to only factor in leaving shards when checking if a shard can remain. Previously, leaving shards were factored in for both the `canAllocate` and `canRemain` checks; with this change, the sizes of leaving shards are subtracted only in the `canRemain` check. Before this change, it was possible that multiple shards relocating away from the node would have their entire size subtracted, and the node had a chance to go over the disk threshold (or fill the disk completely) because the calculation subtracted space that was still being used by other in-progress relocations. --- .../decider/DiskThresholdDecider.java | 18 +++++++++++------- .../decider/DiskThresholdDeciderTests.java | 12 +++++++++--- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 30de708b1e1..b64b74cc9cb 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -78,11 +78,10 @@ public class DiskThresholdDecider extends AllocationDecider { * Returns the size of all shards that are currently being relocated to * the node, but may not be finished transferring yet. 
* - * If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size - * of all shards + * If subtractShardsMovingAway is true then the size of shards moving away is subtracted from the total size of all shards */ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, - boolean subtractShardsMovingAway, String dataPath) { + boolean subtractShardsMovingAway, String dataPath) { ClusterInfo clusterInfo = allocation.clusterInfo(); long totalSize = 0; for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) { @@ -111,7 +110,9 @@ public class DiskThresholdDecider extends AllocationDecider { final double usedDiskThresholdLow = 100.0 - diskThresholdSettings.getFreeDiskThresholdLow(); final double usedDiskThresholdHigh = 100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(); - DiskUsage usage = getDiskUsage(node, allocation, usages); + // subtractLeavingShards is passed as false here, because they still use disk space, and therefore we should be extra careful + // and take the size into account + DiskUsage usage = getDiskUsage(node, allocation, usages, false); // First, check that the node currently over the low watermark double freeDiskPercentage = usage.getFreeDiskAsPercentage(); // Cache the used disk percentage for displaying disk percentages consistent with documentation @@ -243,7 +244,9 @@ public class DiskThresholdDecider extends AllocationDecider { return decision; } - final DiskUsage usage = getDiskUsage(node, allocation, usages); + // subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk + // since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check. 
+ final DiskUsage usage = getDiskUsage(node, allocation, usages, true); final String dataPath = clusterInfo.getDataPath(shardRouting); // If this node is already above the high threshold, the shard cannot remain (get it off!) final double freeDiskPercentage = usage.getFreeDiskAsPercentage(); @@ -280,7 +283,8 @@ public class DiskThresholdDecider extends AllocationDecider { "there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes)); } - private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, ImmutableOpenMap usages) { + private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, + ImmutableOpenMap usages, boolean subtractLeavingShards) { DiskUsage usage = usages.get(node.nodeId()); if (usage == null) { // If there is no usage, and we have other nodes in the cluster, @@ -293,7 +297,7 @@ public class DiskThresholdDecider extends AllocationDecider { } if (diskThresholdSettings.includeRelocations()) { - long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, true, usage.getPath()); + long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, subtractLeavingShards, usage.getPath()); DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize); if (logger.isTraceEnabled()) { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 75c33c44cf1..98268e409fc 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -56,6 +56,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import 
java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; import static java.util.Collections.singleton; @@ -729,10 +730,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { ImmutableOpenMap shardSizes = shardSizesBuilder.build(); final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes); + DiskThresholdDecider decider = makeDecider(diskSettings); AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, - new HashSet<>(Arrays.asList( - new SameShardAllocationDecider(Settings.EMPTY), - makeDecider(diskSettings)))); + new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), decider))); ClusterInfoService cis = new ClusterInfoService() { @Override @@ -832,6 +832,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { ImmutableOpenMap.Builder shardSizesBuilder = ImmutableOpenMap.builder(); shardSizesBuilder.put("[test][0][p]", 40L); shardSizesBuilder.put("[test][1][p]", 40L); + shardSizesBuilder.put("[foo][0][p]", 10L); ImmutableOpenMap shardSizes = shardSizesBuilder.build(); final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes); @@ -839,10 +840,12 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { DiskThresholdDecider diskThresholdDecider = makeDecider(diskSettings); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0)) + .put(IndexMetaData.builder("foo").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) .build(); RoutingTable initialRoutingTable = RoutingTable.builder() .addAsNew(metaData.index("test")) + .addAsNew(metaData.index("foo")) .build(); DiscoveryNode discoveryNode1 = new DiscoveryNode("node1", new LocalTransportAddress("1"), emptyMap(), @@ -881,6 +884,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { // Two shards 
consuming each 80% of disk space while 70% is allowed, but one is relocating, so shard 0 can stay firstRouting = TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED); secondRouting = TestShardRouting.newShardRouting("test", 1, "node1", "node2", true, ShardRoutingState.RELOCATING); + ShardRouting fooRouting = TestShardRouting.newShardRouting("foo", 0, "node1", null, true, ShardRoutingState.UNASSIGNED); firstRoutingNode = new RoutingNode("node1", discoveryNode1, firstRouting, secondRouting); builder = RoutingTable.builder().add( IndexRoutingTable.builder(firstRouting.index()) @@ -898,6 +902,8 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { false); decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation); assertThat(decision.type(), equalTo(Decision.Type.YES)); + decision = diskThresholdDecider.canAllocate(fooRouting, firstRoutingNode, routingAllocation); + assertThat(decision.type(), equalTo(Decision.Type.NO)); // Creating AllocationService instance and the services it depends on... ClusterInfoService cis = new ClusterInfoService() {