diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 8b10c4aead5..eeeb6e3389c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -310,7 +310,7 @@ public class AllocationService extends AbstractComponent { } // move shards that no longer can be allocated - changed |= moveShards(allocation); + changed |= shardsAllocators.moveShards(allocation); // rebalance changed |= shardsAllocators.rebalance(allocation); @@ -327,46 +327,6 @@ public class AllocationService extends AbstractComponent { } } - private boolean moveShards(RoutingAllocation allocation) { - boolean changed = false; - - // create a copy of the shards interleaving between nodes, and check if they can remain - List<ShardRouting> shards = new ArrayList<>(); - int index = 0; - boolean found = true; - final RoutingNodes routingNodes = allocation.routingNodes(); - while (found) { - found = false; - for (RoutingNode routingNode : routingNodes) { - if (index >= routingNode.size()) { - continue; - } - found = true; - shards.add(routingNode.get(index)); - } - index++; - } - for (int i = 0; i < shards.size(); i++) { - ShardRouting shardRouting = shards.get(i); - // we can only move started shards... 
- if (!shardRouting.started()) { - continue; - } - final RoutingNode routingNode = routingNodes.node(shardRouting.currentNodeId()); - Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); - if (decision.type() == Decision.Type.NO) { - logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node()); - boolean moved = shardsAllocators.move(shardRouting, routingNode, allocation); - if (!moved) { - logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); - } else { - changed = true; - } - } - } - return changed; - } - private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) { boolean changed = false; final RoutingNodes routingNodes = allocation.routingNodes(); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index cd75f897719..0c40b26ca67 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.PriorityComparator; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -49,6 +50,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Predicate; @@ -119,9 +121,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } @Override - public boolean move(ShardRouting 
shardRouting, RoutingNode node, RoutingAllocation allocation) { + public boolean moveShards(RoutingAllocation allocation) { final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); - return balancer.move(shardRouting, node); + return balancer.moveShards(); } /** @@ -489,56 +491,93 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } /** - * This function executes a move operation moving the given shard from - * the given node to the minimal eligible node with respect to the - * weight function. Iff the shard is moved the shard will be set to + * Move started shards that can not be allocated to a node anymore + * + * For each shard to be moved this function executes a move operation + * to the minimal eligible node with respect to the + * weight function. If a shard is moved the shard will be set to * {@link ShardRoutingState#RELOCATING} and a shadow instance of this * shard is created with an incremented version in the state * {@link ShardRoutingState#INITIALIZING}. * - * @return true iff the shard has successfully been moved. 
+ * @return true if the allocation has changed, otherwise false */ - public boolean move(ShardRouting shard, RoutingNode node ) { - if (nodes.isEmpty() || !shard.started()) { - /* with no nodes or a not started shard this is pointless */ + public boolean moveShards() { + if (nodes.isEmpty()) { + /* with no nodes this is pointless */ return false; } - if (logger.isTraceEnabled()) { - logger.trace("Try moving shard [{}] from [{}]", shard, node); - } - final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); - boolean changed = initialize(routingNodes, unassigned); - if (!changed) { - final ModelNode sourceNode = nodes.get(node.nodeId()); - assert sourceNode != null; - final NodeSorter sorter = newNodeSorter(); - sorter.reset(shard.getIndexName()); - final ModelNode[] nodes = sorter.modelNodes; - assert sourceNode.containsShard(shard); - /* - * the sorter holds the minimum weight node first for the shards index. - * We now walk through the nodes until we find a node to allocate the shard. - * This is not guaranteed to be balanced after this operation we still try best effort to - * allocate on the minimal eligible node. - */ - for (ModelNode currentNode : nodes) { - if (currentNode.getNodeId().equals(node.nodeId())) { + // Create a copy of the started shards interleaving between nodes, and check if they can remain. In the presence of throttling + // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are + // offloading the shards. 
+ List<ShardRouting> shards = new ArrayList<>(); + int index = 0; + boolean found = true; + while (found) { + found = false; + for (RoutingNode routingNode : routingNodes) { + if (index >= routingNode.size()) { continue; } - RoutingNode target = currentNode.getRoutingNode(routingNodes); - Decision allocationDecision = allocation.deciders().canAllocate(shard, target, allocation); - Decision rebalanceDecision = allocation.deciders().canRebalance(shard, allocation); - Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision); - if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too? - sourceNode.removeShard(shard); - ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); - currentNode.addShard(targetRelocatingShard, decision); - if (logger.isTraceEnabled()) { - logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId()); + found = true; + ShardRouting shardRouting = routingNode.get(index); + // we can only move started shards... 
+ if (shardRouting.started()) { + shards.add(shardRouting); + } + } + index++; + } + if (shards.isEmpty()) { + return false; + } + + final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); + boolean changed = initialize(routingNodes, unassigned); + if (changed == false) { + final NodeSorter sorter = newNodeSorter(); + final ModelNode[] modelNodes = sorter.modelNodes; + for (ShardRouting shardRouting : shards) { + final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); + assert sourceNode != null && sourceNode.containsShard(shardRouting); + final RoutingNode routingNode = sourceNode.getRoutingNode(routingNodes); + Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); + if (decision.type() == Decision.Type.NO) { + logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node()); + sorter.reset(shardRouting.getIndexName()); + /* + * the sorter holds the minimum weight node first for the shards index. + * We now walk through the nodes until we find a node to allocate the shard. + * This is not guaranteed to be balanced after this operation we still try best effort to + * allocate on the minimal eligible node. + */ + boolean moved = false; + for (ModelNode currentNode : modelNodes) { + if (currentNode == sourceNode) { + continue; + } + RoutingNode target = currentNode.getRoutingNode(routingNodes); + Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation); + Decision rebalanceDecision = allocation.deciders().canRebalance(shardRouting, allocation); + if (allocationDecision.type() == Type.YES && rebalanceDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too? 
+ Decision sourceDecision = sourceNode.removeShard(shardRouting); + ShardRouting targetRelocatingShard = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); + // re-add (now relocating shard) to source node + sourceNode.addShard(shardRouting, sourceDecision); + Decision targetDecision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision); + currentNode.addShard(targetRelocatingShard, targetDecision); + if (logger.isTraceEnabled()) { + logger.trace("Moved shard [{}] to node [{}]", shardRouting, target.node()); + } + moved = true; + changed = true; + break; + } + } + if (moved == false) { + logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); } - changed = true; - break; } } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java index f49d5002814..4d9c05527d3 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.routing.allocation.allocator; -import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; @@ -36,22 +35,22 @@ import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; public interface ShardsAllocator { /** - * Applies changes on started nodes based on the implemented algorithm. 
For example if a - * shard has changed to {@link ShardRoutingState#STARTED} from {@link ShardRoutingState#RELOCATING} + * Applies changes on started nodes based on the implemented algorithm. For example if a + * shard has changed to {@link ShardRoutingState#STARTED} from {@link ShardRoutingState#RELOCATING} * this allocator might apply some cleanups on the node that used to hold the shard. * @param allocation all started {@link ShardRouting shards} */ void applyStartedShards(StartedRerouteAllocation allocation); /** - * Applies changes on failed nodes based on the implemented algorithm. + * Applies changes on failed nodes based on the implemented algorithm. * @param allocation all failed {@link ShardRouting shards} */ void applyFailedShards(FailedRerouteAllocation allocation); /** - * Assign all unassigned shards to nodes - * + * Assign all unassigned shards to nodes + * * @param allocation current node allocation * @return true if the allocation has changed, otherwise false */ @@ -59,19 +58,17 @@ public interface ShardsAllocator { /** * Rebalancing number of shards on all nodes - * + * * @param allocation current node allocation * @return true if the allocation has changed, otherwise false */ boolean rebalance(RoutingAllocation allocation); /** - * Moves a shard from the given node to other node. 
- * - * @param shardRouting the shard to move - * @param node A node containing the shard + * Move started shards that can not be allocated to a node anymore + * * @param allocation current node allocation * @return true if the allocation has changed, otherwise false */ - boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation); + boolean moveShards(RoutingAllocation allocation); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java index 65a7bd6971a..f3eb1ebbf14 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java @@ -19,8 +19,6 @@ package org.elasticsearch.cluster.routing.allocation.allocator; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; @@ -96,7 +94,7 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat } @Override - public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - return allocator.move(shardRouting, node, allocation); + public boolean moveShards(RoutingAllocation allocation) { + return allocator.moveShards(allocation); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java index 2c2bab24605..24635a980a7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java 
@@ -60,7 +60,7 @@ public class ClusterModuleTests extends ModuleTestCase { return false; } @Override - public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + public boolean moveShards(RoutingAllocation allocation) { return false; } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java index 68706d96df7..707129578c9 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -320,7 +320,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase { } @Override - public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + public boolean moveShards(RoutingAllocation allocation) { return false; }