Speed up shard balancer by reusing shard model while moving shards that can no longer be allocated to a node

Decommissioning a node or applying a filter inclusion / exclusion can potentially lead to many shards that need to be moved to other nodes. This commit reuses the model across all
shard movements in an allocation round: It calculates the shard model once and simulates the application of all shards that can be moved on this model.

Closes #16926
This commit is contained in:
Yannick Welsch 2016-03-03 12:39:19 +01:00
parent 809bb9e5a1
commit 250db4999e
6 changed files with 94 additions and 100 deletions

View File

@ -310,7 +310,7 @@ public class AllocationService extends AbstractComponent {
} }
// move shards that no longer can be allocated // move shards that no longer can be allocated
changed |= moveShards(allocation); changed |= shardsAllocators.moveShards(allocation);
// rebalance // rebalance
changed |= shardsAllocators.rebalance(allocation); changed |= shardsAllocators.rebalance(allocation);
@ -327,46 +327,6 @@ public class AllocationService extends AbstractComponent {
} }
} }
private boolean moveShards(RoutingAllocation allocation) {
boolean changed = false;
// create a copy of the shards interleaving between nodes, and check if they can remain
List<ShardRouting> shards = new ArrayList<>();
int index = 0;
boolean found = true;
final RoutingNodes routingNodes = allocation.routingNodes();
while (found) {
found = false;
for (RoutingNode routingNode : routingNodes) {
if (index >= routingNode.size()) {
continue;
}
found = true;
shards.add(routingNode.get(index));
}
index++;
}
for (int i = 0; i < shards.size(); i++) {
ShardRouting shardRouting = shards.get(i);
// we can only move started shards...
if (!shardRouting.started()) {
continue;
}
final RoutingNode routingNode = routingNodes.node(shardRouting.currentNodeId());
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) {
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
boolean moved = shardsAllocators.move(shardRouting, routingNode, allocation);
if (!moved) {
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
} else {
changed = true;
}
}
}
return changed;
}
private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) { private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
boolean changed = false; boolean changed = false;
final RoutingNodes routingNodes = allocation.routingNodes(); final RoutingNodes routingNodes = allocation.routingNodes();

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.PriorityComparator; import org.elasticsearch.gateway.PriorityComparator;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
@ -49,6 +50,7 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.IdentityHashMap; import java.util.IdentityHashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -119,9 +121,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
@Override @Override
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { public boolean moveShards(RoutingAllocation allocation) {
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
return balancer.move(shardRouting, node); return balancer.moveShards();
} }
/** /**
@ -489,56 +491,93 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
/** /**
* This function executes a move operation moving the given shard from * Move started shards that can not be allocated to a node anymore
* the given node to the minimal eligible node with respect to the *
* weight function. Iff the shard is moved the shard will be set to * For each shard to be moved this function executes a move operation
* to the minimal eligible node with respect to the
* weight function. If a shard is moved the shard will be set to
* {@link ShardRoutingState#RELOCATING} and a shadow instance of this * {@link ShardRoutingState#RELOCATING} and a shadow instance of this
* shard is created with an incremented version in the state * shard is created with an incremented version in the state
* {@link ShardRoutingState#INITIALIZING}. * {@link ShardRoutingState#INITIALIZING}.
* *
* @return <code>true</code> iff the shard has successfully been moved. * @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/ */
public boolean move(ShardRouting shard, RoutingNode node ) { public boolean moveShards() {
if (nodes.isEmpty() || !shard.started()) { if (nodes.isEmpty()) {
/* with no nodes or a not started shard this is pointless */ /* with no nodes this is pointless */
return false; return false;
} }
if (logger.isTraceEnabled()) {
logger.trace("Try moving shard [{}] from [{}]", shard, node);
}
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
boolean changed = initialize(routingNodes, unassigned);
if (!changed) {
final ModelNode sourceNode = nodes.get(node.nodeId());
assert sourceNode != null;
final NodeSorter sorter = newNodeSorter();
sorter.reset(shard.getIndexName());
final ModelNode[] nodes = sorter.modelNodes;
assert sourceNode.containsShard(shard);
/*
* the sorter holds the minimum weight node first for the shards index.
* We now walk through the nodes until we find a node to allocate the shard.
* This is not guaranteed to be balanced after this operation we still try best effort to
* allocate on the minimal eligible node.
*/
for (ModelNode currentNode : nodes) { // Create a copy of the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
if (currentNode.getNodeId().equals(node.nodeId())) { // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
// offloading the shards.
List<ShardRouting> shards = new ArrayList<>();
int index = 0;
boolean found = true;
while (found) {
found = false;
for (RoutingNode routingNode : routingNodes) {
if (index >= routingNode.size()) {
continue; continue;
} }
RoutingNode target = currentNode.getRoutingNode(routingNodes); found = true;
Decision allocationDecision = allocation.deciders().canAllocate(shard, target, allocation); ShardRouting shardRouting = routingNode.get(index);
Decision rebalanceDecision = allocation.deciders().canRebalance(shard, allocation); // we can only move started shards...
Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision); if (shardRouting.started()) {
if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too? shards.add(shardRouting);
sourceNode.removeShard(shard); }
ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); }
currentNode.addShard(targetRelocatingShard, decision); index++;
if (logger.isTraceEnabled()) { }
logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId()); if (shards.isEmpty()) {
return false;
}
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
boolean changed = initialize(routingNodes, unassigned);
if (changed == false) {
final NodeSorter sorter = newNodeSorter();
final ModelNode[] modelNodes = sorter.modelNodes;
for (ShardRouting shardRouting : shards) {
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
assert sourceNode != null && sourceNode.containsShard(shardRouting);
final RoutingNode routingNode = sourceNode.getRoutingNode(routingNodes);
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) {
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
sorter.reset(shardRouting.getIndexName());
/*
* the sorter holds the minimum weight node first for the shards index.
* We now walk through the nodes until we find a node to allocate the shard.
* This is not guaranteed to be balanced after this operation we still try best effort to
* allocate on the minimal eligible node.
*/
boolean moved = false;
for (ModelNode currentNode : modelNodes) {
if (currentNode == sourceNode) {
continue;
}
RoutingNode target = currentNode.getRoutingNode(routingNodes);
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
Decision rebalanceDecision = allocation.deciders().canRebalance(shardRouting, allocation);
if (allocationDecision.type() == Type.YES && rebalanceDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
Decision sourceDecision = sourceNode.removeShard(shardRouting);
ShardRouting targetRelocatingShard = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
// re-add (now relocating shard) to source node
sourceNode.addShard(shardRouting, sourceDecision);
Decision targetDecision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
currentNode.addShard(targetRelocatingShard, targetDecision);
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node());
}
moved = true;
changed = true;
break;
}
}
if (moved == false) {
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
} }
changed = true;
break;
} }
} }
} }

