Rebalancing policy shouldn't prevent hard allocation decisions (#17698)

#14259 added a check to honor rebalancing policies (i.e., rebalance only when the cluster is green) when moving shards due to changes in allocation filtering rules. The rebalancing policy is there to make sure we don't try to even out the number of shards per node while shards are still missing. However, it should not interfere with explicit user commands (allocation filtering) or with deciders such as the disk threshold that need to move shards off a node that has hit the high watermark.
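For concreteness, the kind of hard allocation filter this is about is just a dynamic cluster setting. A minimal sketch in Java (the tag1/value1 attribute pair mirrors FilterRoutingTests below; the class and field names are purely illustrative):

import org.elasticsearch.common.settings.Settings;

class AllocationFilterSketch {
    // Hard filter: shards may not be allocated to (or remain on) nodes whose custom
    // attribute tag1 equals value1, so they must be moved off such nodes regardless of
    // the rebalancing policy.
    static final Settings EXCLUDE_TAG1 = Settings.builder()
            .put("cluster.routing.allocation.exclude.tag1", "value1")
            .build();
}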

#14259 was done to address #14057, where people reported that using allocation filtering caused many shards to be moved at once. This is, however, a non-issue: with 1.7 (where the issue was reported) and 2.x, we protect recovery source nodes by limiting the number of concurrent data streams they can open (i.e., we can have many recoveries, but they will be throttled). In 5.0 we came up with a simpler and more understandable approach, with a hard limit on the number of outgoing recoveries per node (on top of the limit on incoming recoveries we already had).
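As a rough sketch of what those 5.0 per-node limits look like (the setting keys are the ones declared in ThrottlingAllocationDecider below; the values and class name here are illustrative only):

import org.elasticsearch.common.settings.Settings;

class RecoveryLimitsSketch {
    // Hard caps per node: how many recoveries it may receive concurrently (incoming)
    // and how many it may act as a source for concurrently (outgoing).
    static final Settings RECOVERY_LIMITS = Settings.builder()
            .put("cluster.routing.allocation.node_concurrent_incoming_recoveries", "2")
            .put("cluster.routing.allocation.node_concurrent_outgoing_recoveries", "2")
            .build();
}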
Boaz Leskes committed on 2016-04-13 20:44:41 +02:00
commit a0d1f8889d (parent e601e02912)
3 changed files with 7 additions and 7 deletions

BalancedShardsAllocator.java

@@ -556,9 +556,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
             for (ModelNode currentNode : sorter.modelNodes) {
                 if (currentNode != sourceNode) {
                     RoutingNode target = currentNode.getRoutingNode();
+                    // don't use canRebalance as we want hard filtering rules to apply. See #17698
                     Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
-                    Decision rebalanceDecision = allocation.deciders().canRebalance(shardRouting, allocation);
-                    if (allocationDecision.type() == Type.YES && rebalanceDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
+                    if (allocationDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
                         sourceNode.removeShard(shardRouting);
                         ShardRouting targetRelocatingShard = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
                         currentNode.addShard(targetRelocatingShard);

ThrottlingAllocationDecider.java

@@ -62,12 +62,12 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
             Property.Dynamic, Property.NodeScope);
     public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING =
         new Setting<>("cluster.routing.allocation.node_concurrent_incoming_recoveries",
-            (s) -> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getRaw(s),
+            CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING::getRaw,
             (s) -> Setting.parseInt(s, 0, "cluster.routing.allocation.node_concurrent_incoming_recoveries"),
             Property.Dynamic, Property.NodeScope);
     public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING =
         new Setting<>("cluster.routing.allocation.node_concurrent_outgoing_recoveries",
-            (s) -> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getRaw(s),
+            CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING::getRaw,
             (s) -> Setting.parseInt(s, 0, "cluster.routing.allocation.node_concurrent_outgoing_recoveries"),
             Property.Dynamic, Property.NodeScope);
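Since both directional settings fall back to cluster.routing.allocation.node_concurrent_recoveries (the getRaw reference above), a single combined cap is enough when the same limit should apply in both directions. A minimal sketch, assuming the same Settings import as in the earlier snippets:

    // Caps both incoming and outgoing recoveries at 1 per node via the shared default,
    // matching the value used in the test below.
    Settings combinedCap = Settings.builder()
            .put("cluster.routing.allocation.node_concurrent_recoveries", "1")
            .build();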

FilterRoutingTests.java

@@ -165,7 +165,7 @@ public class FilterRoutingTests extends ESAllocationTestCase {
         }
     }
 
-    public void testRebalanceAfterShardsCannotRemainOnNode() {
+    public void testConcurrentRecoveriesAfterShardsCannotRemainOnNode() {
         AllocationService strategy = createAllocationService(Settings.builder().build());
 
         logger.info("Building initial routing table");
@@ -199,14 +199,14 @@ public class FilterRoutingTests extends ESAllocationTestCase {
         logger.info("--> disable allocation for node1 and reroute");
         strategy = createAllocationService(Settings.builder()
-                .put("cluster.routing.allocation.cluster_concurrent_rebalance", "1")
+                .put("cluster.routing.allocation.node_concurrent_recoveries", "1")
                 .put("cluster.routing.allocation.exclude.tag1", "value1")
                 .build());
 
         logger.info("--> move shards from node1 to node2");
         routingTable = strategy.reroute(clusterState, "reroute").routingTable();
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
 
-        logger.info("--> check that concurrent rebalance only allows 1 shard to move");
+        logger.info("--> check that concurrent recoveries only allows 1 shard to move");
         assertThat(clusterState.getRoutingNodes().node(node1.getId()).numberOfShardsWithState(STARTED), equalTo(1));
         assertThat(clusterState.getRoutingNodes().node(node2.getId()).numberOfShardsWithState(INITIALIZING), equalTo(1));
         assertThat(clusterState.getRoutingNodes().node(node2.getId()).numberOfShardsWithState(STARTED), equalTo(2));