From 099b1a70d5c34da2201290f66899cd968cdb849f Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 25 Nov 2014 18:05:37 +0100 Subject: [PATCH] Core: Let the disk threshold decider take into account shards moving away from a node in order to determine if a shard can remain. By taking this into account we can prevent that we move too many shards away than is necessary. Closes #8538 Closes #8659 --- .../decider/DiskThresholdDecider.java | 20 ++- .../decider/DiskThresholdDeciderTests.java | 115 ++++++++++++++++++ 2 files changed, 129 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 83154562cd5..3aa1fa313b4 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -223,20 +223,28 @@ public class DiskThresholdDecider extends AllocationDecider { /** * Returns the size of all shards that are currently being relocated to * the node, but may not be finished transfering yet. + * + * If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size + * of all shards */ - public long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, Map shardSizes) { + public long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, Map shardSizes, boolean subtractShardsMovingAway) { List relocatingShards = allocation.routingTable().shardsWithState(ShardRoutingState.RELOCATING); long totalSize = 0; for (ShardRouting routing : relocatingShards) { if (routing.relocatingNodeId().equals(node.nodeId())) { - Long shardSize = shardSizes.get(shardIdentifierFromRouting(routing)); - shardSize = shardSize == null ? 0 : shardSize; - totalSize += shardSize; + totalSize += getShardSize(routing, shardSizes); + } else if (subtractShardsMovingAway && routing.currentNodeId().equals(node.nodeId())) { + totalSize -= getShardSize(routing, shardSizes); } } return totalSize; } + private long getShardSize(ShardRouting routing, Map shardSizes) { + Long shardSize = shardSizes.get(shardIdentifierFromRouting(routing)); + return shardSize == null ? 0 : shardSize; + } + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { // Always allow allocation if the decider is disabled @@ -283,7 +291,7 @@ public class DiskThresholdDecider extends AllocationDecider { } if (includeRelocations) { - long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes); + long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes, false); DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize); if (logger.isTraceEnabled()) { @@ -429,7 +437,7 @@ public class DiskThresholdDecider extends AllocationDecider { if (includeRelocations) { Map shardSizes = clusterInfo.getShardSizes(); - long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes); + long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes, true); DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize); if (logger.isTraceEnabled()) { diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index a02e187d38e..086667acd85 100644 --- a/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -29,9 +29,11 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; @@ -39,6 +41,7 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.gateway.NoopGatewayAllocator; +import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ElasticsearchAllocationTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -52,6 +55,8 @@ import java.util.Map; import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase { @@ -790,6 +795,116 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase { } + @Test + public void testCanRemainWithShardRelocatingAway() { + Settings diskSettings = settingsBuilder() + .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true) + .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, true) + .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "60%") + .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "70%").build(); + + // We have an index with 2 primary shards each taking 40 bytes. Each node has 100 bytes available + Map usages = new HashMap<>(); + usages.put("node1", new DiskUsage("node1", "n1", 100, 20)); // 80% used + usages.put("node2", new DiskUsage("node2", "n2", 100, 100)); // 0% used + + Map shardSizes = new HashMap<>(); + shardSizes.put("[test][0][p]", 40L); + shardSizes.put("[test][1][p]", 40L); + final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes)); + + DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + DiscoveryNode discoveryNode1 = new DiscoveryNode("node1", new LocalTransportAddress("1"), Version.CURRENT); + DiscoveryNode discoveryNode2 = new DiscoveryNode("node2", new LocalTransportAddress("2"), Version.CURRENT); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(discoveryNode1).put(discoveryNode2).build(); + + ClusterState baseClusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(routingTable) + .nodes(discoveryNodes) + .build(); + + // Two shards consuming each 80% of disk space while 70% is allowed, so shard 0 isn't allowed here + MutableShardRouting firstRouting = new MutableShardRouting("test", 0, "node1", true, ShardRoutingState.STARTED, 1); + MutableShardRouting secondRouting = new MutableShardRouting("test", 1, "node1", true, ShardRoutingState.STARTED, 1); + RoutingNode firstRoutingNode = new RoutingNode("node1", discoveryNode1, Arrays.asList(firstRouting, secondRouting)); + RoutingTable.Builder builder = RoutingTable.builder().add( + IndexRoutingTable.builder("test") + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0), false) + .addShard(firstRouting) + .build() + ) + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1), false) + .addShard(secondRouting) + .build() + ) + ); + ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder).build(); + RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo); + Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation); + assertThat(decision.type(), equalTo(Decision.Type.NO)); + + // Two shards consuming each 80% of disk space while 70% is allowed, but one is relocating, so shard 0 can stay + firstRouting = new MutableShardRouting("test", 0, "node1", true, ShardRoutingState.STARTED, 1); + secondRouting = new MutableShardRouting("test", 1, "node1", "node2", true, ShardRoutingState.RELOCATING, 1); + firstRoutingNode = new RoutingNode("node1", discoveryNode1, Arrays.asList(firstRouting, secondRouting)); + builder = RoutingTable.builder().add( + IndexRoutingTable.builder("test") + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0), false) + .addShard(firstRouting) + .build() + ) + .addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1), false) + .addShard(secondRouting) + .build() + ) + ); + clusterState = ClusterState.builder(baseClusterState).routingTable(builder).build(); + routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo); + decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + + // Creating AllocationService instance and the services it depends on... + ClusterInfoService cis = new ClusterInfoService() { + @Override + public ClusterInfo getClusterInfo() { + logger.info("--> calling fake getClusterInfo"); + return clusterInfo; + } + + @Override + public void addListener(Listener listener) { + // noop + } + }; + AllocationDeciders deciders = new AllocationDeciders(ImmutableSettings.EMPTY, new HashSet<>(Arrays.asList( + new SameShardAllocationDecider(ImmutableSettings.EMPTY), diskThresholdDecider + ))); + AllocationService strategy = new AllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .build(), deciders, makeShardsAllocators(), cis); + // Ensure that the reroute call doesn't alter the routing table, since the first primary is relocating away + // and therefor we will have sufficient disk space on node1. + RoutingAllocation.Result result = strategy.reroute(clusterState); + assertThat(result.changed(), is(false)); + assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED)); + assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().currentNodeId(), equalTo("node1")); + assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().relocatingNodeId(), nullValue()); + assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().state(), equalTo(RELOCATING)); + assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().currentNodeId(), equalTo("node1")); + assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().relocatingNodeId(), equalTo("node2")); + } + public void logShardStates(ClusterState state) { RoutingNodes rn = state.routingNodes(); logger.info("--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}",