View File

@ -19,7 +19,6 @@
package org.elasticsearch.cluster.routing.allocation.allocator; package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
@ -66,12 +65,10 @@ public interface ShardsAllocator {
boolean rebalance(RoutingAllocation allocation); boolean rebalance(RoutingAllocation allocation);
/** /**
* Moves a shard from the given node to other node. * Move started shards that can not be allocated to a node anymore
* *
* @param shardRouting the shard to move
* @param node A node containing the shard
* @param allocation current node allocation * @param allocation current node allocation
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code> * @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/ */
boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation); boolean moveShards(RoutingAllocation allocation);
} }

View File

@ -19,8 +19,6 @@
package org.elasticsearch.cluster.routing.allocation.allocator; package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
@ -96,7 +94,7 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
} }
@Override @Override
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { public boolean moveShards(RoutingAllocation allocation) {
return allocator.move(shardRouting, node, allocation); return allocator.moveShards(allocation);
} }
} }

View File

@ -60,7 +60,7 @@ public class ClusterModuleTests extends ModuleTestCase {
return false; return false;
} }
@Override @Override
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { public boolean moveShards(RoutingAllocation allocation) {
return false; return false;
} }
} }

View File

@ -320,7 +320,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
} }
@Override @Override
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { public boolean moveShards(RoutingAllocation allocation) {
return false; return false;
} }