Add option to take currently relocating shards' sizes into account

When using the DiskThresholdDecider, shards may already be marked as
relocating to the node being evaluated. This commit adds a new setting,
`cluster.routing.allocation.disk.include_relocations`, which, when enabled,
counts the sizes of shards currently being relocated to a node as part of
that node's used disk space.

This new option defaults to `true`. Note, however, that it can over-estimate
a node's usage when a relocation is already partially complete, because the
bytes copied so far are counted twice: once in the node's measured disk usage
and again as part of the full shard size. For instance:

A node receiving a 10gb shard whose relocation is 45% complete has already
written 4.5gb to disk, so the decider effectively charges
10gb + (0.45 * 10gb) = 14.5gb against the node before examining the
watermarks to see if a new shard can be allocated.
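
A minimal sketch of that arithmetic (the 10gb shard and 45% progress are the
hypothetical numbers from the example above):

public class RelocationOverestimate {
    public static void main(String[] args) {
        long shardSize = 10L << 30;                     // 10gb shard being relocated
        long alreadyOnDisk = (long) (0.45 * shardSize); // ~4.5gb already reflected in measured disk usage
        long charged = alreadyOnDisk + shardSize;       // the full shard size is subtracted from free bytes again
        System.out.printf("charged: %.1fgb%n", charged / (double) (1L << 30)); // prints "charged: 14.5gb"
    }
}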

Fixes #7753
Relates to #6168
Lee Hinman 2014-09-18 14:09:51 +02:00
parent 61c21f9a0e
commit 4185566e93
4 changed files with 166 additions and 2 deletions

View File (documentation: disk-based shard allocation settings)

@@ -137,3 +137,12 @@ Both watermark settings can be changed dynamically using the cluster
settings API. By default, Elasticsearch will retrieve information
about the disk usage of the nodes every 30 seconds. This can also be
changed by setting the `cluster.info.update.interval` setting.
By default, Elasticsearch will take into account shards that are currently being
relocated to the target node when computing a node's disk usage. This can be
changed by setting the `cluster.routing.allocation.disk.include_relocations`
setting to `false` (defaults to `true`). Taking relocating shards' sizes into
account may, however, over-estimate a node's disk usage: if a relocation is,
say, 90% complete, a recently retrieved disk usage already includes the space
written so far by the running relocation, and the full size of the relocating
shard is still counted on top of it.
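
As a rough sketch (not part of this commit), the setting can also be flipped
at runtime from the Java API; `client` below is assumed to be an
already-connected `Client`:

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;

public class ToggleIncludeRelocations {
    public static ClusterUpdateSettingsResponse disable(Client client) {
        // transient settings apply immediately but are lost on full cluster restart
        return client.admin().cluster().prepareUpdateSettings()
                .setTransientSettings(ImmutableSettings.settingsBuilder()
                        .put("cluster.routing.allocation.disk.include_relocations", false)
                        .build())
                .execute().actionGet();
    }
}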

View File (DiskThresholdDecider.java)

@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@@ -31,6 +32,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.RatioValue;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.cluster.InternalClusterInfoService.shardIdentifierFromRouting;
@@ -66,17 +68,20 @@ public class DiskThresholdDecider extends AllocationDecider {
private volatile Double freeDiskThresholdHigh;
private volatile ByteSizeValue freeBytesThresholdLow;
private volatile ByteSizeValue freeBytesThresholdHigh;
private volatile boolean includeRelocations;
private volatile boolean enabled;
public static final String CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED = "cluster.routing.allocation.disk.threshold_enabled";
public static final String CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK = "cluster.routing.allocation.disk.watermark.low";
public static final String CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK = "cluster.routing.allocation.disk.watermark.high";
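// the following setting is dynamic (registered in ClusterDynamicSettingsModule below), so it can be updated at runtime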
public static final String CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS = "cluster.routing.allocation.disk.include_relocations";
class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
String newLowWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, null);
String newHighWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, null);
Boolean newRelocationsSetting = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, null);
Boolean newEnableSetting = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, null);
if (newEnableSetting != null) {
@@ -84,6 +89,11 @@ public class DiskThresholdDecider extends AllocationDecider {
DiskThresholdDecider.this.enabled, newEnableSetting);
DiskThresholdDecider.this.enabled = newEnableSetting;
}
if (newRelocationsSetting != null) {
logger.info("updating [{}] from [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS,
DiskThresholdDecider.this.includeRelocations, newRelocationsSetting);
DiskThresholdDecider.this.includeRelocations = newRelocationsSetting;
}
if (newLowWatermark != null) {
if (!validWatermarkSetting(newLowWatermark)) {
throw new ElasticsearchParseException("Unable to parse low watermark: [" + newLowWatermark + "]");
@@ -125,11 +135,29 @@ public class DiskThresholdDecider extends AllocationDecider {
this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark);
this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark);
this.includeRelocations = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, true);
this.enabled = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true);
nodeSettingsService.addListener(new ApplySettings());
}
/**
* Returns the size of all shards that are currently being relocated to
* the node, but may not be finished transferring yet.
*/
public long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, Map<String, Long> shardSizes) {
List<ShardRouting> relocatingShards = allocation.routingTable().shardsWithState(ShardRoutingState.RELOCATING);
long totalSize = 0;
for (ShardRouting routing : relocatingShards) {
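// a RELOCATING shard's relocatingNodeId() is the node it is moving to,
// so this matches only shards incoming to the node being evaluated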
if (routing.relocatingNodeId().equals(node.nodeId())) {
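// shard sizes are keyed by "[index][shardId][p/r]" strings built by shardIdentifierFromRouting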
Long shardSize = shardSizes.get(shardIdentifierFromRouting(routing));
shardSize = shardSize == null ? 0 : shardSize;
totalSize += shardSize;
}
}
return totalSize;
}
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
// Always allow allocation if the decider is disabled
@@ -175,6 +203,16 @@ public class DiskThresholdDecider extends AllocationDecider {
}
}
if (includeRelocations) {
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes);
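// charge the full size of each incoming shard against this node's free bytes,
// as if the relocations had already completed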
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isDebugEnabled()) {
logger.debug("usage without relocations: {}", usage);
logger.debug("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
}
usage = usageIncludingRelocations;
}
// First, check whether the node is currently over the low watermark
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
long freeBytes = usage.getFreeBytes();
@@ -226,7 +264,7 @@ public class DiskThresholdDecider extends AllocationDecider {
freeDiskThresholdLow, freeDiskPercentage, node.nodeId());
}
return allocation.decision(Decision.NO, NAME, "less than required [%s%%] free disk on node, free: [%s%%]",
freeDiskThresholdLow, freeDiskPercentage);
} else if (freeDiskPercentage > freeDiskThresholdHigh) {
// Allow the shard to be allocated because it is a primary that
// has never been allocated, and the node is still under the high watermark
@@ -245,7 +283,7 @@ public class DiskThresholdDecider extends AllocationDecider {
freeDiskThresholdHigh, freeDiskPercentage, node.nodeId());
}
return allocation.decision(Decision.NO, NAME, "less than required [%s%%] free disk on node, free: [%s%%]",
freeDiskThresholdLow, freeDiskPercentage);
}
}
@@ -306,6 +344,17 @@ public class DiskThresholdDecider extends AllocationDecider {
}
}
if (includeRelocations) {
Map<String, Long> shardSizes = clusterInfo.getShardSizes();
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes);
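// same adjustment as in canAllocate: count incoming relocations against this node's free bytes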
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isDebugEnabled()) {
logger.debug("usage without relocations: {}", usage);
logger.debug("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
}
usage = usageIncludingRelocations;
}
// If this node is already above the high threshold, the shard cannot remain (get it off!)
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
long freeBytes = usage.getFreeBytes();

View File (ClusterDynamicSettingsModule.java)

@@ -84,6 +84,7 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK);
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK);
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED);
clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS);
clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME);
clusterDynamicSettings.addDynamicSetting(SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED);
clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME);

View File (DiskThresholdDeciderTests.java)

@@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing.allocation.decider;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
@@ -31,8 +32,12 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
@@ -649,6 +654,106 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
assertThat(after, equalTo(19.0));
}
@Test
public void testShardRelocationsTakenIntoAccount() {
Settings diskSettings = settingsBuilder()
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, 0.7)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.8).build();
Map<String, DiskUsage> usages = new HashMap<>();
usages.put("node1", new DiskUsage("node1", 100, 40)); // 60% used
usages.put("node2", new DiskUsage("node2", 100, 40)); // 60% used
usages.put("node2", new DiskUsage("node3", 100, 40)); // 60% used
Map<String, Long> shardSizes = new HashMap<>();
shardSizes.put("[test][0][p]", 14L); // 14 bytes
shardSizes.put("[test][0][r]", 14L);
shardSizes.put("[test2][0][p]", 1L); // 1 bytes
shardSizes.put("[test2][0][r]", 1L);
final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes));
AllocationDeciders deciders = new AllocationDeciders(ImmutableSettings.EMPTY,
new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(ImmutableSettings.EMPTY),
new DiskThresholdDecider(diskSettings))));
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
};
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("test2").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.addAsNew(metaData.index("test2"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
logger.info("--> adding two nodes");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("node1"))
.put(newNode("node2"))
).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// shards should be initializing
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(4));
logger.info("--> start the shards");
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
// Assert that we're able to start the primary and replicas
assertThat(clusterState.routingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4));
logger.info("--> adding node3");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node3"))
).build();
AllocationCommand relocate1 = new MoveAllocationCommand(new ShardId("test", 0), "node2", "node3");
AllocationCommands cmds = new AllocationCommands(relocate1);
routingTable = strategy.reroute(clusterState, cmds).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logShardStates(clusterState);
AllocationCommand relocate2 = new MoveAllocationCommand(new ShardId("test2", 0), "node2", "node3");
cmds = new AllocationCommands(relocate2);
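// node3 reports 100 bytes total / 40 free; charging the in-flight [test][0]
// relocation (14 bytes) leaves 26% free, below the 30% free required by the
// 0.7 low watermark, so this second move must be rejected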
try {
// The shard for the "test" index is already being relocated to
// node3, which will put node3 over the low watermark when it
// completes; with shard relocations taken into account, this
// second move should be rejected
strategy.reroute(clusterState, cmds).routingTable();
fail("should not have been able to reroute the shard");
} catch (ElasticsearchIllegalArgumentException e) {
assertThat("can't allocated because there isn't enough room: " + e.getMessage(),
e.getMessage().contains("less than required [30.0%] free disk on node, free: [26.0%]"), equalTo(true));
}
}
public void logShardStates(ClusterState state) {
RoutingNodes rn = state.routingNodes();
logger.info("--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}",