Merge pull request #16926 from ywelsch/fix/balancer-move

Speed up shard balancer by reusing shard model while moving shards that can no longer be allocated to a node
This commit is contained in:
Yannick Welsch 2016-03-04 19:31:52 +01:00
commit 675d940f01
6 changed files with 94 additions and 100 deletions

View File

@ -310,7 +310,7 @@ public class AllocationService extends AbstractComponent {
}
// move shards that no longer can be allocated
changed |= moveShards(allocation);
changed |= shardsAllocators.moveShards(allocation);
// rebalance
changed |= shardsAllocators.rebalance(allocation);
@ -327,46 +327,6 @@ public class AllocationService extends AbstractComponent {
}
}
private boolean moveShards(RoutingAllocation allocation) {
boolean changed = false;
// create a copy of the shards interleaving between nodes, and check if they can remain
List<ShardRouting> shards = new ArrayList<>();
int index = 0;
boolean found = true;
final RoutingNodes routingNodes = allocation.routingNodes();
while (found) {
found = false;
for (RoutingNode routingNode : routingNodes) {
if (index >= routingNode.size()) {
continue;
}
found = true;
shards.add(routingNode.get(index));
}
index++;
}
for (int i = 0; i < shards.size(); i++) {
ShardRouting shardRouting = shards.get(i);
// we can only move started shards...
if (!shardRouting.started()) {
continue;
}
final RoutingNode routingNode = routingNodes.node(shardRouting.currentNodeId());
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) {
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
boolean moved = shardsAllocators.move(shardRouting, routingNode, allocation);
if (!moved) {
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
} else {
changed = true;
}
}
}
return changed;
}
private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
boolean changed = false;
final RoutingNodes routingNodes = allocation.routingNodes();

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.PriorityComparator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@ -49,6 +50,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
@ -119,9 +121,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
}
@Override
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public boolean moveShards(RoutingAllocation allocation) {
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
return balancer.move(shardRouting, node);
return balancer.moveShards();
}
/**
@ -489,56 +491,93 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
}
/**
* This function executes a move operation moving the given shard from
* the given node to the minimal eligible node with respect to the
* weight function. Iff the shard is moved the shard will be set to
* Move started shards that can not be allocated to a node anymore
*
* For each shard to be moved this function executes a move operation
* to the minimal eligible node with respect to the
* weight function. If a shard is moved the shard will be set to
* {@link ShardRoutingState#RELOCATING} and a shadow instance of this
* shard is created with an incremented version in the state
* {@link ShardRoutingState#INITIALIZING}.
*
* @return <code>true</code> iff the shard has successfully been moved.
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/
public boolean move(ShardRouting shard, RoutingNode node ) {
if (nodes.isEmpty() || !shard.started()) {
/* with no nodes or a not started shard this is pointless */
public boolean moveShards() {
if (nodes.isEmpty()) {
/* with no nodes this is pointless */
return false;
}
if (logger.isTraceEnabled()) {
logger.trace("Try moving shard [{}] from [{}]", shard, node);
}
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
boolean changed = initialize(routingNodes, unassigned);
if (!changed) {
final ModelNode sourceNode = nodes.get(node.nodeId());
assert sourceNode != null;
final NodeSorter sorter = newNodeSorter();
sorter.reset(shard.getIndexName());
final ModelNode[] nodes = sorter.modelNodes;
assert sourceNode.containsShard(shard);
/*
* the sorter holds the minimum weight node first for the shards index.
* We now walk through the nodes until we find a node to allocate the shard.
* This is not guaranteed to be balanced after this operation we still try best effort to
* allocate on the minimal eligible node.
*/
for (ModelNode currentNode : nodes) {
if (currentNode.getNodeId().equals(node.nodeId())) {
// Create a copy of the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
// offloading the shards.
List<ShardRouting> shards = new ArrayList<>();
int index = 0;
boolean found = true;
while (found) {
found = false;
for (RoutingNode routingNode : routingNodes) {
if (index >= routingNode.size()) {
continue;
}
RoutingNode target = currentNode.getRoutingNode(routingNodes);
Decision allocationDecision = allocation.deciders().canAllocate(shard, target, allocation);
Decision rebalanceDecision = allocation.deciders().canRebalance(shard, allocation);
Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
sourceNode.removeShard(shard);
ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
currentNode.addShard(targetRelocatingShard, decision);
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId());
found = true;
ShardRouting shardRouting = routingNode.get(index);
// we can only move started shards...
if (shardRouting.started()) {
shards.add(shardRouting);
}
}
index++;
}
if (shards.isEmpty()) {
return false;
}
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
boolean changed = initialize(routingNodes, unassigned);
if (changed == false) {
final NodeSorter sorter = newNodeSorter();
final ModelNode[] modelNodes = sorter.modelNodes;
for (ShardRouting shardRouting : shards) {
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
assert sourceNode != null && sourceNode.containsShard(shardRouting);
final RoutingNode routingNode = sourceNode.getRoutingNode(routingNodes);
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) {
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
sorter.reset(shardRouting.getIndexName());
/*
* the sorter holds the minimum weight node first for the shards index.
* We now walk through the nodes until we find a node to allocate the shard.
* This is not guaranteed to be balanced after this operation we still try best effort to
* allocate on the minimal eligible node.
*/
boolean moved = false;
for (ModelNode currentNode : modelNodes) {
if (currentNode == sourceNode) {
continue;
}
RoutingNode target = currentNode.getRoutingNode(routingNodes);
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
Decision rebalanceDecision = allocation.deciders().canRebalance(shardRouting, allocation);
if (allocationDecision.type() == Type.YES && rebalanceDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
Decision sourceDecision = sourceNode.removeShard(shardRouting);
ShardRouting targetRelocatingShard = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
// re-add (now relocating shard) to source node
sourceNode.addShard(shardRouting, sourceDecision);
Decision targetDecision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
currentNode.addShard(targetRelocatingShard, targetDecision);
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node());
}
moved = true;
changed = true;
break;
}
}
if (moved == false) {
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
}
changed = true;
break;
}
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
@ -66,12 +65,10 @@ public interface ShardsAllocator {
boolean rebalance(RoutingAllocation allocation);
/**
* Moves a shard from the given node to other node.
* Move started shards that can not be allocated to a node anymore
*
* @param shardRouting the shard to move
* @param node A node containing the shard
* @param allocation current node allocation
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/
boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation);
boolean moveShards(RoutingAllocation allocation);
}

View File

@ -19,8 +19,6 @@
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
@ -96,7 +94,7 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
}
@Override
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return allocator.move(shardRouting, node, allocation);
public boolean moveShards(RoutingAllocation allocation) {
return allocator.moveShards(allocation);
}
}

View File

@ -60,7 +60,7 @@ public class ClusterModuleTests extends ModuleTestCase {
return false;
}
@Override
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public boolean moveShards(RoutingAllocation allocation) {
return false;
}
}

View File

@ -320,7 +320,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
}
@Override
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public boolean moveShards(RoutingAllocation allocation) {
return false;
}