Change DiskThresholdDecider's behavior when factoring in leaving shards
This changes DiskThresholdDecider to only factor in leaving shards when checking if a shard can remain. Previously, the sizes of leaving shards were subtracted in both the `canAllocate` and `canRemain` checks; with this change, they are subtracted only in the `canRemain` check. Before this change, it was possible that multiple shards relocating away from the node would have their entire size subtracted, so the node had a chance to go over the disk threshold (or fill the disk completely) because the calculation subtracted space that was still being used for other in-progress relocations.
This commit is contained in:
parent
fd3392aef8
commit
28d3c4488e
|
@ -78,11 +78,10 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
* Returns the size of all shards that are currently being relocated to
|
* Returns the size of all shards that are currently being relocated to
|
||||||
* the node, but may not be finished transferring yet.
|
* the node, but may not be finished transferring yet.
|
||||||
*
|
*
|
||||||
* If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size
|
* If subtractShardsMovingAway is true then the size of shards moving away is subtracted from the total size of all shards
|
||||||
* of all shards
|
|
||||||
*/
|
*/
|
||||||
static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation,
|
static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation,
|
||||||
boolean subtractShardsMovingAway, String dataPath) {
|
boolean subtractShardsMovingAway, String dataPath) {
|
||||||
ClusterInfo clusterInfo = allocation.clusterInfo();
|
ClusterInfo clusterInfo = allocation.clusterInfo();
|
||||||
long totalSize = 0;
|
long totalSize = 0;
|
||||||
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
|
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
|
||||||
|
@ -111,7 +110,9 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
final double usedDiskThresholdLow = 100.0 - diskThresholdSettings.getFreeDiskThresholdLow();
|
final double usedDiskThresholdLow = 100.0 - diskThresholdSettings.getFreeDiskThresholdLow();
|
||||||
final double usedDiskThresholdHigh = 100.0 - diskThresholdSettings.getFreeDiskThresholdHigh();
|
final double usedDiskThresholdHigh = 100.0 - diskThresholdSettings.getFreeDiskThresholdHigh();
|
||||||
|
|
||||||
DiskUsage usage = getDiskUsage(node, allocation, usages);
|
// subtractLeavingShards is passed as false here, because they still use disk space, and therefore we should be extra careful
|
||||||
|
// and take the size into account
|
||||||
|
DiskUsage usage = getDiskUsage(node, allocation, usages, false);
|
||||||
// First, check that the node is currently over the low watermark
|
// First, check that the node is currently over the low watermark
|
||||||
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
|
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
|
||||||
// Cache the used disk percentage for displaying disk percentages consistent with documentation
|
// Cache the used disk percentage for displaying disk percentages consistent with documentation
|
||||||
|
@ -243,7 +244,9 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
return decision;
|
return decision;
|
||||||
}
|
}
|
||||||
|
|
||||||
final DiskUsage usage = getDiskUsage(node, allocation, usages);
|
// subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk
|
||||||
|
// since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check.
|
||||||
|
final DiskUsage usage = getDiskUsage(node, allocation, usages, true);
|
||||||
final String dataPath = clusterInfo.getDataPath(shardRouting);
|
final String dataPath = clusterInfo.getDataPath(shardRouting);
|
||||||
// If this node is already above the high threshold, the shard cannot remain (get it off!)
|
// If this node is already above the high threshold, the shard cannot remain (get it off!)
|
||||||
final double freeDiskPercentage = usage.getFreeDiskAsPercentage();
|
final double freeDiskPercentage = usage.getFreeDiskAsPercentage();
|
||||||
|
@ -280,7 +283,8 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
"there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes));
|
"there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes));
|
||||||
}
|
}
|
||||||
|
|
||||||
private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, ImmutableOpenMap<String, DiskUsage> usages) {
|
private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation,
|
||||||
|
ImmutableOpenMap<String, DiskUsage> usages, boolean subtractLeavingShards) {
|
||||||
DiskUsage usage = usages.get(node.nodeId());
|
DiskUsage usage = usages.get(node.nodeId());
|
||||||
if (usage == null) {
|
if (usage == null) {
|
||||||
// If there is no usage, and we have other nodes in the cluster,
|
// If there is no usage, and we have other nodes in the cluster,
|
||||||
|
@ -293,7 +297,7 @@ public class DiskThresholdDecider extends AllocationDecider {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (diskThresholdSettings.includeRelocations()) {
|
if (diskThresholdSettings.includeRelocations()) {
|
||||||
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, true, usage.getPath());
|
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, subtractLeavingShards, usage.getPath());
|
||||||
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(),
|
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(),
|
||||||
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
|
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
|
|
|
@ -56,6 +56,7 @@ import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static java.util.Collections.emptyMap;
|
import static java.util.Collections.emptyMap;
|
||||||
import static java.util.Collections.singleton;
|
import static java.util.Collections.singleton;
|
||||||
|
@ -729,10 +730,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
|
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
|
||||||
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
|
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
|
||||||
|
|
||||||
|
DiskThresholdDecider decider = makeDecider(diskSettings);
|
||||||
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
|
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
|
||||||
new HashSet<>(Arrays.asList(
|
new HashSet<>(Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY), decider)));
|
||||||
new SameShardAllocationDecider(Settings.EMPTY),
|
|
||||||
makeDecider(diskSettings))));
|
|
||||||
|
|
||||||
ClusterInfoService cis = new ClusterInfoService() {
|
ClusterInfoService cis = new ClusterInfoService() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -832,6 +832,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
ImmutableOpenMap.Builder<String, Long> shardSizesBuilder = ImmutableOpenMap.builder();
|
ImmutableOpenMap.Builder<String, Long> shardSizesBuilder = ImmutableOpenMap.builder();
|
||||||
shardSizesBuilder.put("[test][0][p]", 40L);
|
shardSizesBuilder.put("[test][0][p]", 40L);
|
||||||
shardSizesBuilder.put("[test][1][p]", 40L);
|
shardSizesBuilder.put("[test][1][p]", 40L);
|
||||||
|
shardSizesBuilder.put("[foo][0][p]", 10L);
|
||||||
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
|
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
|
||||||
|
|
||||||
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
|
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
|
||||||
|
@ -839,10 +840,12 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
DiskThresholdDecider diskThresholdDecider = makeDecider(diskSettings);
|
DiskThresholdDecider diskThresholdDecider = makeDecider(diskSettings);
|
||||||
MetaData metaData = MetaData.builder()
|
MetaData metaData = MetaData.builder()
|
||||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
|
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
|
||||||
|
.put(IndexMetaData.builder("foo").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
RoutingTable initialRoutingTable = RoutingTable.builder()
|
RoutingTable initialRoutingTable = RoutingTable.builder()
|
||||||
.addAsNew(metaData.index("test"))
|
.addAsNew(metaData.index("test"))
|
||||||
|
.addAsNew(metaData.index("foo"))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
DiscoveryNode discoveryNode1 = new DiscoveryNode("node1", new LocalTransportAddress("1"), emptyMap(),
|
DiscoveryNode discoveryNode1 = new DiscoveryNode("node1", new LocalTransportAddress("1"), emptyMap(),
|
||||||
|
@ -881,6 +884,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
// Two shards consuming each 80% of disk space while 70% is allowed, but one is relocating, so shard 0 can stay
|
// Two shards consuming each 80% of disk space while 70% is allowed, but one is relocating, so shard 0 can stay
|
||||||
firstRouting = TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED);
|
firstRouting = TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED);
|
||||||
secondRouting = TestShardRouting.newShardRouting("test", 1, "node1", "node2", true, ShardRoutingState.RELOCATING);
|
secondRouting = TestShardRouting.newShardRouting("test", 1, "node1", "node2", true, ShardRoutingState.RELOCATING);
|
||||||
|
ShardRouting fooRouting = TestShardRouting.newShardRouting("foo", 0, "node1", null, true, ShardRoutingState.UNASSIGNED);
|
||||||
firstRoutingNode = new RoutingNode("node1", discoveryNode1, firstRouting, secondRouting);
|
firstRoutingNode = new RoutingNode("node1", discoveryNode1, firstRouting, secondRouting);
|
||||||
builder = RoutingTable.builder().add(
|
builder = RoutingTable.builder().add(
|
||||||
IndexRoutingTable.builder(firstRouting.index())
|
IndexRoutingTable.builder(firstRouting.index())
|
||||||
|
@ -898,6 +902,8 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
|
||||||
false);
|
false);
|
||||||
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
|
||||||
assertThat(decision.type(), equalTo(Decision.Type.YES));
|
assertThat(decision.type(), equalTo(Decision.Type.YES));
|
||||||
|
decision = diskThresholdDecider.canAllocate(fooRouting, firstRoutingNode, routingAllocation);
|
||||||
|
assertThat(decision.type(), equalTo(Decision.Type.NO));
|
||||||
|
|
||||||
// Creating AllocationService instance and the services it depends on...
|
// Creating AllocationService instance and the services it depends on...
|
||||||
ClusterInfoService cis = new ClusterInfoService() {
|
ClusterInfoService cis = new ClusterInfoService() {
|
||||||
|
|
Loading…
Reference in New Issue