Merge pull request #16926 from ywelsch/fix/balancer-move
Speed up shard balancer by reusing shard model while moving shards that can no longer be allocated to a node
This commit is contained in:
commit
675d940f01
|
@ -310,7 +310,7 @@ public class AllocationService extends AbstractComponent {
|
|||
}
|
||||
|
||||
// move shards that no longer can be allocated
|
||||
changed |= moveShards(allocation);
|
||||
changed |= shardsAllocators.moveShards(allocation);
|
||||
|
||||
// rebalance
|
||||
changed |= shardsAllocators.rebalance(allocation);
|
||||
|
@ -327,46 +327,6 @@ public class AllocationService extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean moveShards(RoutingAllocation allocation) {
|
||||
boolean changed = false;
|
||||
|
||||
// create a copy of the shards interleaving between nodes, and check if they can remain
|
||||
List<ShardRouting> shards = new ArrayList<>();
|
||||
int index = 0;
|
||||
boolean found = true;
|
||||
final RoutingNodes routingNodes = allocation.routingNodes();
|
||||
while (found) {
|
||||
found = false;
|
||||
for (RoutingNode routingNode : routingNodes) {
|
||||
if (index >= routingNode.size()) {
|
||||
continue;
|
||||
}
|
||||
found = true;
|
||||
shards.add(routingNode.get(index));
|
||||
}
|
||||
index++;
|
||||
}
|
||||
for (int i = 0; i < shards.size(); i++) {
|
||||
ShardRouting shardRouting = shards.get(i);
|
||||
// we can only move started shards...
|
||||
if (!shardRouting.started()) {
|
||||
continue;
|
||||
}
|
||||
final RoutingNode routingNode = routingNodes.node(shardRouting.currentNodeId());
|
||||
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
|
||||
if (decision.type() == Decision.Type.NO) {
|
||||
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
|
||||
boolean moved = shardsAllocators.move(shardRouting, routingNode, allocation);
|
||||
if (!moved) {
|
||||
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
|
||||
} else {
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
|
||||
boolean changed = false;
|
||||
final RoutingNodes routingNodes = allocation.routingNodes();
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Setting;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.gateway.PriorityComparator;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
|
@ -49,6 +50,7 @@ import java.util.HashMap;
|
|||
import java.util.HashSet;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
|
@ -119,9 +121,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
public boolean moveShards(RoutingAllocation allocation) {
|
||||
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
|
||||
return balancer.move(shardRouting, node);
|
||||
return balancer.moveShards();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -489,56 +491,93 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
}
|
||||
|
||||
/**
|
||||
* This function executes a move operation moving the given shard from
|
||||
* the given node to the minimal eligible node with respect to the
|
||||
* weight function. Iff the shard is moved the shard will be set to
|
||||
* Move started shards that can not be allocated to a node anymore
|
||||
*
|
||||
* For each shard to be moved this function executes a move operation
|
||||
* to the minimal eligible node with respect to the
|
||||
* weight function. If a shard is moved the shard will be set to
|
||||
* {@link ShardRoutingState#RELOCATING} and a shadow instance of this
|
||||
* shard is created with an incremented version in the state
|
||||
* {@link ShardRoutingState#INITIALIZING}.
|
||||
*
|
||||
* @return <code>true</code> iff the shard has successfully been moved.
|
||||
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
|
||||
*/
|
||||
public boolean move(ShardRouting shard, RoutingNode node ) {
|
||||
if (nodes.isEmpty() || !shard.started()) {
|
||||
/* with no nodes or a not started shard this is pointless */
|
||||
public boolean moveShards() {
|
||||
if (nodes.isEmpty()) {
|
||||
/* with no nodes this is pointless */
|
||||
return false;
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Try moving shard [{}] from [{}]", shard, node);
|
||||
}
|
||||
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
|
||||
boolean changed = initialize(routingNodes, unassigned);
|
||||
if (!changed) {
|
||||
final ModelNode sourceNode = nodes.get(node.nodeId());
|
||||
assert sourceNode != null;
|
||||
final NodeSorter sorter = newNodeSorter();
|
||||
sorter.reset(shard.getIndexName());
|
||||
final ModelNode[] nodes = sorter.modelNodes;
|
||||
assert sourceNode.containsShard(shard);
|
||||
/*
|
||||
* the sorter holds the minimum weight node first for the shards index.
|
||||
* We now walk through the nodes until we find a node to allocate the shard.
|
||||
* This is not guaranteed to be balanced after this operation we still try best effort to
|
||||
* allocate on the minimal eligible node.
|
||||
*/
|
||||
|
||||
for (ModelNode currentNode : nodes) {
|
||||
if (currentNode.getNodeId().equals(node.nodeId())) {
|
||||
// Create a copy of the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
|
||||
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
|
||||
// offloading the shards.
|
||||
List<ShardRouting> shards = new ArrayList<>();
|
||||
int index = 0;
|
||||
boolean found = true;
|
||||
while (found) {
|
||||
found = false;
|
||||
for (RoutingNode routingNode : routingNodes) {
|
||||
if (index >= routingNode.size()) {
|
||||
continue;
|
||||
}
|
||||
RoutingNode target = currentNode.getRoutingNode(routingNodes);
|
||||
Decision allocationDecision = allocation.deciders().canAllocate(shard, target, allocation);
|
||||
Decision rebalanceDecision = allocation.deciders().canRebalance(shard, allocation);
|
||||
Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
|
||||
if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
|
||||
sourceNode.removeShard(shard);
|
||||
ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||
currentNode.addShard(targetRelocatingShard, decision);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId());
|
||||
found = true;
|
||||
ShardRouting shardRouting = routingNode.get(index);
|
||||
// we can only move started shards...
|
||||
if (shardRouting.started()) {
|
||||
shards.add(shardRouting);
|
||||
}
|
||||
}
|
||||
index++;
|
||||
}
|
||||
if (shards.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
|
||||
boolean changed = initialize(routingNodes, unassigned);
|
||||
if (changed == false) {
|
||||
final NodeSorter sorter = newNodeSorter();
|
||||
final ModelNode[] modelNodes = sorter.modelNodes;
|
||||
for (ShardRouting shardRouting : shards) {
|
||||
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
|
||||
assert sourceNode != null && sourceNode.containsShard(shardRouting);
|
||||
final RoutingNode routingNode = sourceNode.getRoutingNode(routingNodes);
|
||||
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
|
||||
if (decision.type() == Decision.Type.NO) {
|
||||
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
|
||||
sorter.reset(shardRouting.getIndexName());
|
||||
/*
|
||||
* the sorter holds the minimum weight node first for the shards index.
|
||||
* We now walk through the nodes until we find a node to allocate the shard.
|
||||
* This is not guaranteed to be balanced after this operation we still try best effort to
|
||||
* allocate on the minimal eligible node.
|
||||
*/
|
||||
boolean moved = false;
|
||||
for (ModelNode currentNode : modelNodes) {
|
||||
if (currentNode == sourceNode) {
|
||||
continue;
|
||||
}
|
||||
RoutingNode target = currentNode.getRoutingNode(routingNodes);
|
||||
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
|
||||
Decision rebalanceDecision = allocation.deciders().canRebalance(shardRouting, allocation);
|
||||
if (allocationDecision.type() == Type.YES && rebalanceDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
|
||||
Decision sourceDecision = sourceNode.removeShard(shardRouting);
|
||||
ShardRouting targetRelocatingShard = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
|
||||
// re-add (now relocating shard) to source node
|
||||
sourceNode.addShard(shardRouting, sourceDecision);
|
||||
Decision targetDecision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
|
||||
currentNode.addShard(targetRelocatingShard, targetDecision);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node());
|
||||
}
|
||||
moved = true;
|
||||
changed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (moved == false) {
|
||||
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
|
||||
}
|
||||
changed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.cluster.routing.allocation.allocator;
|
||||
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||
|
@ -36,22 +35,22 @@ import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
|||
public interface ShardsAllocator {
|
||||
|
||||
/**
|
||||
* Applies changes on started nodes based on the implemented algorithm. For example if a
|
||||
* shard has changed to {@link ShardRoutingState#STARTED} from {@link ShardRoutingState#RELOCATING}
|
||||
* Applies changes on started nodes based on the implemented algorithm. For example if a
|
||||
* shard has changed to {@link ShardRoutingState#STARTED} from {@link ShardRoutingState#RELOCATING}
|
||||
* this allocator might apply some cleanups on the node that used to hold the shard.
|
||||
* @param allocation all started {@link ShardRouting shards}
|
||||
*/
|
||||
void applyStartedShards(StartedRerouteAllocation allocation);
|
||||
|
||||
/**
|
||||
* Applies changes on failed nodes based on the implemented algorithm.
|
||||
* Applies changes on failed nodes based on the implemented algorithm.
|
||||
* @param allocation all failed {@link ShardRouting shards}
|
||||
*/
|
||||
void applyFailedShards(FailedRerouteAllocation allocation);
|
||||
|
||||
/**
|
||||
* Assign all unassigned shards to nodes
|
||||
*
|
||||
* Assign all unassigned shards to nodes
|
||||
*
|
||||
* @param allocation current node allocation
|
||||
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
|
||||
*/
|
||||
|
@ -59,19 +58,17 @@ public interface ShardsAllocator {
|
|||
|
||||
/**
|
||||
* Rebalancing number of shards on all nodes
|
||||
*
|
||||
*
|
||||
* @param allocation current node allocation
|
||||
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
|
||||
*/
|
||||
boolean rebalance(RoutingAllocation allocation);
|
||||
|
||||
/**
|
||||
* Moves a shard from the given node to other node.
|
||||
*
|
||||
* @param shardRouting the shard to move
|
||||
* @param node A node containing the shard
|
||||
* Move started shards that can not be allocated to a node anymore
|
||||
*
|
||||
* @param allocation current node allocation
|
||||
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
|
||||
*/
|
||||
boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation);
|
||||
boolean moveShards(RoutingAllocation allocation);
|
||||
}
|
||||
|
|
|
@ -19,8 +19,6 @@
|
|||
|
||||
package org.elasticsearch.cluster.routing.allocation.allocator;
|
||||
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
|
||||
|
@ -96,7 +94,7 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
return allocator.move(shardRouting, node, allocation);
|
||||
public boolean moveShards(RoutingAllocation allocation) {
|
||||
return allocator.moveShards(allocation);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ public class ClusterModuleTests extends ModuleTestCase {
|
|||
return false;
|
||||
}
|
||||
@Override
|
||||
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
public boolean moveShards(RoutingAllocation allocation) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -320,7 +320,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||
public boolean moveShards(RoutingAllocation allocation) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue