Core: Let the disk threshold decider take shards that are relocating away from a node into account when deciding whether a shard can remain.
Taking these into account prevents moving more shards off a node than necessary. Closes #8538 Closes #8659
parent 716212c037
commit 099b1a70d5
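In short: canRemain now subtracts the bytes of shards that are already relocating away from a node before comparing the projected disk usage against the watermarks, so a node that is over the high watermark but already shedding a shard is not asked to shed more. Below is a back-of-the-envelope sketch of that arithmetic using the numbers from the test added in this commit; it is plain Java with illustrative names, not the Elasticsearch API.

    public class RelocationAccountingSketch {
        public static void main(String[] args) {
            long totalBytes = 100;         // node1's disk, as in the test below
            long freeBytes = 20;           // two 40-byte shards on node1 -> 80% used
            long bytesMovingAway = 40;     // shard 1 is RELOCATING from node1 to node2
            double highWatermarkPct = 70.0;

            // Before this commit: outbound relocations were ignored, so node1 looked 80% full.
            double usedBefore = 100.0 * (totalBytes - freeBytes) / totalBytes;
            System.out.println(usedBefore + "% > " + highWatermarkPct + "% -> canRemain: NO");

            // After: the 40 bytes leaving the node are credited back (subtractShardsMovingAway).
            double usedAfter = 100.0 * (totalBytes - freeBytes - bytesMovingAway) / totalBytes;
            System.out.println(usedAfter + "% < " + highWatermarkPct + "% -> canRemain: YES");
        }
    }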
@@ -223,20 +223,28 @@ public class DiskThresholdDecider extends AllocationDecider {
     /**
      * Returns the size of all shards that are currently being relocated to
      * the node, but may not be finished transferring yet.
+     *
+     * If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size
+     * of all shards.
      */
-    public long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, Map<String, Long> shardSizes) {
+    public long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, Map<String, Long> shardSizes, boolean subtractShardsMovingAway) {
         List<ShardRouting> relocatingShards = allocation.routingTable().shardsWithState(ShardRoutingState.RELOCATING);
         long totalSize = 0;
         for (ShardRouting routing : relocatingShards) {
             if (routing.relocatingNodeId().equals(node.nodeId())) {
-                Long shardSize = shardSizes.get(shardIdentifierFromRouting(routing));
-                shardSize = shardSize == null ? 0 : shardSize;
-                totalSize += shardSize;
+                totalSize += getShardSize(routing, shardSizes);
+            } else if (subtractShardsMovingAway && routing.currentNodeId().equals(node.nodeId())) {
+                totalSize -= getShardSize(routing, shardSizes);
             }
         }
         return totalSize;
     }

+    private long getShardSize(ShardRouting routing, Map<String, Long> shardSizes) {
+        Long shardSize = shardSizes.get(shardIdentifierFromRouting(routing));
+        return shardSize == null ? 0 : shardSize;
+    }
+
     public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         // Always allow allocation if the decider is disabled
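Note the sign convention introduced above: with subtractShardsMovingAway set, the returned total can go negative when more bytes are leaving a node than arriving, and the callers' projection "usage.getFreeBytes() - relocatingShardsSize" then yields extra free space. A minimal standalone sketch of that effect, with hypothetical sizes that are not taken from the commit:

    public class SignConventionSketch {
        public static void main(String[] args) {
            // Hypothetical: node1 is receiving a 10-byte shard and shedding a 40-byte shard.
            long incoming = 10, outgoing = 40;
            long relocatingShardsSize = incoming - outgoing;       // -30: net bytes leaving node1
            long freeBytes = 20;
            // Mirrors the callers' "usage.getFreeBytes() - relocatingShardsSize":
            long projectedFree = freeBytes - relocatingShardsSize; // 20 - (-30) = 50
            System.out.println("projected free bytes on node1: " + projectedFree);
        }
    }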
@@ -283,7 +291,7 @@ public class DiskThresholdDecider extends AllocationDecider {
         }

         if (includeRelocations) {
-            long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes);
+            long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes, false);
             DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(),
                     usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
             if (logger.isTraceEnabled()) {
@@ -429,7 +437,7 @@ public class DiskThresholdDecider extends AllocationDecider {

         if (includeRelocations) {
             Map<String, Long> shardSizes = clusterInfo.getShardSizes();
-            long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes);
+            long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes, true);
             DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(),
                     usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
             if (logger.isTraceEnabled()) {
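The two call sites above differ deliberately: canAllocate (the hunk at line 291) passes false, so bytes leaving a node are not credited when deciding whether to place yet another shard there, which keeps allocation conservative; canRemain (the hunk at line 437) passes true, so a node that is already shedding shards is not told to shed still more, which is the over-eager relocation behavior the commit message fixes.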
@@ -29,9 +29,11 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.DiskUsage;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.*;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
 import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
@@ -39,6 +41,7 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.gateway.NoopGatewayAllocator;
+import org.elasticsearch.common.transport.LocalTransportAddress;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ElasticsearchAllocationTestCase;
 import org.elasticsearch.test.junit.annotations.TestLogging;
@@ -52,6 +55,8 @@ import java.util.Map;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;

 public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
@@ -790,6 +795,116 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
     }

+    @Test
+    public void testCanRemainWithShardRelocatingAway() {
+        Settings diskSettings = settingsBuilder()
+                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
+                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, true)
+                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "60%")
+                .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "70%").build();
+
+        // We have an index with 2 primary shards, each taking 40 bytes. Each node has 100 bytes available.
+        Map<String, DiskUsage> usages = new HashMap<>();
+        usages.put("node1", new DiskUsage("node1", "n1", 100, 20)); // 80% used
+        usages.put("node2", new DiskUsage("node2", "n2", 100, 100)); // 0% used
+
+        Map<String, Long> shardSizes = new HashMap<>();
+        shardSizes.put("[test][0][p]", 40L);
+        shardSizes.put("[test][1][p]", 40L);
+        final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes));
+
+        DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings);
+        MetaData metaData = MetaData.builder()
+                .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
+                .build();
+
+        RoutingTable routingTable = RoutingTable.builder()
+                .addAsNew(metaData.index("test"))
+                .build();
+
+        DiscoveryNode discoveryNode1 = new DiscoveryNode("node1", new LocalTransportAddress("1"), Version.CURRENT);
+        DiscoveryNode discoveryNode2 = new DiscoveryNode("node2", new LocalTransportAddress("2"), Version.CURRENT);
+        DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(discoveryNode1).put(discoveryNode2).build();
+
+        ClusterState baseClusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
+                .metaData(metaData)
+                .routingTable(routingTable)
+                .nodes(discoveryNodes)
+                .build();
+
+        // The two shards together consume 80% of disk space while only 70% is allowed, so shard 0 can't remain here
+        MutableShardRouting firstRouting = new MutableShardRouting("test", 0, "node1", true, ShardRoutingState.STARTED, 1);
+        MutableShardRouting secondRouting = new MutableShardRouting("test", 1, "node1", true, ShardRoutingState.STARTED, 1);
+        RoutingNode firstRoutingNode = new RoutingNode("node1", discoveryNode1, Arrays.asList(firstRouting, secondRouting));
+        RoutingTable.Builder builder = RoutingTable.builder().add(
+                IndexRoutingTable.builder("test")
+                        .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0), false)
+                                .addShard(firstRouting)
+                                .build()
+                        )
+                        .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1), false)
+                                .addShard(secondRouting)
+                                .build()
+                        )
+        );
+        ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder).build();
+        RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
+        Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
+        assertThat(decision.type(), equalTo(Decision.Type.NO));
+
+        // The two shards still consume 80% while only 70% is allowed, but one is relocating away, so shard 0 can stay
+        firstRouting = new MutableShardRouting("test", 0, "node1", true, ShardRoutingState.STARTED, 1);
+        secondRouting = new MutableShardRouting("test", 1, "node1", "node2", true, ShardRoutingState.RELOCATING, 1);
+        firstRoutingNode = new RoutingNode("node1", discoveryNode1, Arrays.asList(firstRouting, secondRouting));
+        builder = RoutingTable.builder().add(
+                IndexRoutingTable.builder("test")
+                        .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0), false)
+                                .addShard(firstRouting)
+                                .build()
+                        )
+                        .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1), false)
+                                .addShard(secondRouting)
+                                .build()
+                        )
+        );
+        clusterState = ClusterState.builder(baseClusterState).routingTable(builder).build();
+        routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
+        decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
+        assertThat(decision.type(), equalTo(Decision.Type.YES));
+
+        // Create an AllocationService instance and the services it depends on...
+        ClusterInfoService cis = new ClusterInfoService() {
+            @Override
+            public ClusterInfo getClusterInfo() {
+                logger.info("--> calling fake getClusterInfo");
+                return clusterInfo;
+            }
+
+            @Override
+            public void addListener(Listener listener) {
+                // noop
+            }
+        };
+        AllocationDeciders deciders = new AllocationDeciders(ImmutableSettings.EMPTY, new HashSet<>(Arrays.asList(
+                new SameShardAllocationDecider(ImmutableSettings.EMPTY), diskThresholdDecider
+        )));
+        AllocationService strategy = new AllocationService(settingsBuilder()
+                .put("cluster.routing.allocation.concurrent_recoveries", 10)
+                .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
+                .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
+                .build(), deciders, makeShardsAllocators(), cis);
+
+        // Ensure that the reroute call doesn't alter the routing table: shard 1 is relocating away,
+        // and therefore node1 will have sufficient disk space.
+        RoutingAllocation.Result result = strategy.reroute(clusterState);
+        assertThat(result.changed(), is(false));
+        assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED));
+        assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().currentNodeId(), equalTo("node1"));
+        assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().relocatingNodeId(), nullValue());
+        assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().state(), equalTo(RELOCATING));
+        assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().currentNodeId(), equalTo("node1"));
+        assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().relocatingNodeId(), equalTo("node2"));
+    }
+
     public void logShardStates(ClusterState state) {
         RoutingNodes rn = state.routingNodes();
         logger.info("--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}",
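The closing reroute assertions tie the change together: with shard 1 still RELOCATING off node1, the fixed decider projects node1 at 40% used (20 free bytes plus the 40 bytes leaving), which is below both watermarks, so reroute leaves the routing table unchanged instead of also pushing shard 0 off the node.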